// Include guard followed by the declaration of NonBlockingThreadPoolTempl, a
// class template parameterized on an Environment type and derived from
// Eigen::ThreadPoolInterface. The Environment supplies the Task type used
// below.
// NOTE(review): the integer at the start of each statement looks like a line
// number carried over from a rendered source listing; gaps in that numbering
// suggest some original lines are absent from this copy.
10 #ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 11 #define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H 16 template <
typename Environment>
17 class NonBlockingThreadPoolTempl :
public Eigen::ThreadPoolInterface {
// Unit of work; the concrete type comes from the Environment.
19 typedef typename Environment::Task Task;
// Work queue type: a RunQueue of Task instantiated with 1024
// (presumably the queue capacity -- confirm against RunQueue).
20 typedef RunQueue<Task, 1024> Queue;
// Constructor. Takes the number of worker threads and an Environment, which
// is default-constructed when the caller omits it.
// NOTE(review): only part of the initializer list and of the body is present
// in this listing.
22 NonBlockingThreadPoolTempl(
int num_threads, Environment env = Environment())
// threads_, coprimes_ and waiters_ are each constructed with num_threads as
// their constructor argument.
24 threads_(num_threads),
26 coprimes_(num_threads),
27 waiters_(num_threads),
// Walks i over [1, num_threads]. The visible statements copy num_threads into
// b and push i onto coprimes_; the lines in between are not shown, so the
// condition under which i is pushed cannot be determined from here.
// NOTE(review): the member name suggests only values coprime to num_threads
// are kept -- confirm against the full source.
39 for (
int i = 1; i <= num_threads; i++) {
41 unsigned b = num_threads;
49 coprimes_.push_back(i);
// Allocate one Queue on the heap per thread index and store the pointer.
52 for (
int i = 0; i < num_threads; i++) {
53 queues_.push_back(
new Queue());
// Create one thread per index through env_.CreateThread. Each thread's
// callable captures `this` and its index i, and calls WorkerLoop(i).
55 for (
int i = 0; i < num_threads; i++) {
56 threads_.push_back(env_.CreateThread([
this, i]() { WorkerLoop(i); }));
// Destructor: frees the heap-allocated thread objects and work queues.
// NOTE(review): the statements between the opening brace and the first loop
// are absent from this listing.
60 ~NonBlockingThreadPoolTempl() {
// Delete every Thread* stored in threads_.
68 for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
// Delete every Queue* stored in queues_. The bound is queues_.size() so the
// loop is limited by the container it actually indexes, rather than relying
// on threads_ and queues_ always holding the same number of entries.
69 for (size_t i = 0; i < queues_.size(); i++) delete queues_[i];
// Schedule: wraps fn into a Task via env_.CreateTask and pushes it onto one of
// the per-thread queues.
// NOTE(review): the branch keyword between the two push paths and everything
// after the PushBack call are absent from this listing.
72 void Schedule(std::function<
void()> fn) {
73 Task t = env_.CreateTask(std::move(fn));
74 PerThread* pt = GetPerThread();
// The caller's PerThread record points at this pool: use the queue indexed by
// the caller's own thread_id and push onto its front.
75 if (pt->pool ==
this) {
77 Queue* q = queues_[pt->thread_id];
78 t = q->PushFront(std::move(t));
// Presumably the alternative branch: choose a queue using the caller's random
// state and push onto its back.
82 Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
// Both push calls return a Task that is stored back into t; what is done with
// it afterwards is not shown.
83 t = q->PushBack(std::move(t));
// NumThreads: returns the number of entries in threads_, converted to int.
98 int NumThreads() const final {
99 return static_cast<int>(threads_.size());
// CurrentThreadId: returns the thread_id stored in the caller's PerThread
// record when that record's pool pointer equals this pool.
// NOTE(review): the return value for the non-matching case is not present in
// this listing.
102 int CurrentThreadId() const final {
// Casts away const on `this` in order to call GetPerThread().
103 const PerThread* pt =
104 const_cast<NonBlockingThreadPoolTempl*
>(
this)->GetPerThread();
105 if (pt->pool ==
this) {
106 return pt->thread_id;
// Thread handle type; the concrete type comes from the Environment.
113 typedef typename Environment::EnvThread Thread;
// PerThread record (its struct header and remaining field declarations are
// not shown here). A default-constructed record has pool = NULL, rand = 0 and
// thread_id = -1.
116 constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
// Pool associated with this record; other methods compare it against `this`.
117 NonBlockingThreadPoolTempl* pool;
// Thread objects returned by env_.CreateThread; deleted in the destructor.
123 MaxSizeVector<Thread*> threads_;
// Heap-allocated queues, indexed by thread id; deleted in the destructor.
124 MaxSizeVector<Queue*> queues_;
// Values from which the queue-scanning loops pick `inc` at random. How inc is
// applied is not visible in this listing.
125 MaxSizeVector<unsigned> coprimes_;
// Waiter objects, indexed by thread id in WorkerLoop.
126 std::vector<EventCount::Waiter> waiters_;
// Counter compared against threads_.size() in WaitForWork.
127 std::atomic<unsigned> blocked_;
// Flag tested and then set with exchange(true) in WorkerLoop.
128 std::atomic<bool> spinning_;
// Flag read together with blocked_ in WaitForWork.
129 std::atomic<bool> done_;
// WorkerLoop: body executed by the worker thread with index thread_id (it is
// the callable handed to env_.CreateThread in the constructor).
// NOTE(review): several lines, including the surrounding loop structure, are
// absent from this listing.
133 void WorkerLoop(
int thread_id) {
134 PerThread* pt = GetPerThread();
// Seed this thread's random state from the hash of its std::thread::id and
// record its index in the per-thread record.
136 pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
137 pt->thread_id = thread_id;
// This thread's own queue and waiter, both selected by thread_id.
138 Queue* q = queues_[thread_id];
139 EventCount::Waiter* waiter = &waiters_[thread_id];
// Take a task from the front of this thread's own queue.
141 Task t = q->PopFront();
// Entered only when spinning_ reads false and this thread's exchange(true)
// also returned false, i.e. this thread is the one that flipped the flag.
151 if (!spinning_ && !spinning_.exchange(
true)) {
// Bounded loop: at most 1000 iterations, ending early once t.f is set.
152 for (
int i = 0; i < 1000 && !t.f; i++) {
// WaitForWork returned false; the handling of that case is not shown.
158 if (!WaitForWork(waiter, &t)) {
// NOTE(review): the signature of the function enclosing these statements is
// absent from this listing.
// Visible logic: draw a random number r from the caller's per-thread state,
// pick `inc` from coprimes_ and a starting index victim = r % size, then loop
// up to `size` times popping a Task from the back of queues_[victim].
172 PerThread* pt = GetPerThread();
173 const size_t size = queues_.size();
174 unsigned r = Rand(&pt->rand);
175 unsigned inc = coprimes_[r % coprimes_.size()];
176 unsigned victim = r % size;
177 for (
unsigned i = 0; i < size; i++) {
178 Task t = queues_[victim]->PopBack();
// Range check on victim; the statements that advance victim and handle the
// out-of-range case are not shown.
183 if (victim >= size) {
// WaitForWork: takes the calling thread's waiter and a Task out-parameter and
// returns bool.
// NOTE(review): the conditions and return statements connecting the steps
// below are absent from this listing.
193 bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
// Look for a queue that currently has work.
199 int victim = NonEmptyQueueIndex();
// Cancel the pending wait and take a task from the back of that queue.
201 ec_.CancelWait(waiter);
202 *t = queues_[victim]->PopBack();
// done_ is set and blocked_ equals the number of threads: cancel the wait.
209 if (done_ && blocked_ == threads_.size()) {
210 ec_.CancelWait(waiter);
// Check again for a non-empty queue (-1 means none was found).
216 if (NonEmptyQueueIndex() != -1) {
// Commit the wait on the event count.
230 ec_.CommitWait(waiter);
// NonEmptyQueueIndex: scans the queues starting at a random index and tests
// each with Empty(). Returns int; WaitForWork compares the result against -1.
// NOTE(review): the return statements and the update of victim are absent
// from this listing.
235 int NonEmptyQueueIndex() {
236 PerThread* pt = GetPerThread();
237 const size_t size = queues_.size();
// Random starting index, plus an `inc` value picked from coprimes_.
238 unsigned r = Rand(&pt->rand);
239 unsigned inc = coprimes_[r % coprimes_.size()];
240 unsigned victim = r % size;
// Examine at most `size` queues.
241 for (
unsigned i = 0; i < size; i++) {
242 if (!queues_[victim]->Empty()) {
// Range check on victim; the handling of the out-of-range case is not shown.
246 if (victim >= size) {
// GetPerThread: static helper that takes the address of a function-local
// PerThread object declared with EIGEN_THREAD_LOCAL (presumably thread-local
// storage -- confirm the macro definition).
// NOTE(review): the remaining lines of the function, including its return,
// are absent from this listing.
253 static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
254 EIGEN_THREAD_LOCAL PerThread per_thread_;
255 PerThread* pt = &per_thread_;
// Rand: pseudo-random generator over a caller-owned 64-bit state. Advances
// *state and returns an unsigned value derived from the previous state.
259 static EIGEN_STRONG_INLINE
unsigned Rand(uint64_t* state) {
260 uint64_t current = *state;
// Advance the state with a multiply-add step (linear congruential update);
// the arithmetic wraps modulo 2^64.
262 *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Output: xor the previous state with itself shifted right by 22, then shift
// right by 22 plus the top three bits of the previous state (current >> 61 is
// in [0, 7]), and truncate to unsigned.
264 return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
// Convenience alias: the pool template instantiated with StlThreadEnvironment.
268 typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
272 #endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
// Cross-reference note from the rendered listing: "Eigen" is the namespace containing all symbols from the Eigen library. Definition: AdolcForward:45