cpp-pthread  (v1.7.3)
Simple C++ wrapper to pthread functions.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Modules Pages
sync_queue.hpp
Go to the documentation of this file.
1 // sync_queue.hpp
3 // substation
4 //
5 // Created by herbert koelman on 25/02/2016.
6 // Copyright © 2016 urbix-software. All rights reserved.
7 //
8 
9 #ifndef pthread_synchronized_queue_hpp
10 #define pthread_synchronized_queue_hpp
11 
12 #include <list> // std::list
13 
14 #include "pthread/pthread.hpp"
15 #if __cplusplus < 201103L
16 #else
17 #include <atomic>
18 #endif
19 #include <string>
20 
21 
22 namespace pthread {
23 
28  namespace util {
29 
43  template<typename T> class sync_queue {
44  public:
45 
52  void put (const T& item);
53 
59  void put (const T& item, int wait_time);
60 
67  void get ( T& item);
68 
76  void get ( T& item, int wait_time );
77 
79  bool empty() const {
80  return _items.empty();
81  }
82 
84 #if __cplusplus < 201103L
85  size_t size() {
86  // we use read/write locks when std::atomic is not available
88 #else
89  size_t size() const {
90 #endif
91  return _items.size();
92  }
93 
95 #if __cplusplus < 201103L
96  size_t max_size() {
97  // we use read/write locks when std::atomic is not available
99 #else
100  size_t max_size() const {
101 #endif
102  return _max_size ;
103  }
104 
109  void set_max_size( size_t ms ){
110  if (ms >= 0){
111 #if __cplusplus < 201103L
112  // we use read/write locks when std::atomic is not available
114 #endif
115  _max_size = ms;
116  }else{
117 #if __cplusplus < 201103L
118  throw queue_exception("synchronized_queue's max size must be greater then 0.");
119 #else
120  throw queue_exception("synchronized_queue's max size must be greater then 0, max_size " + std::to_string(ms) + " is not.");
121 #endif
122  }
123  }
124 
129  explicit sync_queue( int ms = 10 );
130 
132  virtual ~sync_queue();
133 
134  private:
135 
136  pthread::mutex _mutex;
137  pthread::condition_variable _not_empty_cv;
138  pthread::condition_variable _not_full_cv;
139  std::list<T> _items ;
140 #if __cplusplus < 201103L
141  pthread::read_write_lock _rwlock;
142  int _max_size ;
143 #else
144  std::atomic<int> _max_size ;
145 #endif
146 
147  };
148 
149 
152  // template implementation ------------------------------------------------
153 
154  template<typename T> void sync_queue<T>::get( T& item ){
156 
157 #if __cplusplus < 201103L
158  while ( (! (not_empty = !_items.empty())) && not_empty_cv.wait(_mutex) ){
159  }
160 #else
161  _not_empty_cv.wait(lck,[this]{ return !_items.empty(); });
162 #endif
163 
164  item=_items.front();
165  _items.pop_front();
166  _not_full_cv.notify_one();
167  }
168 
169  template<typename T> void sync_queue<T>::get( T& item, int wait_time ){
170 
172 
173 #if __cplusplus < 201103L
174  bool not_empty = true;
175  auto delay = wait_time;
176  while ( ! (not_empty = !_items.empty()) && (_not_empty_cv.wait_for(_mutex, delay) == pthread::cv_status::no_timeout)){
177  delay = -1 ;
178  }
179 #else
180  bool not_empty = _not_empty_cv.wait_for(lck,wait_time, [this]{ return !_items.empty(); }); // keep waiting if item list is full
181 #endif
182 
183  if ( not_empty ){
184  item=_items.front();
185  _items.pop_front();
186  _not_full_cv.notify_one();
187  } else {
188  _not_full_cv.notify_all();
189  throw queue_timeout("synchronized_queue::get() timed out.");
190  }
191  }
192 
193  template<typename T> void sync_queue<T>::put( const T& item ) {
195 
196 #if __cplusplus < 201103L
197  bool not_full = true;
198  while ( ! (not_full = (_items.size() < _max_size))){
199  _not_full_cv.wait(_mutex);
200  }
201 #else
202  _not_full_cv.wait(_mutex,[this]{ return _items.size() < _max_size; });
203 #endif
204 
205  _items.push_back(item);
206  _not_empty_cv.notify_one(); // signal that there is at least a new message
207  }
208 
209  template<typename T> void sync_queue<T>::put( const T& item, int wait_time ){
210 
212 
213 #if __cplusplus < 201103L
214  bool not_full = true;
215  auto delay = wait_time;
216  while ( ! (not_full = (_items.size() < _max_size)) && (_not_full_cv.wait_for(_mutex, delay) == pthread::cv_status::no_timeout)){
217  delay = -1 ;
218  }
219 #else
220  // The following method signature uses lambda which is not supported by AIX XL C/C++ 13.1.2
221  bool not_full = _not_full_cv.wait_for(lck, wait_time, [this]{ return _items.size() < _max_size; });
222 #endif
223 
224  if ( not_full ){
225  _items.push_back(item);
226  _not_empty_cv.notify_one();
227  } else {
228  _not_empty_cv.notify_all();
229  throw queue_full("synchronized_queue::put() timeout, queue is full.");
230  }
231  }
232 
233 
234  template<typename T> sync_queue<T>::sync_queue( int ms ): _max_size(ms) {
235  }
236 
237  template<typename T> sync_queue<T>::~sync_queue(){
238  // Intentionally unimplemented...
239  }
240 
241  }; // namespace util
242 }; // namespace pthread
243 
244 
245 #endif /* pthread_synchronized_queue_hpp */
void put(const T &item)
Definition: sync_queue.hpp:193
size_t size() const
Definition: sync_queue.hpp:89
void set_max_size(size_t ms)
Definition: sync_queue.hpp:109
size_t max_size() const
Definition: sync_queue.hpp:100
cv_status wait_for(mutex &mtx, int millis)