LeviLamina
Loading...
Searching...
No Matches
Collect.h
1#pragma once
2
3#include <atomic>
4
5#include "ll/api/coro/CoroPromise.h"
6
7namespace ll::coro {
8
9template <class Container>
11public:
12 using ExpectedResult = typename Container::value_type::ExpectedResult;
13
14 using allocator_type =
15 typename std::allocator_traits<typename Container::allocator_type>::template rebind_alloc<ExpectedResult>;
16
17private:
18 std::coroutine_handle<> handle;
19 Container tasks;
20 std::vector<ExpectedResult, allocator_type> results;
21 std::atomic_size_t counter;
22
23public:
24 CollectAllAwaiter(CollectAllAwaiter const&) = delete;
25 CollectAllAwaiter& operator=(CollectAllAwaiter const&) = delete;
26
27 constexpr CollectAllAwaiter(Container&& input)
28 : tasks(std::move(input)),
29 results(tasks.get_allocator()),
30 counter(tasks.size()) {
31 results.resize(tasks.size());
32 }
33 constexpr CollectAllAwaiter(CollectAllAwaiter&&) = default;
34
35 constexpr bool await_ready() const noexcept { return tasks.empty(); }
36
37 template <std::derived_from<CoroPromiseBase> P>
38 void await_suspend(std::coroutine_handle<P> h) {
39 handle = h;
40 NonNullExecutorRef exec = h.promise().exec;
41 for (size_t i = 0; i < tasks.size(); ++i) {
42 tasks[i].launch(exec, [this, i](auto&& res) {
43 try {
44 results[i] = std::move(res);
45 } catch (...) {}
46 if (--counter == 0) {
47 handle.resume();
48 }
49 });
50 }
51 }
52
53 constexpr auto await_resume() { return std::move(results); }
54};
55template <template <class> class T, class... Ts>
57public:
58 using Input = std::tuple<T<Ts>...>;
59 using Result = std::tuple<typename T<Ts>::ExpectedResult...>;
60
61private:
62 std::coroutine_handle<> handle;
63 std::atomic_size_t counter{sizeof...(Ts)};
64 Input tasks;
65 Result results;
66
67public:
69 CollectAllTupleAwaiter& operator=(CollectAllTupleAwaiter const&) = delete;
70
71 constexpr CollectAllTupleAwaiter(T<Ts>&&... in) : tasks(std::move(in)...) {}
72 constexpr CollectAllTupleAwaiter(Input&& in) : tasks(std::move(in)) {}
74
75 constexpr bool await_ready() const noexcept { return sizeof...(Ts) == 0; }
76
77 template <std::derived_from<CoroPromiseBase> P>
78 void await_suspend(std::coroutine_handle<P> h) {
79 handle = h;
80 NonNullExecutorRef exec = h.promise().exec;
81 [&]<size_t... I>(std::index_sequence<I...>) {
82 (
83 [&](auto& task, auto& result) {
84 task.launch(exec, [this, &result](auto&& res) {
85 try {
86 result = std::move(res);
87 } catch (...) {}
88 if (--counter == 0) {
89 handle.resume();
90 }
91 });
92 }(std::get<I>(tasks), std::get<I>(results)),
93 ...
94 );
95 }(std::index_sequence_for<Ts...>());
96 }
97
98 constexpr auto await_resume() { return std::move(results); }
99};
100} // namespace ll::coro
Definition Container.h:30
Definition Result.h:6
Definition Collect.h:56
Definition Collect.h:10