Electroneum
task_region.h
Go to the documentation of this file.
1 // Copyrights(c) 2017-2019, The Electroneum Project
2 // Copyrights(c) 2014-2017, The Monero Project
3 //
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without modification, are
7 // permitted provided that the following conditions are met:
8 //
9 // 1. Redistributions of source code must retain the above copyright notice, this list of
10 // conditions and the following disclaimer.
11 //
12 // 2. Redistributions in binary form must reproduce the above copyright notice, this list
13 // of conditions and the following disclaimer in the documentation and/or other
14 // materials provided with the distribution.
15 //
16 // 3. Neither the name of the copyright holder nor the names of its contributors may be
17 // used to endorse or promote products derived from this software without specific
18 // prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27 // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28 // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 #pragma once
30 
31 #include <atomic>
32 #include <boost/thread/condition_variable.hpp>
33 #include <boost/thread/mutex.hpp>
34 #include <memory>
35 #include <type_traits>
36 #include <utility>
37 
38 #include "common/thread_group.h"
39 
40 namespace tools
41 {
42 
56 {
57  struct state
58  {
59  using id = unsigned;
60 
61  explicit state(std::shared_ptr<state> next_src) noexcept
62  : next(std::move(next_src))
63  , ready(0)
64  , pending(0)
66  , all_complete() {
67  }
68 
69  state(const state&) = default;
70  state(state&&) = default;
71  ~state() = default;
72  state& operator=(const state&) = default;
73  state& operator=(state&&) = default;
74 
75  void track_id(id task_id) noexcept {
76  pending |= task_id;
77  ready |= task_id;
78  }
79 
81  bool can_run(id task_id) noexcept {
82  return (ready.fetch_and(~task_id) & task_id);
83  }
84 
86  void mark_completed(id task_id) noexcept;
87 
89  void abort() noexcept;
90 
92  void wait() noexcept;
93 
95  void wait(thread_group& threads) noexcept;
96 
97  private:
98  /* This implementation is a bit pessimistic, it ensures that all copies
99  of a wrapped task can only be executed once. `thread_group` should never
100  do this, but some variable needs to track whether an abort should be done
101  anyway... */
102  std::shared_ptr<state> next;
103  std::atomic<id> ready;
104  std::atomic<id> pending;
106  boost::condition_variable all_complete;
107  };
108 
109  template<typename F>
110  struct wrapper
111  {
112  wrapper(state::id id_src, std::shared_ptr<state> st_src, F f_src)
113  : task_id(id_src), st(std::move(st_src)), f(std::move(f_src)) {
114  }
115 
116  wrapper(const wrapper&) = default;
117  wrapper(wrapper&&) = default;
118  wrapper& operator=(const wrapper&) = default;
119  wrapper& operator=(wrapper&&) = default;
120 
121  void operator()() {
122  if (st) {
123  if (st->can_run(task_id)) {
124  f();
125  }
126  st->mark_completed(task_id);
127  }
128  }
129 
130  private:
132  std::shared_ptr<state> st;
133  F f;
134  };
135 
136 public:
137  friend struct task_region_;
138 
139  task_region_handle() = delete;
140  task_region_handle(const task_region_handle&) = delete;
142 
144  ~task_region_handle() noexcept {
145  if (st) {
146  st->abort();
147  st->wait(threads);
148  }
149  }
150 
153 
159  template<typename F>
160  void run(F&& f) {
161  if (threads.count() == 0) {
162  f();
163  } else {
164  if (!st || next_id == 0) {
165  create_state();
166  }
167  const state::id this_id = next_id;
168  next_id <<= 1;
169 
170  st->track_id(this_id);
171  threads.dispatch(wrapper<F>{this_id, st, std::move(f)});
172  }
173  }
174 
176  void wait() noexcept {
177  if (st) {
178  do_wait();
179  }
180  }
181 
182 private:
183  explicit task_region_handle(thread_group& threads_src)
184  : st(nullptr), threads(threads_src), next_id(0) {
185  }
186 
187  void create_state();
188  void do_wait() noexcept;
189 
190  std::shared_ptr<state> st;
193 };
194 
199 struct task_region_ {
200  template<typename F>
201  void operator()(thread_group& threads, F&& f) const {
202  static_assert(
203  std::is_same<void, typename std::result_of<F(task_region_handle&)>::type>::value,
204  "f cannot have a return value"
205  );
206  task_region_handle region{threads};
207  f(region);
208  region.wait();
209  }
210 
211  template<typename F>
212  void operator()(thread_group&& threads, F&& f) const {
213  (*this)(threads, std::forward<F>(f));
214  }
215 
216  template<typename F>
217  void operator()(F&& f) const {
219  (*this)(threads, std::forward<F>(f));
220  }
221 };
222 
223 constexpr const task_region_ task_region{};
224 }
Definition: task_region.h:55
Definition: unordered_containers_boost_serialization.h:38
state(std::shared_ptr< state > next_src) noexcept
Definition: task_region.h:61
void do_wait() noexcept
Definition: task_region.cpp:90
#define F(w, k)
Definition: sha512-blocks.c:61
std::shared_ptr< state > st
Definition: task_region.h:190
std::shared_ptr< state > next
Definition: task_region.h:102
state & operator=(const state &)=default
Definition: block_queue.cpp:41
Definition: task_region.h:199
boost::condition_variable all_complete
Definition: task_region.h:106
state::id next_id
Definition: task_region.h:192
void mark_completed(id task_id) noexcept
Mark id as completed, and synchronize with waiting threads.
Definition: task_region.cpp:44
task_region_handle(thread_group &threads_src)
Definition: task_region.h:183
std::atomic< id > ready
Tracks whether a task has been invoked.
Definition: task_region.h:103
void track_id(id task_id) noexcept
Definition: task_region.h:75
Definition: task_region.h:57
wrapper(state::id id_src, std::shared_ptr< state > st_src, F f_src)
Definition: task_region.h:112
thread_group & threads
Definition: task_region.h:191
void operator()(thread_group &threads, F &&f) const
Definition: task_region.h:201
std::shared_ptr< state > st
Definition: task_region.h:132
void abort() noexcept
Tell all unstarted functions in region to return immediately.
Definition: task_region.cpp:53
Manages zero or more threads for work dispatching.
Definition: thread_group.h:44
std::atomic< id > pending
Tracks when a task has completed or aborted.
Definition: task_region.h:104
Various Tools.
Definition: base58.cpp:43
void run(F &&f)
Definition: task_region.h:160
type
Definition: json.h:74
unsigned id
Definition: task_region.h:59
const state::id task_id
Definition: task_region.h:131
constexpr const task_region_ task_region
Definition: task_region.h:223
std::size_t count() const noexcept
Definition: thread_group.h:70
Definition: task_region.h:110
void operator()(F &&f) const
Definition: task_region.h:217
void operator()(thread_group &&threads, F &&f) const
Definition: task_region.h:212
void dispatch(F &&f)
Definition: thread_group.h:90
F f
Definition: task_region.h:133
~task_region_handle() noexcept
Cancels unstarted pending tasks, and waits for them to respond.
Definition: task_region.h:144
void wait() noexcept
Blocks until all functions in region have aborted or completed.
Definition: task_region.cpp:61
void wait() noexcept
Wait until all functions provided to run have completed.
Definition: task_region.h:176
void create_state()
Definition: task_region.cpp:85
void operator()()
Definition: task_region.h:121
bool can_run(id task_id) noexcept
Definition: task_region.h:81
boost::mutex sync_on_complete
Definition: task_region.h:105