LBANN  0.103.0
Livermore Big Artificial Neural Network Toolkit
thread_safe_queue.hpp
Go to the documentation of this file.
1 // Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC.
3 // Produced at the Lawrence Livermore National Laboratory.
4 // Written by the LBANN Research Team (B. Van Essen, et al.) listed in
5 // the CONTRIBUTORS file. <lbann-dev@llnl.gov>
6 //
7 // LLNL-CODE-697807.
8 // All rights reserved.
9 //
10 // This file is part of LBANN: Livermore Big Artificial Neural Network
11 // Toolkit. For details, see http://software.llnl.gov/LBANN or
12 // https://github.com/LLNL/LBANN.
13 //
14 // Licensed under the Apache License, Version 2.0 (the "License"); you
15 // may not use this file except in compliance with the License. You may
16 // obtain a copy of the License at:
17 //
18 // http://www.apache.org/licenses/LICENSE-2.0
19 //
20 // Unless required by applicable law or agreed to in writing, software
21 // distributed under the License is distributed on an "AS IS" BASIS,
22 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
23 // implied. See the License for the specific language governing
24 // permissions and limitations under the license.
26 #ifndef LBANN_UTILS_THREADS_THREAD_SAFE_QUEUE_HPP_INCLUDED
27 #define LBANN_UTILS_THREADS_THREAD_SAFE_QUEUE_HPP_INCLUDED
28 
29 #include <lbann/utils/memory.hpp>
30 
31 #include <condition_variable>
32 #include <memory>
33 #include <mutex>
34 
35 namespace lbann {
36 
50 template <typename T>
52 {
53 private:
57  struct _Node
58  {
59  std::unique_ptr<T> data_;
60  std::unique_ptr<_Node> next_;
61  };
62 
63 public:
66  : head_(make_unique<_Node>()), tail_(head_.get()), m_stop_threads(false)
67  {}
68 
70  void push(T value)
71  {
72  // Make the new data outside of the lock to minimize lock time
73  auto new_value = std::make_unique<T>(std::move(value));
74  auto new_node = std::make_unique<_Node>();
75 
76  // Adding to the queue only modifies the tail
77  {
78  std::lock_guard<std::mutex> lk(tail_mtx_);
79  tail_->data_ = std::move(new_value);
80  tail_->next_ = std::move(new_node);
81  tail_ = tail_->next_.get();
82  }
83  // Update the condition variable (for wait_and_pop)
84  data_available_.notify_one();
85  }
86 
87  void wake_all(bool stop = false)
88  {
89  {
90  std::lock_guard<std::mutex> lk(head_mtx_);
91  m_stop_threads = stop;
92  }
93  // Update the condition variable (for wait_and_pop)
94  data_available_.notify_all();
95  }
96 
98  void set_stop_threads(bool flag) { m_stop_threads = flag; }
99 
104  std::unique_ptr<T> try_pop()
105  {
106  std::unique_lock<std::mutex> lk(head_mtx_);
107  if (head_.get() == do_get_tail_())
108  return nullptr;
109 
110  // Remove the head
111  auto popped_head = std::move(head_);
112  head_ = std::move(popped_head->next_);
113 
114  return std::move(popped_head->data_);
115  }
116 
118  std::unique_ptr<T> wait_and_pop()
119  {
120  std::unique_lock<std::mutex> lk(head_mtx_);
121  data_available_.wait(lk, [&] {
122  return ((head_.get() != do_get_tail_()) ||
123  ((head_.get() == do_get_tail_()) && m_stop_threads));
124  });
125 
126  // There is no more work to do, bail
127  if (head_.get() == do_get_tail_() && m_stop_threads) {
128  return nullptr;
129  }
130 
131  // Remove the head
132  auto popped_head = std::move(head_);
133  head_ = std::move(popped_head->next_);
134 
135  return std::move(popped_head->data_);
136  }
137 
139  bool empty() const
140  {
141  std::lock_guard<std::mutex> lk(head_mtx_);
142  return (head_.get() == do_get_tail_());
143  }
144 
145 private:
147  _Node* do_get_tail_() const
148  {
149  std::lock_guard<std::mutex> lk(tail_mtx_);
150  return tail_;
151  }
152 
153 private:
/** @brief The mutex protecting the head of the list. */
155  mutable std::mutex head_mtx_;
156 
/** @brief The mutex protecting the tail of the list. */
158  mutable std::mutex tail_mtx_;
159 
/** @brief The first node in the list. */
161  std::unique_ptr<_Node> head_;
162 
/** @brief The last node in the list. */
164  _Node* tail_;
165 
/** @brief Condition variable tripped when data added. */
167  std::condition_variable data_available_;
168 
170 
171 }; // class thread_safe_queue
172 
173 } // namespace lbann
174 #endif /* LBANN_UTILS_THREADS_THREAD_SAFE_QUEUE_HPP_INCLUDED */
void set_stop_threads(bool flag)
Allow the thread pool to set / reset the flags.
std::unique_ptr< T > try_pop()
Try to remove the first value from the queue.
std::unique_ptr< _Node > head_
The first node in the list.
bool empty() const
Check if queue is empty.
std::unique_ptr< _Node > next_
std::unique_ptr< T > wait_and_pop()
Wait for data and then return it.
std::string get()
std::mutex tail_mtx_
The mutex protecting the tail of the list.
std::condition_variable data_available_
Condition variable tripped when data added.
std::mutex head_mtx_
The mutex protecting the head of the list.
void push(T value)
Adds a value to back of the queue.
thread_safe_queue()
Default constructor; creates an empty queue.
_Node * do_get_tail_() const
Get the tail pointer.
_Node * tail_
The last node in the list.
A queue that is safe for multiple threads to push to or pull from "simultaneously".
A data value in the thread-safe FIFO queue.
void wake_all(bool stop=false)