LeechCraft  0.6.70-18450-gabe19ee3b0
Modular cross-platform feature rich live environment.
corochanneltest.cpp
Go to the documentation of this file.
1 /**********************************************************************
2  * LeechCraft - modular cross-platform feature rich internet client.
3  * Copyright (C) 2006-2014 Georg Rudoy
4  *
5  * Distributed under the Boost Software License, Version 1.0.
6  * (See accompanying file LICENSE or copy at https://www.boost.org/LICENSE_1_0.txt)
7  **********************************************************************/
8 
9 #include "corochanneltest.h"
10 #include <QtConcurrentRun>
11 #include <QtTest>
12 #include "coro.h"
13 #include "coro/channel.h"
14 #include "coro/channelutils.h"
15 #include "coro/getresult.h"
16 
17 QTEST_GUILESS_MAIN (LC::Util::CoroChannelTest)
18 
19 namespace LC::Util
20 {
21  void CoroChannelTest::testSingleRecv ()
22  {
23  using namespace std::chrono_literals;
24 
25  constexpr auto producersCount = 32;
26  constexpr auto repCount = 100;
27  constexpr auto sleepLength = 1ms;
28 
29  Channel<int> ch { this };
30 
31  std::vector<std::thread> threads;
32  std::atomic_int expected;
33  for (int i = 0; i < producersCount; ++i)
34  threads.emplace_back ([&, i]
35  {
36  for (int j = 0; j < repCount; ++j)
37  {
38  const auto val = j * producersCount + i;
39  ch.Send (val);
40  expected.fetch_add (val, std::memory_order::relaxed);
41  std::this_thread::sleep_for (sleepLength);
42  }
43  });
44 
45  auto mainThread = std::this_thread::get_id ();
46  auto reader = [] (auto mainThread, Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
47  {
48  int sum = 0;
49  while (auto next = co_await ch->Receive ())
50  {
51  [=]
52  {
53  auto thisThread = std::this_thread::get_id ();
54  QCOMPARE (thisThread, mainThread);
55  } ();
56 
57  sum += *next;
58  }
59  co_return sum;
60  } (mainThread, &ch);
61 
62  for (auto& thread : threads)
63  thread.join ();
64 
65  ch.Close ();
66 
67  auto result = GetTaskResult (reader);
68  QCOMPARE (result, expected);
69  }
70 
71  void CoroChannelTest::testManyRecvs ()
72  {
73  using namespace std::chrono_literals;
74 
75  constexpr auto producersCount = 32;
76  constexpr auto consumersCount = 8;
77  constexpr auto repCount = 100;
78  constexpr auto sleepLength = 1ms;
79 
80  Channel<int> ch;
81 
82  std::vector<std::thread> producers;
83  std::atomic_int expected;
84  for (int i = 0; i < producersCount; ++i)
85  producers.emplace_back ([&, i]
86  {
87  for (int j = 0; j < repCount; ++j)
88  {
89  const auto val = j * producersCount + i;
90  ch.Send (val);
91  expected.fetch_add (val, std::memory_order::relaxed);
92  std::this_thread::sleep_for (sleepLength);
93  }
94  });
95 
96  std::atomic_int sum;
97  std::vector<std::thread> consumers;
98  for (int i = 0; i < consumersCount; ++i)
99  consumers.emplace_back ([&]
100  {
101  auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
102  {
103  int sum = 0;
104  while (auto next = co_await ch->Receive ())
105  sum += *next;
106  co_return sum;
107  } (&ch);
108  sum.fetch_add (GetTaskResult (reader));
109  });
110 
111  for (auto& thread : producers)
112  thread.join ();
113 
114  ch.Close ();
115 
116  for (auto& thread : consumers)
117  thread.join ();
118 
119  QCOMPARE (sum, expected);
120  }
121 
122  void CoroChannelTest::testSingleThreaded ()
123  {
124  constexpr auto iterations = 1000;
125 
126  Channel<int> ch;
127 
128  auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
129  {
130  int sum = 0;
131  while (auto next = co_await ch->Receive ())
132  sum += *next;
133  co_return sum;
134  } (&ch);
135 
136  int expected = 0;
137  for (int i = 0; i < iterations; ++i)
138  {
139  expected += i;
140  ch.Send (i);
141  }
142 
143  ch.Close ();
144 
145  const auto result = GetTaskResult (reader);
146  QCOMPARE (result, expected);
147  }
148 
149  void CoroChannelTest::testSingleThreadedTimered ()
150  {
151  using namespace std::chrono_literals;
152 
153  constexpr auto iterations = 100;
154  constexpr auto interval = 1ms;
155 
156  Channel<int> ch;
157 
158  auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
159  {
160  int sum = 0;
161  while (auto next = co_await ch->Receive ())
162  sum += *next;
163  co_return sum;
164  } (&ch);
165 
166  int expected = 0;
167 
168  QTimer timer;
169  timer.callOnTimeout ([&, i = 0] mutable
170  {
171  expected += i;
172  ch.Send (i);
173  if (++i == iterations)
174  {
175  timer.stop ();
176  ch.Close ();
177  }
178  });
179  timer.start (interval);
180 
181  const auto result = GetTaskResult (reader);
182  QCOMPARE (result, expected);
183  }
184 
185  void CoroChannelTest::testMerge ()
186  {
187  using namespace std::chrono_literals;
188 
189  constexpr auto numChannels = 100;
190  constexpr auto iterations = 100;
191  constexpr auto interval = 1ms;
192 
193  QVector<Channel_ptr<int>> channels;
194  std::generate_n (std::back_inserter (channels), numChannels, [] { return std::make_shared<Channel<int>> (); });
195 
196  auto merged = MergeChannels (channels);
197 
198  auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
199  {
200  int sum = 0;
201  while (auto next = co_await ch->Receive ())
202  sum += *next;
203  co_return sum;
204  } (merged.get ());
205 
206  int expected = 0;
207 
208  QTimer timer;
209  timer.callOnTimeout ([&, i = 0] mutable
210  {
211  for (int j = 0; j < numChannels; ++j)
212  {
213  auto value = i * numChannels + j;
214  expected += value;
215  channels [j]->Send (value);
216  }
217  if (++i == iterations)
218  {
219  timer.stop ();
220  for (auto chan : channels)
221  chan->Close ();
222  }
223  });
224  timer.start (interval);
225 
226  const auto result = GetTaskResult (reader);
227  QCOMPARE (result, expected);
228  }
229 }
Channel_ptr< T > MergeChannels(QVector< Channel_ptr< T >> channels)
Definition: channelutils.h:19
T GetTaskResult(Task< T, Extensions... > task)
Definition: getresult.h:19