LBANN  0.103.0
Livermore Big Artificial Neural Network Toolkit
thread_pool.hpp
Go to the documentation of this file.
1 // Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC.
3 // Produced at the Lawrence Livermore National Laboratory.
4 // Written by the LBANN Research Team (B. Van Essen, et al.) listed in
5 // the CONTRIBUTORS file. <lbann-dev@llnl.gov>
6 //
7 // LLNL-CODE-697807.
8 // All rights reserved.
9 //
10 // This file is part of LBANN: Livermore Big Artificial Neural Network
11 // Toolkit. For details, see http://software.llnl.gov/LBANN or
12 // https://github.com/LLNL/LBANN.
13 //
14 // Licensed under the Apache License, Version 2.0 (the "License"); you
15 // may not use this file except in compliance with the License. You may
16 // obtain a copy of the License at:
17 //
18 // http://www.apache.org/licenses/LICENSE-2.0
19 //
20 // Unless required by applicable law or agreed to in writing, software
21 // distributed under the License is distributed on an "AS IS" BASIS,
22 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
23 // implied. See the License for the specific language governing
24 // permissions and limitations under the license.
26 #ifndef LBANN_UTILS_THREADS_THREAD_POOL_HPP_INCLUDED
27 #define LBANN_UTILS_THREADS_THREAD_POOL_HPP_INCLUDED
28 
29 #include "lbann_config.hpp"
30 
32 #include "thread_safe_queue.hpp"
33 #include "type_erased_function.hpp"
34 
35 #if defined(LBANN_TOPO_AWARE)
36 #include <hwloc.h>
37 #if defined(HWLOC_API_VERSION) && (HWLOC_API_VERSION < 0x00010b00)
38 #define HWLOC_OBJ_NUMANODE HWLOC_OBJ_NODE
39 #endif
40 #endif
41 
42 #include <sched.h>
43 
44 #include <future>
45 #include <thread>
46 #include <unordered_map>
47 #include <vector>
48 
49 namespace lbann {
50 
52 {
53 public:
54  using thread_container_type = std::vector<std::thread>;
55  using size_type = typename thread_container_type::size_type;
56 
57 private:
62  {
67  {
68  for (auto& t : threads_)
69  if (t.joinable())
70  t.join();
71  }
74  };
75 
76 public:
79  thread_pool();
80 
86  thread_pool(size_type max_threads);
87 
90 
92  void launch_threads(size_type num_threads);
94  void launch_pinned_threads(size_type num_threads, int cpu_offset);
96  void reap_threads();
98  void relaunch_pinned_threads(size_type num_threads);
99 
101  template <typename FunctionT>
102  std::future<typename std::result_of<FunctionT()>::type>
103  submit_job(FunctionT func)
104  {
105  using return_type = typename std::result_of<FunctionT()>::type;
106 
107  std::packaged_task<return_type()> task(std::move(func));
108  auto future = task.get_future();
109  global_work_queue_.push(std::move(task));
110  return future;
111  }
112 
115  template <typename FunctionT>
116  void submit_job_to_work_group(FunctionT func)
117  {
118  using return_type = typename std::result_of<FunctionT()>::type;
119 
120  std::packaged_task<return_type()> task(std::move(func));
121  m_work_group.emplace_back(task.get_future());
122  global_work_queue_.push(std::move(task));
123 
124  return;
125  }
126 
129  {
130  std::string error_message;
131  for (auto& f : m_work_group) {
132  bool valid = f.get();
133  if (!valid) {
134  error_message = "invalid future in work group";
135  }
136  }
137  m_work_group.clear();
138  if (!error_message.empty()) {
139  LBANN_ERROR(error_message);
140  }
141  return true;
142  }
143 
145  size_type get_num_threads() const noexcept { return threads_.size(); }
146 
148  int get_local_thread_id();
149 
152 
153 private:
155  void do_thread_work_();
156 #if defined(LBANN_TOPO_AWARE)
157  void do_thread_work_pinned_thread_(int tid,
158  hwloc_topology_t topo,
159  hwloc_cpuset_t cpuset);
160 #endif // LBANN_TOPO_AWARE
161 private:
164 
167 
170 
172  std::atomic<bool> all_work_done_;
173 
174  std::mutex m_thread_map_mutex;
175  std::unordered_map<std::thread::id, int> m_thread_id_to_local_id_map;
176 
178  std::vector<std::future<bool>> m_work_group;
179 
181 
182 }; // class thread_pool
183 
184 } // namespace lbann
185 #endif /* LBANN_UTILS_THREADS_THREAD_POOL_HPP_INCLUDED */
~thread_joiner()
Destructor: safely shut all threads down.
Definition: thread_pool.hpp:66
std::unordered_map< std::thread::id, int > m_thread_id_to_local_id_map
thread_safe_queue< type_erased_function > global_work_queue_
The thread-safe work queue.
void submit_job_to_work_group(FunctionT func)
Submit a job to the pool's queue and place the future into a work group.
~thread_pool()
Destroy the threadpool.
Definition: thread_pool.hpp:89
#define LBANN_ERROR(...)
Definition: exception.hpp:37
void do_thread_work_()
The task executed by each thread.
std::future< typename std::result_of< FunctionT()>::type > submit_job(FunctionT func)
Submit a job to the pool's queue.
typename thread_container_type::size_type size_type
Definition: thread_pool.hpp:55
thread_container_type & threads_
Thread container reference.
Definition: thread_pool.hpp:73
thread_pool()
Construct an empty threadpool. Size must be set with launch_threads().
std::vector< std::future< bool > > m_work_group
Work Group.
void relaunch_pinned_threads(size_type num_threads)
thread_joiner thread_joiner_
RAII "deleter" for the threads.
thread_joiner(thread_container_type &threads)
Grab container reference.
Definition: thread_pool.hpp:64
thread_container_type threads_
Container holding the threads.
void launch_threads(size_type num_threads)
Launch the threads.
RAII object that destroys threads.
Definition: thread_pool.hpp:61
std::vector< std::thread > thread_container_type
Definition: thread_pool.hpp:54
void launch_pinned_threads(size_type num_threads, int cpu_offset)
Launch the threads and pin them to the Hyperthreaded cores.
std::mutex m_thread_map_mutex
int get_local_thread_id()
Convert the C++ thread id into a local thread pool id.
size_type get_num_threads() const noexcept
Query the number of worker threads actually present.
std::atomic< bool > all_work_done_
Flag to track if more work is to be done.
bool finish_work_group()
Wait for all of the jobs in a work group to finish.
int get_threads_offset()
Get the thread offset of the pool.
A queue that is safe for multiple threads to push to or pull from "simultaneously".