42 std::shared_ptr<ndn::svs::SVSPubSub> svsPubSub,
43 const std::string &syncPrefix,
const std::string &upstreamEdgeName,
44 std::optional<std::shared_ptr<CongestionReporter>> congestionReporter);
/// Setter that takes a ConsumerCallback by value.
// NOTE(review): presumably stores the callback in m_consumerCallback
// (a std::optional<ConsumerCallback> member); the definition is not visible
// here — confirm against the implementation.
48 void setConsumerCallback(ConsumerCallback consumerCallback);
/// Takes a list of 32-bit partition numbers (by value) and returns a bool.
/// @param partitions partition numbers handed to this consumer
// NOTE(review): presumably replaces the contents of m_partitions and
// re-subscribes accordingly; what the bool return value signals (success?
// whether anything changed?) cannot be told from this declaration — confirm.
50 bool repartition(std::vector<uint32_t> partitions);
/// Returns a vector of 32-bit unsigned partition numbers, by value.
/// @return partition numbers as std::vector<uint32_t>
// The element type is spelled with the standard <cstdint> name uint32_t, as in
// every other declaration here, instead of the non-standard BSD/glibc alias
// u_int32_t, which is not available on all toolchains. Where both names
// exist they denote the same type, so the out-of-line definition still matches.
// NOTE(review): presumably returns a copy of m_partitions — confirm.
std::vector<uint32_t> getPartitions();
/// Takes the three values that describe a partition layout; returns nothing.
/// @param numberOfPartitions      number of partitions
/// @param consumerPartitionIndex  index of this consumer
/// @param totalNumberOfConsumers  number of consumers
// NOTE(review): the declaration does not show how an invalid configuration is
// signalled (exception, log message, abort) or which combinations count as
// invalid — check the definition before relying on either.
57 void validatePartitionConfiguration(uint32_t numberOfPartitions,
58 uint32_t consumerPartitionIndex,
59 uint32_t totalNumberOfConsumers);
/// Handler taking one ndn::svs::SVSPubSub::SubscriptionData by const reference.
/// @param subData subscription data, read-only
// NOTE(review): presumably registered with SVSPubSub as the subscription
// callback and forwarding to the consumer callback — confirm against the
// definition, which is not visible here.
61 void subscribeCallBack(
const ndn::svs::SVSPubSub::SubscriptionData &subData);
/// Builds and returns an ndn::Name for the given 32-bit partition number.
/// @param partitionNumber partition number the name is built for
/// @return the resulting ndn::Name, by value
// NOTE(review): the name layout (which prefix/topic components are combined
// with the partition number) is not visible from this declaration — confirm.
63 ndn::Name prepareDataName(uint32_t partitionNumber);
/// Takes the same three layout values as validatePartitionConfiguration();
/// returns nothing.
/// @param numberOfPartitions      number of partitions
/// @param consumerPartitionIndex  index of this consumer
/// @param totalNumberOfConsumers  number of consumers
// NOTE(review): presumably derives this consumer's share of the partitions and
// stores it in m_partitions; the assignment scheme is not visible here —
// confirm against the definition.
71 void setTopicPartitions(uint32_t numberOfPartitions,
72 uint32_t consumerPartitionIndex,
73 uint32_t totalNumberOfConsumers);
/// Takes a single partition identifier; returns nothing.
/// @param topicPartition partition identifier
// NOTE(review): the parameter is uint64_t although partition numbers are
// uint32_t everywhere else in this class (m_partitions, prepareDataName()).
// Verify whether the wider type is intentional before changing it; the
// out-of-line definition would have to change in step.
// NOTE(review): presumably records a handle in m_subscriptionHandles — confirm.
75 void subscribeToTopicPartition(uint64_t topicPartition);
/// Takes no arguments and returns nothing.
// NOTE(review): presumably cancels every subscription recorded in
// m_subscriptionHandles and clears that list — confirm against the definition.
77 void unsubscribeFromAllPartitions();
/// Takes a steady_clock time point to measure against; returns nothing.
/// @param referenceTimepoint reference instant on std::chrono::steady_clock
// NOTE(review): presumably drops entries of m_consumptionTimestamps that are
// older than m_maxConsumptionAge relative to referenceTimepoint — confirm
// against the definition, which is not visible here.
81 void cleanUpTimestamps(
82 std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);
/// Takes a CongestionReason by value; returns nothing.
/// @param congestionReason reason to report
// NOTE(review): presumably forwards to m_congestionReporter when one is set;
// behaviour when the optional is empty or the pointer is null is not visible
// here — confirm.
84 void reportCongestion(CongestionReason congestionReason);
/// Computes a 64-bit unsigned idle-time value from a steady_clock time point.
/// @param referenceTimepoint reference instant on std::chrono::steady_clock
/// @return idle time as a raw uint64_t
// NOTE(review): the unit of the returned value (ms? us? s?) is not expressed
// in the type and cannot be told from this declaration; presumably the span
// between m_idleSince and referenceTimepoint — confirm both.
86 uint64_t determineIdleTime(
87 std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);
// Weak (non-owning) handle to the SVSPubSub instance; const, so it cannot be
// re-pointed after construction.
// NOTE(review): presumably initialised from the std::shared_ptr<SVSPubSub>
// parameter seen in the parameter list above — confirm.
90 const std::weak_ptr<ndn::svs::SVSPubSub> m_svsPubSub;
// Immutable topic string, held by value.
// NOTE(review): presumably the topic this consumer subscribes to; how it is
// derived is not visible here — confirm.
91 const std::string m_subTopic;
// 32-bit partition numbers.
// NOTE(review): presumably the partitions currently assigned to this consumer
// (cf. repartition() / getPartitions()) — confirm.
93 std::vector<uint32_t> m_partitions;
// 32-bit subscription handles.
// NOTE(review): presumably one per active partition subscription, consumed by
// unsubscribeFromAllPartitions() — confirm.
95 std::vector<uint32_t> m_subscriptionHandles;
// Double-ended queue of steady_clock time points.
// NOTE(review): presumably one entry per consumed item, pruned by
// cleanUpTimestamps() — confirm.
97 std::deque<std::chrono::time_point<std::chrono::steady_clock>>
98 m_consumptionTimestamps;
// Duration with a default of one second.
// NOTE(review): presumably the maximum age of entries kept in
// m_consumptionTimestamps — confirm.
101 std::chrono::seconds m_maxConsumptionAge = std::chrono::seconds(1);
// Optional consumer callback; being a std::optional, it may hold no callback.
103 std::optional<ConsumerCallback> m_consumerCallback;
// Optional shared handle to a CongestionReporter.
// NOTE(review): two layers of "absent" are possible here (empty optional, and
// an engaged optional holding a null shared_ptr); users of this member likely
// need to check both — confirm how the implementation treats each.
105 std::optional<std::shared_ptr<CongestionReporter>> m_congestionReporter;
// Name of the upstream edge, owned by this object.
// Held by value (like m_subTopic) rather than as `const std::string &`: the
// parameter list above receives `upstreamEdgeName` as a const reference, and a
// reference member bound to it would dangle as soon as the caller's string
// (possibly a temporary) is destroyed. An initialiser of the form
// `m_upstreamEdgeName(upstreamEdgeName)` compiles unchanged and now copies.
const std::string m_upstreamEdgeName;
// A steady_clock time point with no in-class initialiser.
// NOTE(review): presumably the instant since which this consumer has been
// idle, read by determineIdleTime(); where it is first set is not visible
// here — confirm it is initialised before first use.
109 std::chrono::time_point<std::chrono::steady_clock> m_idleSince;