IceFlow NDN-based stream processing library written in C++
Loading...
Searching...
No Matches
producer.hpp
1/*
2 * Copyright 2024 The IceFlow Authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * SPDX-License-Identifier: Apache-2.0
17 */
18
19#ifndef ICEFLOW_PRODUCER_HPP
20#define ICEFLOW_PRODUCER_HPP
21
22#include "ndn-svs/svspubsub.hpp"
23
24#include <random>
25#include <unordered_set>
26
27#include <time.h>
28
29#ifdef USE_GRPC
30
31#include "node-executor.grpc.pb.h"
32#include "node-executor.pb.h"
33
34#endif // USE_GRPC
35
36#include "congestion-reporter.hpp"
37#include "stats.hpp"
38
39namespace iceflow {
40
48public:
50 std::shared_ptr<ndn::svs::SVSPubSub> svsPubSub,
51 const std::string &nodePrefix, const std::string &syncPrefix,
52 const std::string &upstreamEdgeName, uint32_t numberOfPartitions,
53 std::optional<std::shared_ptr<CongestionReporter>> congestionReporter);
54
56
57 void pushData(const std::vector<uint8_t> &data);
58
59 void setTopicPartitions(uint64_t numberOfPartitions);
60
61 EdgeProductionStats getProductionStats();
62
63private:
64 uint32_t getNextPartitionNumber();
65
66 void reportCongestion(CongestionReason congestionReason);
67
68 void saveTimestamp(std::chrono::steady_clock::time_point timestamp);
69
70 void cleanUpTimestamps(
71 std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);
72
73 ndn::Name prepareDataName(uint32_t partitionNumber);
74
75 uint64_t determineIdleTime(
76 std::chrono::time_point<std::chrono::steady_clock> referenceTimepoint);
77
78private:
79 const std::weak_ptr<ndn::svs::SVSPubSub> m_svsPubSub;
80 const std::string m_pubTopic;
81
82 std::string m_nodePrefix;
83
84 uint32_t m_numberOfPartitions;
85 std::unordered_set<uint32_t> m_topicPartitions;
86
87 std::mt19937 m_randomNumberGenerator;
88
89 std::optional<std::shared_ptr<CongestionReporter>> m_congestionReporter;
90
91 std::deque<std::chrono::time_point<std::chrono::steady_clock>>
92 m_productionTimestamps;
93
94 // TODO: Make configurable
95 std::chrono::seconds m_maxProductionTimestampAge = std::chrono::seconds(1);
96
97 const std::string &m_downstreamEdgeName;
98
99 std::chrono::time_point<std::chrono::steady_clock> m_idleSince;
100};
101} // namespace iceflow
102
103#endif // ICEFLOW_PRODUCER_HPP
Definition producer.hpp:47
Definition stats.hpp:26