// Test with several producer threads and one reader coroutine working on a single Channel<int>.
// NOTE(review): this listing is fragmentary (braces and several statements are not visible here),
// so the comments below describe only the lines that are shown.
10 #include <QtConcurrentRun> 21 void CoroChannelTest::testSingleRecv ()
// Test parameters: number of producer threads, iterations per producer, and the pause
// each producer takes after every iteration.
25 constexpr
auto producersCount = 32;
26 constexpr
auto repCount = 100;
27 constexpr
auto sleepLength = 1ms;
// The channel under test, constructed with `this` as its argument.
29 Channel<int> ch {
this };
// One std::thread per producer; `expected` accumulates every value the producers generate.
31 std::vector<std::thread> threads;
32 std::atomic_int expected;
33 for (
int i = 0; i < producersCount; ++i)
34 threads.emplace_back ([&, i]
36 for (int j = 0; j < repCount; ++j)
// Since i < producersCount, this value is distinct for every (i, j) pair.
38 const auto val = j * producersCount + i;
// Add the value to the running total (relaxed ordering), then pause before the next iteration.
// Presumably the value is also sent into `ch` on a line not visible here — TODO confirm.
40 expected.fetch_add (val, std::memory_order::relaxed);
41 std::this_thread::sleep_for (sleepLength);
// Id of the thread executing this test function; handed to the reader for comparison.
45 auto mainThread = std::this_thread::get_id ();
// Reader coroutine: captures nothing and receives the thread id and the channel as parameters.
46 auto reader = [] (
auto mainThread, Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Keep receiving for as long as Receive() yields a truthy result.
49 while (
auto next = co_await ch->Receive ())
// After each receive, the coroutine must be running on the thread identified by mainThread.
53 auto thisThread = std::this_thread::get_id ();
54 QCOMPARE (thisThread, mainThread);
// Visit every producer thread (presumably to join it — the loop body is not visible here).
62 for (
auto& thread : threads)
// `result` is defined on a line not visible here; it must equal the producers' total.
68 QCOMPARE (result, expected);
// Test with several producer threads and several consumer threads, each consumer
// defining its own reader coroutine.
// NOTE(review): this listing is fragmentary (braces, the channel declaration and several
// statements are not visible here), so the comments describe only the lines that are shown.
71 void CoroChannelTest::testManyRecvs ()
// Test parameters: producer and consumer thread counts, iterations per producer,
// and the pause each producer takes after every iteration.
75 constexpr
auto producersCount = 32;
76 constexpr
auto consumersCount = 8;
77 constexpr
auto repCount = 100;
78 constexpr
auto sleepLength = 1ms;
// One std::thread per producer; `expected` accumulates every value the producers generate.
82 std::vector<std::thread> producers;
83 std::atomic_int expected;
84 for (
int i = 0; i < producersCount; ++i)
85 producers.emplace_back ([&, i]
87 for (int j = 0; j < repCount; ++j)
// Since i < producersCount, this value is distinct for every (i, j) pair.
89 const auto val = j * producersCount + i;
// Add the value to the running total (relaxed ordering), then pause before the next iteration.
// Presumably the value is also sent into the channel on a line not visible here — TODO confirm.
91 expected.fetch_add (val, std::memory_order::relaxed);
92 std::this_thread::sleep_for (sleepLength);
// One std::thread per consumer.
97 std::vector<std::thread> consumers;
98 for (
int i = 0; i < consumersCount; ++i)
99 consumers.emplace_back ([&]
// Reader coroutine: captures nothing and receives the channel as a parameter.
101 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Keep receiving for as long as Receive() yields a truthy result.
104 while (auto next = co_await ch->Receive ())
// Visit every producer thread, then every consumer thread (presumably to join them —
// the loop bodies are not visible here).
111 for (
auto& thread : producers)
116 for (
auto& thread : consumers)
// `sum` is defined on a line not visible here; it must equal the producers' total.
119 QCOMPARE (sum, expected);
// Test that drives a reader coroutine from a plain loop of `iterations` steps.
// NOTE(review): this listing is fragmentary (braces, the channel declaration and the loop
// body are not visible here), so the comments describe only the lines that are shown.
122 void CoroChannelTest::testSingleThreaded ()
// Number of loop iterations performed below.
124 constexpr
auto iterations = 1000;
// Reader coroutine: captures nothing and receives the channel as a parameter.
128 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Keep receiving for as long as Receive() yields a truthy result.
131 while (
auto next = co_await ch->Receive ())
// Loop body is not visible here; presumably it sends values into the channel — TODO confirm.
137 for (
int i = 0; i < iterations; ++i)
// `result` and `expected` are defined on lines not visible here; they must be equal.
146 QCOMPARE (result, expected);
// Test that drives a reader coroutine from a timer callback firing every `interval`.
// NOTE(review): this listing is fragmentary (braces, the channel and timer declarations and
// most of the callback body are not visible here), so the comments describe only what is shown.
149 void CoroChannelTest::testSingleThreadedTimered ()
// Test parameters: the callback count that triggers the final branch, and the timer interval.
153 constexpr
auto iterations = 100;
154 constexpr
auto interval = 1ms;
// Reader coroutine: captures nothing and receives the channel as a parameter.
158 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Keep receiving for as long as Receive() yields a truthy result.
161 while (
auto next = co_await ch->Receive ())
// Timeout callback with its own mutable counter `i`, starting at 0 and incremented on each
// invocation; the branch taken when it reaches `iterations` is not visible here
// (presumably it stops the timer and/or closes the channel — TODO confirm).
169 timer.callOnTimeout ([&, i = 0]
mutable 173 if (++i == iterations)
// Start the timer with the configured interval.
179 timer.start (interval);
// `result` and `expected` are defined on lines not visible here; they must be equal.
182 QCOMPARE (result, expected);
// Test that feeds `numChannels` separate channels from a timer callback and reads through a
// reader coroutine.
// NOTE(review): this listing is fragmentary (braces, the timer declaration, the merge step and
// parts of the callback body are not visible here), so the comments describe only what is shown.
185 void CoroChannelTest::testMerge ()
// Test parameters: number of channels, the callback count that triggers the final branch,
// and the timer interval.
189 constexpr
auto numChannels = 100;
190 constexpr
auto iterations = 100;
191 constexpr
auto interval = 1ms;
// Fill `channels` with numChannels freshly created Channel<int> instances.
193 QVector<Channel_ptr<int>> channels;
194 std::generate_n (std::back_inserter (channels), numChannels, [] {
return std::make_shared<Channel<int>> (); });
// Reader coroutine: captures nothing and receives the channel as a parameter.
198 auto reader = [] (Channel<int> *ch) -> Task<int, ThreadSafetyExtension>
// Keep receiving for as long as Receive() yields a truthy result.
201 while (
auto next = co_await ch->Receive ())
// Timeout callback with its own mutable counter `i`, starting at 0: on every invocation it
// sends one value to each of the channels.
209 timer.callOnTimeout ([&, i = 0]
mutable 211 for (
int j = 0; j < numChannels; ++j)
// Since j < numChannels, this value is distinct for every (i, j) pair.
213 auto value = i * numChannels + j;
215 channels [j]->Send (value);
// Once the counter reaches `iterations`, every channel is visited (the loop body is not
// visible here; presumably each channel is closed — TODO confirm).
217 if (++i == iterations)
220 for (
auto chan : channels)
// Start the timer with the configured interval.
224 timer.start (interval);
// `result` and `expected` are defined on lines not visible here; they must be equal.
227 QCOMPARE (result, expected);
Channel_ptr< T > MergeChannels(QVector< Channel_ptr< T >> channels)
T GetTaskResult(Task< T, Extensions... > task)