20 assert(nThreadsServicingQueue == 0);
21 if (stopWhenEmpty)
assert(taskQueue.empty());
28 ++nThreadsServicingQueue;
44 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
45 if (
newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
55 Function f = taskQueue.begin()->second;
56 taskQueue.erase(taskQueue.begin());
65 --nThreadsServicingQueue;
69 --nThreadsServicingQueue;
77 taskQueue.insert(std::make_pair(t, f));
84 assert(delta_seconds > 0s && delta_seconds <= 1h);
90 std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
92 for (
const auto& element : taskQueue) {
93 temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
97 taskQueue = std::move(temp_queue);
116 std::chrono::system_clock::time_point& last)
const 119 size_t result = taskQueue.size();
120 if (!taskQueue.empty()) {
121 first = taskQueue.begin()->first;
122 last = taskQueue.rbegin()->first;
130 return nThreadsServicingQueue;
141 if (m_are_callbacks_running)
return;
142 if (m_callbacks_pending.empty())
return;
149 std::function<void()> callback;
152 if (m_are_callbacks_running)
return;
153 if (m_callbacks_pending.empty())
return;
154 m_are_callbacks_running =
true;
156 callback = std::move(m_callbacks_pending.front());
157 m_callbacks_pending.pop_front();
162 struct RAIICallbacksRunning {
165 ~RAIICallbacksRunning()
169 instance->m_are_callbacks_running =
false;
173 } raiicallbacksrunning(
this);
184 m_callbacks_pending.emplace_back(std::move(func));
192 bool should_continue =
true;
193 while (should_continue) {
196 should_continue = !m_callbacks_pending.empty();
203 return m_callbacks_pending.size();
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
void MaybeScheduleProcessQueue()
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
std::function< void()> Function
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
CScheduler * m_pscheduler
std::condition_variable newTaskScheduled
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
RecursiveMutex m_cs_callbacks_pending
void serviceQueue()
Services the queue 'forever'.
#define WAIT_LOCK(cs, name)
size_t CallbacksPending()
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Simple class for background tasks that should be run periodically or once "after a while"...
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until queue is empty Must be ca...
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)