IceFlow NDN-based stream processing library written in C++
Loading...
Searching...
No Matches
ringbuffer.hpp
1/*
2 * Copyright 2023 The IceFlow Authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * SPDX-License-Identifier: Apache-2.0
17 */
18
19#ifndef ICEFLOW_CORE_RINGBUFFER_HPP
20#define ICEFLOW_CORE_RINGBUFFER_HPP
21
22#include <condition_variable>
23
24#include "boost/circular_buffer.hpp"
25
26namespace iceflow {
27
35template <typename T>
36
38public:
42 // TODO: make it configurable - pass this parameter from the Config
43 // file/Constructor
44 RingBuffer() : m_queue(10000) {}
45
49 RingBuffer &operator=(const RingBuffer &) = delete;
50
57 RingBuffer(RingBuffer const &other) {
58 std::lock_guard<std::mutex> lock(other.m_mutex);
59 m_queue = other.m_queue;
60 }
61
62 int size() const { // queue size
63 std::lock_guard<std::mutex> lock(m_mutex);
64 int x = static_cast<int>(m_queue.size());
65 return x;
66 }
67
73 void push(const T &value) {
74 {
75 std::unique_lock<std::mutex> lock(m_mutex);
76 m_queue.push_back(value);
77 lock.unlock();
78 }
79 m_queueCondition.notify_all();
80 }
81 void pushData(T &value, int threshold) {
82 {
83 std::unique_lock<std::mutex> lock(m_mutex);
84 while (m_queue.size() >= threshold) {
85 m_queueCondition.wait(lock);
86 }
87 m_queue.push_back(value);
88 lock.unlock();
89 }
90 m_queueCondition.notify_all();
91 }
92
98 bool empty() const {
99 std::lock_guard<std::mutex> lock(m_mutex);
100 return m_queue.empty();
101 }
102
112 bool tryAndPop(T &value) {
113 std::lock_guard<std::mutex> lock(m_mutex);
114 if (m_queue.empty())
115 return false;
116 value = m_queue.front();
117 m_queue.pop();
118 return true;
119 }
120
129 std::shared_ptr<T> tryAndPop() {
130 std::lock_guard<std::mutex> lock(m_mutex);
131 if (m_queue.empty())
132 return std::shared_ptr<T>();
133 std::shared_ptr<T> value = std::make_shared<T>(m_queue.front());
134 m_queue.pop();
135 return value;
136 }
137
145 void waitAndPop(T &value) {
146 std::unique_lock<std::mutex> lock(m_mutex);
147 m_queueCondition.wait(lock, [this] { return !m_queue.empty(); });
148 value = m_queue.front();
149 m_queue.pop();
150 }
151
160 std::shared_ptr<T> waitAndPop() {
161 std::unique_lock<std::mutex> lock(m_mutex);
162 m_queueCondition.wait(lock, [this] { return !m_queue.empty(); });
163 std::shared_ptr<T> value = std::make_shared<T>(m_queue.front());
164 m_queue.pop();
165 return value;
166 }
167
176 std::unique_lock<std::mutex> lock(m_mutex);
177 m_queueCondition.wait(lock, [this] { return !m_queue.empty(); });
178 T value = m_queue.front();
179 m_queue.pop_front();
180 lock.unlock();
181 m_queueCondition.notify_all();
182 return value;
183 }
184
194 bool waitForAndPop(T &value, std::chrono::milliseconds waitDuration) {
195 std::unique_lock<std::mutex> lock(m_mutex);
196 if (m_queueCondition.wait_for(lock, waitDuration,
197 [this] { return !m_queue.empty(); })) {
198 // Got something
199 value = m_queue.front();
200 m_queue.pop();
201 return true;
202 }
203 // Timed out
204 return false;
205 }
206
215 std::shared_ptr<T> waitForAndPop(std::chrono::milliseconds waitDuration) {
216 std::unique_lock<std::mutex> lock(m_mutex);
217 if (m_queueCondition.wait_for(lock, waitDuration,
218 [this] { return !m_queue.empty(); })) {
219 // Got something
220 std::shared_ptr<T> value = std::make_shared<T>(m_queue.front());
221 m_queue.pop();
222 return value;
223 }
224 // Timed out
225 return std::shared_ptr<T>();
226 }
227
228private:
229 mutable std::mutex m_mutex;
230 boost::circular_buffer<T> m_queue;
231
232 std::condition_variable m_queueCondition;
233};
234
235} // namespace iceflow
236
237#endif // ICEFLOW_CORE_RINGBUFFER_HPP
This class provides a thread safe implementation of a queue. It uses boost::circular_buffer<T> as an ...
Definition ringbuffer.hpp:37
void push(const T &value)
Pushes a value to the queue.
Definition ringbuffer.hpp:73
bool waitForAndPop(T &value, std::chrono::milliseconds waitDuration)
Waits and pops an item from the queue and copies it to value passed in as an argument....
Definition ringbuffer.hpp:194
T waitAndPopValue()
Waits and pops an item from the queue and returns a copy of the item This is a blocking call and it w...
Definition ringbuffer.hpp:175
std::shared_ptr< T > waitAndPop()
Waits and pops an item from the queue and returns a shared_ptr<T> This is a blocking call and it wait...
Definition ringbuffer.hpp:160
RingBuffer(RingBuffer const &other)
Copy constructor of the RingBuffer class. This enables an instance of this class to be passed around ...
Definition ringbuffer.hpp:57
bool empty() const
Checks if the queue is empty.
Definition ringbuffer.hpp:98
std::shared_ptr< T > waitForAndPop(std::chrono::milliseconds waitDuration)
Waits and pops an item from the queue and returns a std::shared_ptr<T>. This is a blocking call with ...
Definition ringbuffer.hpp:215
bool tryAndPop(T &value)
Tries to pop a value from the queue and copy it to the value passed in as an argument....
Definition ringbuffer.hpp:112
RingBuffer()
Constructor of the RingBuffer class.
Definition ringbuffer.hpp:44
std::shared_ptr< T > tryAndPop()
Tries to pop an item from the queue and returns a std::shared_ptr<T> This is a non blocking call and ...
Definition ringbuffer.hpp:129
void waitAndPop(T &value)
Waits and pops an item from the queue and copies it to value passed in as an argument This is a block...
Definition ringbuffer.hpp:145
RingBuffer & operator=(const RingBuffer &)=delete
Copy assignment.