C++ Distributed Hash Table
thread_pool.h
1 /*
2  * Copyright (C) 2014-2022 Savoir-faire Linux Inc.
3  *
4  * Author: Adrien BĂ©raud <adrien.beraud@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #pragma once
21 
22 #include "def.h"
23 
24 #include <condition_variable>
25 #include <vector>
26 #include <queue>
27 #include <future>
28 #include <functional>
29 
30 #include <ciso646> // fix windows compiler bug
31 
32 namespace dht {
33 
34 class OPENDHT_PUBLIC ThreadPool {
35 public:
36  static ThreadPool& computation();
37  static ThreadPool& io();
38 
39  ThreadPool();
40  ThreadPool(size_t maxThreads);
41  ~ThreadPool();
42 
43  void run(std::function<void()>&& cb);
44 
45  template<class T>
46  std::future<T> get(std::function<T()>&& cb) {
47  auto ret = std::make_shared<std::promise<T>>();
48  run([cb = std::move(cb), ret]() mutable {
49  try {
50  ret->set_value(cb());
51  } catch (...) {
52  try {
53  ret->set_exception(std::current_exception());
54  } catch(...) {}
55  }
56  });
57  return ret->get_future();
58  }
59  template<class T>
60  std::shared_future<T> getShared(std::function<T()>&& cb) {
61  return get(std::move(cb));
62  }
63 
64  void stop();
65  void join();
66 
67 private:
68  std::mutex lock_ {};
69  std::condition_variable cv_ {};
70  std::queue<std::function<void()>> tasks_ {};
71  std::vector<std::unique_ptr<std::thread>> threads_;
72  unsigned readyThreads_ {0};
73  bool running_ {true};
74 
75  const unsigned maxThreads_;
76 };
77 
78 class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this<Executor> {
79 public:
80  Executor(ThreadPool& pool, unsigned maxConcurrent = 1)
81  : threadPool_(pool), maxConcurrent_(maxConcurrent)
82  {}
83 
84  void run(std::function<void()>&& task);
85 
86 private:
87  std::reference_wrapper<ThreadPool> threadPool_;
88  const unsigned maxConcurrent_ {1};
89  std::mutex lock_ {};
90  unsigned current_ {0};
91  std::queue<std::function<void()>> tasks_ {};
92 
93  void run_(std::function<void()>&& task);
94  void schedule();
95 };
96 
97 class OPENDHT_PUBLIC ExecutionContext {
98 public:
100  : threadPool_(pool), state_(std::make_shared<SharedState>())
101  {}
102 
103  ~ExecutionContext() {
104  state_->destroy();
105  }
106 
108  void stop() {
109  state_->destroy(false);
110  }
111 
112  void run(std::function<void()>&& task) {
113  std::lock_guard<std::mutex> lock(state_->mtx);
114  if (state_->shutdown_) return;
115  state_->pendingTasks++;
116  threadPool_.get().run([task = std::move(task), state = state_] {
117  state->run(task);
118  });
119  }
120 
121 private:
122  struct SharedState {
123  std::mutex mtx {};
124  std::condition_variable cv {};
125  unsigned pendingTasks {0};
126  unsigned ongoingTasks {0};
128  bool shutdown_ {false};
130  std::atomic_bool destroyed {false};
131 
132  void destroy(bool wait = true) {
133  std::unique_lock<std::mutex> lock(mtx);
134  if (destroyed) return;
135  if (wait) {
136  cv.wait(lock, [this] { return pendingTasks == 0 && ongoingTasks == 0; });
137  }
138  shutdown_ = true;
139  if (not wait) {
140  cv.wait(lock, [this] { return ongoingTasks == 0; });
141  }
142  destroyed = true;
143  }
144 
145  void run(const std::function<void()>& task) {
146  {
147  std::lock_guard<std::mutex> lock(mtx);
148  pendingTasks--;
149  ongoingTasks++;
150  }
151  if (destroyed) return;
152  task();
153  {
154  std::lock_guard<std::mutex> lock(mtx);
155  ongoingTasks--;
156  cv.notify_all();
157  }
158  }
159  };
160  std::reference_wrapper<ThreadPool> threadPool_;
161  std::shared_ptr<SharedState> state_;
162 };
163 
164 }
Definition: callbacks.h:35