26 #ifndef LBANN_UTILS_THREADS_THREAD_POOL_HPP_INCLUDED 27 #define LBANN_UTILS_THREADS_THREAD_POOL_HPP_INCLUDED 29 #include "lbann_config.hpp" 35 #if defined(LBANN_TOPO_AWARE) 37 #if defined(HWLOC_API_VERSION) && (HWLOC_API_VERSION < 0x00010b00) 38 #define HWLOC_OBJ_NUMANODE HWLOC_OBJ_NODE 46 #include <unordered_map> 55 using size_type =
typename thread_container_type::size_type;
101 template <
typename FunctionT>
102 std::future<typename std::result_of<FunctionT()>::type>
105 using return_type =
typename std::result_of<FunctionT()>::type;
107 std::packaged_task<return_type()> task(std::move(func));
108 auto future = task.get_future();
115 template <
typename FunctionT>
118 using return_type =
typename std::result_of<FunctionT()>::type;
120 std::packaged_task<return_type()> task(std::move(func));
130 std::string error_message;
132 bool valid = f.get();
134 error_message =
"invalid future in work group";
137 m_work_group.clear();
138 if (!error_message.empty()) {
156 #if defined(LBANN_TOPO_AWARE) 157 void do_thread_work_pinned_thread_(
int tid,
158 hwloc_topology_t topo,
159 hwloc_cpuset_t cpuset);
160 #endif // LBANN_TOPO_AWARE ~thread_joiner()
Destructor: safely shut all threads down.
std::unordered_map< std::thread::id, int > m_thread_id_to_local_id_map
thread_safe_queue< type_erased_function > global_work_queue_
The thread-safe work queue.
void submit_job_to_work_group(FunctionT func)
Submit a job to the pool's queue and place the future into a work group.
~thread_pool()
Destroy the threadpool.
void do_thread_work_()
The task executed by each thread.
std::future< typename std::result_of< FunctionT()>::type > submit_job(FunctionT func)
Submit a job to the pool's queue.
typename thread_container_type::size_type size_type
thread_container_type & threads_
Thread container reference.
thread_pool()
Construct an empty thread pool. The number of threads must be set afterwards with launch_threads() or launch_pinned_threads().
std::vector< std::future< bool > > m_work_group
Work Group.
void relaunch_pinned_threads(size_type num_threads)
thread_joiner thread_joiner_
RAII "deleter" for the threads.
thread_joiner(thread_container_type &threads)
Grab container reference.
thread_container_type threads_
Container holding the threads.
void launch_threads(size_type num_threads)
Launch the threads.
RAII object that destroys threads.
std::vector< std::thread > thread_container_type
void launch_pinned_threads(size_type num_threads, int cpu_offset)
Launch the threads and pin them to the hyperthreaded cores.
std::mutex m_thread_map_mutex
int get_local_thread_id()
Convert the C++ thread id into a local thread pool id.
size_type get_num_threads() const noexcept
Query the number of worker threads actually present.
std::atomic< bool > all_work_done_
Flag to track if more work is to be done.
bool finish_work_group()
Wait for all of the jobs in a work group to finish.
int get_threads_offset()
Query the threads offset (presumably the cpu_offset passed to launch_pinned_threads(); confirm against the implementation).
A queue that is safe for multiple threads to push to or pull from "simultaneously".