forked from nbsdx/ThreadPool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadPool.h
159 lines (139 loc) · 4.43 KB
/
ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#ifndef CONCURRENT_THREADPOOL_H
#define CONCURRENT_THREADPOOL_H
#include <atomic>
#include <thread>
#include <mutex>
#include <array>
#include <list>
#include <functional>
#include <condition_variable>
namespace nbsdx {
namespace concurrent {
/**
 * Simple thread pool that starts `ThreadCount` worker threads when it is
 * constructed and feeds them jobs from a shared queue. The default is
 * 10 threads.
 *
 * Jobs are dequeued in the order they were added; because several workers
 * run concurrently, the order in which jobs *complete* is not defined.
 *
 * Usage notes:
 *  - Jobs must be non-empty callables and must not let exceptions escape:
 *    they are invoked directly on a worker thread, so an escaping exception
 *    (including `std::bad_function_call` from an empty job) terminates the
 *    program.
 *  - Do not call `WaitAll()` or `JoinAll()` from inside a job: the calling
 *    job is itself counted as outstanding until it returns.
 *
 * This class requires a number of C++11 features be present in your compiler.
 */
template <unsigned ThreadCount = 10>
class ThreadPool {
    std::array<std::thread, ThreadCount> threads;
    std::list<std::function<void(void)>> queue;     // pending jobs, guarded by queue_mutex

    std::atomic_int  jobs_left;     // jobs queued or currently running
    std::atomic_bool bailout;       // set (under queue_mutex) to tell workers to exit
    std::atomic_bool finished;      // set once JoinAll() has joined every worker

    std::condition_variable job_available_var;  // signalled on new job / bailout
    std::condition_variable wait_var;           // signalled when a job completes
    std::mutex wait_mutex;                      // pairs with wait_var
    std::mutex queue_mutex;                     // pairs with job_available_var

    /**
     * Worker loop: take the next job from the queue and run it, then
     * tell any thread blocked in WaitAll() that a job has completed.
     */
    void Task() {
        while( !bailout ) {
            next_job()();
            {
                // The counter is changed while holding wait_mutex so that a
                // thread in WaitAll() cannot evaluate its predicate, miss
                // this update, and then sleep through the notification.
                std::lock_guard<std::mutex> guard( wait_mutex );
                --jobs_left;
            }
            // notify_all: every thread blocked in WaitAll() must get the
            // chance to observe the counter reaching zero.
            wait_var.notify_all();
        }
    }

    /**
     * Get the next job: pop the first item in the queue, blocking until
     * either a job is available or the pool is shutting down.
     *
     * @return the dequeued job, or a no-op job when the pool is bailing out.
     */
    std::function<void(void)> next_job() {
        std::function<void(void)> res;
        std::unique_lock<std::mutex> job_lock( queue_mutex );

        // Wait for a job if we don't have any.
        job_available_var.wait( job_lock, [this]{ return !queue.empty() || bailout; } );

        if( !bailout ) {
            // Move the job out of the queue instead of copying it.
            res = std::move( queue.front() );
            queue.pop_front();
        }
        else {
            // Bailing out: hand back a no-op and pre-increment jobs_left so
            // the unconditional decrement in Task() leaves the count accurate.
            res = []{};
            ++jobs_left;
        }
        return res;
    }

public:
    /**
     * Start `ThreadCount` worker threads, all initially idle.
     *
     * @throws whatever `std::thread`'s constructor throws if a worker
     *         cannot be started; workers started so far are shut down and
     *         joined before the exception propagates.
     */
    ThreadPool()
        : jobs_left( 0 )
        , bailout( false )
        , finished( false )
    {
        try {
            for( auto &worker : threads )
                worker = std::thread( [this]{ this->Task(); } );
        }
        catch( ... ) {
            // Destroying a joinable std::thread terminates the program, so
            // stop and join the workers that did start before rethrowing.
            JoinAll( false );
            throw;
        }
    }

    // Workers hold a pointer to this object, so it must stay where it is.
    ThreadPool( const ThreadPool & ) = delete;
    ThreadPool &operator=( const ThreadPool & ) = delete;

    /**
     * JoinAll on destruction: waits for outstanding jobs, then joins the
     * workers (a no-op if JoinAll() has already completed).
     */
    ~ThreadPool() {
        JoinAll();
    }

    /**
     * Get the number of threads in this pool.
     */
    inline unsigned Size() const {
        return ThreadCount;
    }

    /**
     * Get the number of jobs still waiting in the queue. Jobs that a
     * worker has already picked up are not included.
     */
    inline unsigned JobsRemaining() {
        std::lock_guard<std::mutex> guard( queue_mutex );
        return static_cast<unsigned>( queue.size() );
    }

    /**
     * Add a new job to the end of the queue and wake one worker to take it.
     * If all workers are busy, the job stays queued until one becomes free.
     *
     * @param job callable to run on a worker thread; must not be empty.
     */
    void AddJob( std::function<void(void)> job ) {
        {
            std::lock_guard<std::mutex> guard( queue_mutex );
            queue.emplace_back( std::move( job ) );
            // Counted while the queue is still locked, so no worker can
            // finish this job before it has been counted.
            ++jobs_left;
        }
        job_available_var.notify_one();
    }

    /**
     * Join with all threads. Blocks until every worker thread has exited.
     *
     * @param WaitForAll If true, wait for all queued jobs to finish before
     *                   stopping the workers. If false, let the jobs that
     *                   are currently running finish, then stop the
     *                   workers; jobs still waiting in the queue are never
     *                   run.
     *
     * After invoking `ThreadPool::JoinAll`, the pool can no longer be used.
     * If you need the pool to exist past completion of jobs, look to use
     * `ThreadPool::WaitAll`.
     */
    void JoinAll( bool WaitForAll = true ) {
        if( finished )
            return;

        if( WaitForAll )
            WaitAll();

        {
            // Set the flag while holding queue_mutex: a worker that has just
            // found the queue empty either sees bailout == true or is
            // already waiting when notify_all() fires - never in between.
            std::lock_guard<std::mutex> guard( queue_mutex );
            bailout = true;
        }
        // Wake up any thread that's waiting for a new job.
        job_available_var.notify_all();

        for( auto &worker : threads )
            if( worker.joinable() )
                worker.join();

        finished = true;
    }

    /**
     * Wait for the pool to empty before continuing.
     * This does not call `std::thread::join`; it only waits until all jobs
     * added so far have finished executing.
     */
    void WaitAll() {
        std::unique_lock<std::mutex> lk( wait_mutex );
        wait_var.wait( lk, [this]{ return this->jobs_left == 0; } );
    }
};
} // namespace concurrent
} // namespace nbsdx
#endif //CONCURRENT_THREADPOOL_H