casacore
Loading...
Searching...
No Matches
ConditionalQueue.h
Go to the documentation of this file.
1#ifndef SISCO_CONDITIONAL_QUEUE_H_
2#define SISCO_CONDITIONAL_QUEUE_H_
3
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>
#include <utility>
7
8namespace casacore::sisco {
9
/**
 * A queue with a maximum size from which values are popped selectively:
 * a consumer removes the first value (in insertion order) that satisfies
 * a caller-supplied condition instead of always taking the front value.
 *
 * The queue does not own a mutex. Every function that accesses the stored
 * values takes the caller's @c std::unique_lock and asserts that it is
 * locked; the same lock is handed to the condition variables when a call
 * has to wait. As required by @c std::condition_variable, all threads that
 * use one queue concurrently must pass a lock on the same mutex.
 */
template <typename T>
class ConditionalQueue {
 public:
  /**
   * @param max_size Number of values at which Push() and Emplace() start
   * waiting. With a value of zero the queue is always considered full, so
   * Push() and Emplace() never return.
   */
  ConditionalQueue(size_t max_size) : max_size_(max_size) {}

  ~ConditionalQueue() = default;

  /**
   * Place a new object at the end of the queue. While the queue holds
   * @ref MaxSize() values, the call waits until a value has been popped.
   * All threads waiting in PopIf() are woken up afterwards.
   * @param value Object that is moved into the queue.
   * @param lock A locked lock; it is released while waiting.
   */
  void Push(T&& value, std::unique_lock<std::mutex>& lock) {
    // Emplace() performs the lock assertion, the wait for free space and
    // the notification; forwarding an rvalue T move-constructs the element.
    Emplace(lock, std::move(value));
  }

  /**
   * Same as Push(), but constructs the new value in place.
   * @param lock A locked lock; it is released while waiting.
   * @param args Arguments forwarded to the constructor of T.
   */
  template <typename... Args>
  void Emplace(std::unique_lock<std::mutex>& lock, Args&&... args) {
    assert(lock.owns_lock());
    while (values_.size() == max_size_) {
      pop_condition_.wait(lock);
    }
    values_.emplace_back(std::forward<Args>(args)...);
    // We need to notify all: if only one thread were awoken and its
    // condition is not met by the new value, the signal would be lost
    // while another thread's condition may be met.
    push_condition_.notify_all();
  }

  /**
   * Pop an object from the queue for which a specified condition holds.
   * The values are tested in insertion order and the first match is
   * removed. When no value matches, the call waits until it is notified
   * (by Push(), Emplace(), NotifyOneChange(), Finish() or a PopIf() that
   * emptied a finished queue) and then searches again.
   *
   * Note that the end-of-queue check only triggers on an empty queue: if
   * Finish() has been called but the remaining values do not satisfy
   * @p condition, the call keeps waiting.
   *
   * @param result Receives the popped value by move-assignment. It is left
   * untouched when false is returned.
   * @param lock A locked lock; it is released while waiting.
   * @param condition Callable that is invoked with a stored value (T&) and
   * returns something convertible to bool.
   * @returns true when a value was popped, false when the queue is empty
   * and Finish() has been called.
   */
  template <typename Condition>
  bool PopIf(T& result, std::unique_lock<std::mutex>& lock,
             Condition condition) {
    assert(lock.owns_lock());

    auto position = GetNext(condition);
    while (position == values_.end()) {
      if (values_.empty() && is_finished_) return false;
      push_condition_.wait(lock);
      position = GetNext(condition);
    }

    result = std::move(*position);
    values_.erase(position);
    // One value was removed, so one waiting producer can continue.
    pop_condition_.notify_one();

    // If this thread emptied the finished queue while another thread is
    // waiting for a value, that thread needs to be woken up so that it can
    // observe the end of the queue and return.
    if (values_.empty() && is_finished_) {
      push_condition_.notify_all();
    }

    return true;
  }

  /**
   * Notify the queue that the condition for values might have changed.
   * All threads waiting in PopIf() are woken up and re-evaluate their
   * condition on the stored values.
   */
  void NotifyOneChange() { push_condition_.notify_all(); }

  /**
   * Makes PopIf() return false once the queue is empty, and wakes up all
   * threads that are waiting in PopIf().
   * @param lock A locked lock. It is only inspected by the assertion.
   */
  void Finish([[maybe_unused]] std::unique_lock<std::mutex>& lock) {
    assert(lock.owns_lock());
    is_finished_ = true;
    push_condition_.notify_all();
  }

  /**
   * @param lock A locked lock. It is only inspected by the assertion.
   * @returns The number of values currently stored.
   */
  size_t Size([[maybe_unused]] std::unique_lock<std::mutex>& lock) const {
    assert(lock.owns_lock());
    return values_.size();
  }

  /**
   * @returns The maximum size that was passed to the constructor.
   */
  size_t MaxSize() const { return max_size_; }

 private:
  /**
   * Find the first stored value, in insertion order, for which
   * @p condition returns true.
   * @returns Iterator to that value, or values_.end() if there is none.
   */
  template <typename Condition>
  typename std::list<T>::iterator GetNext(Condition& condition) {
    for (auto i = values_.begin(); i != values_.end(); ++i) {
      if (condition(*i)) return i;
    }
    return values_.end();
  }

  /// Stored values, oldest first.
  std::list<T> values_;
  /// Set by Finish(); never reset.
  bool is_finished_ = false;
  size_t max_size_;
  /// Signalled when a value is removed; producers wait on it.
  std::condition_variable pop_condition_;
  /// Signalled when PopIf() callers should search again.
  std::condition_variable push_condition_;
};
135
136} // namespace casacore::sisco
137
138#endif
size_t Size(std::unique_lock< std::mutex > &lock) const
bool PopIf(T &result, std::unique_lock< std::mutex > &lock, Condition condition)
Pop an object from the queue for which a specified condition holds.
std::list< T >::iterator GetNext(Condition &condition)
void Push(T &&value, std::unique_lock< std::mutex > &lock)
Place a new object at the end of the queue.
void Emplace(std::unique_lock< std::mutex > &lock, Args &&... args)
Same as Push(), but constructs in place.
std::condition_variable pop_condition_
void Finish(std::unique_lock< std::mutex > &lock)
Makes PopIf return false once the queue is empty.
void NotifyOneChange()
Notify the queue that the condition for values might have changed.
std::condition_variable push_condition_
NewDelAllocator< T > NewDelAllocator< T >::value
Definition Allocator.h:368