LeechCraft  0.6.70-18450-gabe19ee3b0
Modular cross-platform feature rich live environment.
channel.h
Go to the documentation of this file.
1 /**********************************************************************
2  * LeechCraft - modular cross-platform feature rich internet client.
3  * Copyright (C) 2006-2014 Georg Rudoy
4  *
5  * Distributed under the Boost Software License, Version 1.0.
6  * (See accompanying file LICENSE or copy at https://www.boost.org/LICENSE_1_0.txt)
7  **********************************************************************/
8 
9 #pragma once
10 
11 #include <atomic>
12 #include <coroutine>
13 #include <deque>
14 #include <mutex>
15 #include <optional>
16 #include <stdexcept>
17 #include <utility>
18 #include "channelfwd.h"
19 
20 namespace LC::Util
21 {
22  template<typename T>
23  class Channel
24  {
25  struct ReceiveAwaiter
26  {
27  Channel& Ch_;
28  std::optional<T> Slot_;
29  std::coroutine_handle<> Handle_;
30  bool Registered_ = false;
31 
32  explicit ReceiveAwaiter (Channel& ch)
33  : Ch_ { ch }
34  {
35  }
36 
37  ~ReceiveAwaiter ()
38  {
39  if (Registered_)
40  {
41  std::lock_guard guard { Ch_.Lock_ };
42  std::erase (Ch_.Awaiters_, this);
43  }
44  }
45 
46  bool await_ready () const noexcept
47  {
48  return false;
49  }
50 
51  bool await_suspend (std::coroutine_handle<> handle)
52  {
53  std::lock_guard guard { Ch_.Lock_ };
54  if (!Ch_.Elems_.empty ())
55  {
56  Slot_ = std::move (Ch_.Elems_.front ());
57  Ch_.Elems_.pop_front ();
58  return false;
59  }
60 
61  if (Ch_.Closed_)
62  return false;
63 
64  Ch_.Awaiters_.push_back (this);
65  Handle_ = handle;
66  Registered_ = true;
67  return true;
68  }
69 
70  std::optional<T> await_resume () noexcept
71  {
72  return std::move (Slot_);
73  }
74  };
75 
76  mutable std::mutex Lock_;
77  std::deque<T> Elems_;
78  std::deque<ReceiveAwaiter*> Awaiters_;
79 
80  std::function<void (std::coroutine_handle<>)> RunHandle_ { [] (std::coroutine_handle<> handle) { handle (); } };
81 
82  bool Closed_ = false;
83  public:
84  using ItemType_t = T;
85 
86  Channel () = default;
87 
88  Channel (const Channel&) = delete;
89  Channel (Channel&&) = delete;
90  Channel& operator= (const Channel&) = delete;
91  Channel& operator= (Channel&&) = delete;
92 
93  explicit Channel (QObject *context)
94  : RunHandle_ { [context] (auto handle) { QMetaObject::invokeMethod (context, handle); } }
95  {
96  }
97 
98  void Close ()
99  {
100  std::deque<ReceiveAwaiter*> awaiters;
101 
102  {
103  std::lock_guard guard { Lock_ };
104  if (Closed_)
105  return;
106 
107  Closed_ = true;
108  awaiters = std::exchange (Awaiters_, {});
109  for (auto awaiter : awaiters)
110  awaiter->Registered_ = false;
111  }
112 
113  for (auto awaiter : awaiters)
114  RunHandle_ (awaiter->Handle_);
115  }
116 
117  template<typename U = T>
118  void Send (U&& value)
119  {
120  ReceiveAwaiter *next = nullptr;
121  {
122  std::lock_guard guard { Lock_ };
123  if (Closed_)
124  throw std::runtime_error { "sending into a closed channel" };
125 
126  if (!Awaiters_.empty ())
127  {
128  next = Awaiters_.front ();
129  Awaiters_.pop_front ();
130  next->Registered_ = false;
131  }
132  else
133  Elems_.emplace_back (std::forward<U> (value));
134  }
135 
136  if (next)
137  {
138  next->Slot_.emplace (std::forward<U> (value));
139  RunHandle_ (next->Handle_);
140  }
141  }
142 
143  bool IsEmpty () const
144  {
145  std::lock_guard guard { Lock_ };
146  return Elems_.empty ();
147  }
148 
149  ReceiveAwaiter Receive ()
150  {
151  return ReceiveAwaiter { *this };
152  }
153 
154  auto operator co_await ()
155  {
156  return Receive ();
157  }
158  };
159 }
Channel(QObject *context)
Definition: channel.h:93
bool IsEmpty() const
Definition: channel.h:143
ReceiveAwaiter Receive()
Definition: channel.h:149
auto Tup2 &&tup2 noexcept
Definition: ctstringutils.h:68
void Close()
Definition: channel.h:98
void Send(U &&value)
Definition: channel.h:118
Channel & operator=(const Channel &)=delete