ConcurrentPriorityQueue.h
#pragma once

// This header-only concurrency data structure implementation is based on Intel's
// implementation in its Threading Building Blocks ("Intel Material").

/*
    Intel Material Copyright 2005-2008 Intel Corporation. All Rights Reserved.
*/

#include <atomic>
#include <memory>
#include <thread>
#include <utility>

#include "ll/api/base/CompilerPredefine.h"
namespace ll::data {
namespace detail {

// Compile-time upper bound on the cache-line (destructive interference) size. It should be
// used only in situations where having a compile-time upper bound is more useful than a
// run-time exact answer.
constexpr size_t max_nfs_size = std::hardware_destructive_interference_size;
// Class that implements exponential backoff.
class atomic_backoff {
    // Should be equal to approximately the number of "pause" instructions
    // that take the same time as a context switch. Must be a power of two.
    static constexpr int LOOPS_BEFORE_YIELD = 16;
    int                  count;

    static inline void machine_pause([[maybe_unused]] int delay) {
#ifdef LL_MACHINE_PAUSE
        while (delay-- > 0) {
            LL_MACHINE_PAUSE;
        }
#else
        std::this_thread::yield();
#endif
    }

public:
    // In many cases, an object of this type is initialized eagerly on a hot path,
    // as in for(atomic_backoff b; ; b.pause()) { /*loop body*/ }
    // For this reason, the construction cost must be very small!
    atomic_backoff() : count(1) {}
    // This constructor pauses immediately; do not use on hot paths!
    atomic_backoff(bool) : count(1) { pause(); }

    // No Copy
    atomic_backoff(atomic_backoff const&)            = delete;
    atomic_backoff& operator=(atomic_backoff const&) = delete;

    // Pause for a while.
    void pause() {
        if (count <= LOOPS_BEFORE_YIELD) {
            machine_pause(count);
            // Pause twice as long the next time.
            count *= 2;
        } else {
            // Pause is so long that we might as well yield the CPU to the scheduler.
            std::this_thread::yield();
        }
    }
};
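
// Usage sketch (illustrative only, not from this file): the same backoff pattern applied to a
// CAS retry loop, e.g. acquiring a simple spin-lock flag:
//
//     std::atomic<int> lock_flag{0};
//     int              expected = 0;
//     for (atomic_backoff backoff; !lock_flag.compare_exchange_strong(expected, 1); backoff.pause()) {
//         expected = 0; // compare_exchange_strong rewrote `expected` with the observed value
//     }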

// Spin WHILE the condition is true.
template <typename T, typename C>
T spin_wait_while(std::atomic<T> const& location, C comp, std::memory_order order) {
    atomic_backoff backoff;
    T              snapshot = location.load(order);
    while (comp(snapshot)) {
        backoff.pause();
        snapshot = location.load(order);
    }
    return snapshot;
}

// Spin WHILE the value of the variable is equal to a given value.
template <typename T, typename U>
T spin_wait_while_eq(
    std::atomic<T> const& location,
    const U               value,
    std::memory_order     order = std::memory_order_acquire
) {
    return spin_wait_while(location, [&value](T t) { return t == value; }, order);
}

// Spin UNTIL the value of the variable is equal to a given value.
template <typename T, typename U>
T spin_wait_until_eq(
    std::atomic<T> const& location,
    const U               value,
    std::memory_order     order = std::memory_order_acquire
) {
    return spin_wait_while(location, [&value](T t) { return t != value; }, order);
}
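
// Example from this file: in aggregator_generic::start_handle_operations below, a thread that
// wants to become the active handler spins with exponential backoff until the previous owner
// releases the handler_busy flag:
//
//     spin_wait_until_eq(handler_busy, uintptr_t(0));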
// Base class for aggregated operation
template <typename Derived>
class aggregated_operation {
public:
    // Zero value means "wait" status, all other values are "user" specified values and
    // are defined in the scope of a class which uses "status"
    std::atomic<uintptr_t> status;

    std::atomic<Derived*> next;
    aggregated_operation() : status{}, next(nullptr) {}
};

// Aggregator base class
/* An aggregator for collecting operations coming from multiple sources and executing
   them serially on a single thread. OperationType must be derived from
   aggregated_operation. The parameter HandlerType is a functor that will be passed the
   list of operations and is expected to handle each operation appropriately, setting the
   status of each operation to non-zero. */
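// A minimal handler sketch (illustrative only; the real handler used below is
// concurrent_priority_queue::functor): it receives the head of the intrusive list, walks it
// through each node's `next` pointer, and publishes a non-zero status with release semantics
// once a node's operation has been carried out.
//
//     struct my_handler {                                    // hypothetical handler type
//         void operator()(my_operation* op_list) {           // my_operation derives from
//             while (op_list) {                              //   aggregated_operation<my_operation>
//                 my_operation* cur = op_list;
//                 op_list           = op_list->next.load(std::memory_order_relaxed);
//                 // ... perform cur's operation ...
//                 cur->status.store(1, std::memory_order_release);
//             }
//         }
//     };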
template <typename OperationType>
class aggregator_generic {
public:
    aggregator_generic() : pending_operations(nullptr), handler_busy(false) {}

    // Execute an operation
    /* Places an operation into the waitlist (pending_operations), and either handles the list,
       or waits for the operation to complete, or returns. */
    template <typename HandlerType>
    void execute(OperationType* op, HandlerType& handle_operations) {
        // op->status should be read before inserting the operation into the
        // aggregator waitlist since it can become invalid after executing a
        // handler (if the operation has a 'short' lifetime).
        const uintptr_t status = op->status.load(std::memory_order_relaxed);

        // ITT note: &(op->status) tag is used to cover accesses to this op node. This
        // thread has created the operation, and now releases it so that the handler
        // thread may handle the associated operation w/o triggering a race condition;
        // thus this tag will be acquired just before the operation is handled in the
        // handle_operations functor.
        // insert the operation in the queue.
        OperationType* res = pending_operations.load(std::memory_order_relaxed);
        do {
            op->next.store(res, std::memory_order_relaxed);
        } while (!pending_operations.compare_exchange_strong(res, op));
        if (!res) { // first in the list; handle the operations
            // ITT note: &pending_operations tag covers access to the handler_busy flag,
            // which this waiting handler thread will try to set before entering
            // handle_operations.
            start_handle_operations(handle_operations);
            // An operation with a 'short' lifetime may already have been destroyed here.
        }
        // Not first; wait for op to be ready
        else if (!status) { // operation is blocking here.
            spin_wait_while_eq(op->status, uintptr_t(0));
        }
    }

private:
    // Trigger the handling of operations when the handler is free
    template <typename HandlerType>
    void start_handle_operations(HandlerType& handle_operations) {
        OperationType* op_list;

        // ITT note: &handler_busy tag covers access to pending_operations as it is passed
        // between active and waiting handlers. Below, the waiting handler waits until
        // the active handler releases, and the waiting handler acquires &handler_busy as
        // it becomes the active_handler. The release point is at the end of this
        // function, when all operations in pending_operations have been handled by the
        // owner of this aggregator.
        // get the handler_busy:
        // only one thread can possibly spin here at a time
        spin_wait_until_eq(handler_busy, uintptr_t(0));
        // acquire fence not necessary here due to causality rule and surrounding atomics
        handler_busy.store(1, std::memory_order_relaxed);

        // ITT note: &pending_operations tag covers access to the handler_busy flag
        // itself. Capturing the state of the pending_operations signifies that
        // handler_busy has been set and a new active handler will now process that list's
        // operations.
        // grab pending_operations
        op_list = pending_operations.exchange(nullptr);

        // handle all the operations
        handle_operations(op_list);

        // release the handler
        handler_busy.store(0, std::memory_order_release);
    }

    // An atomically updated list (aka mailbox) of pending operations
    std::atomic<OperationType*> pending_operations;
    // Controls thread access to handle_operations
    std::atomic<uintptr_t> handler_busy;
};

template <typename HandlerType, typename OperationType>
class aggregator : public aggregator_generic<OperationType> {
    HandlerType handle_operations;

public:
    aggregator() = default;

    void initialize_handler(HandlerType h) { handle_operations = h; }

    void execute(OperationType* op) { aggregator_generic<OperationType>::execute(op, handle_operations); }
};

template <typename T, typename Compare = std::less<T>, typename Allocator = std::allocator<T>>
class concurrent_priority_queue {
public:
    using value_type      = T;
    using reference       = T&;
    using const_reference = T const&;

    using size_type       = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

protected:
    using vector_type = std::vector<value_type, allocator_type>;

public:

    explicit concurrent_priority_queue(allocator_type const& alloc) : mark(0), my_size(0), my_compare(), c(alloc) {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue(Compare const& compare, allocator_type const& alloc = allocator_type())
    : mark(0),
      my_size(0),
      my_compare(compare),
      c(alloc) {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue(size_type init_capacity, allocator_type const& alloc = allocator_type())
    : mark(0),
      my_size(0),
      my_compare(),
      c(alloc) {
        c.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue(
        size_type             init_capacity,
        Compare const&        compare,
        allocator_type const& alloc = allocator_type()
    )
    : mark(0),
      my_size(0),
      my_compare(compare),
      c(alloc) {
        c.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue(
        InputIterator         begin,
        InputIterator         end,
        Compare const&        compare,
        allocator_type const& alloc = allocator_type()
    )
    : mark(0),
      my_compare(compare),
      c(begin, end, alloc) {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(c.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, allocator_type const& alloc = allocator_type())
    : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue(
        std::initializer_list<value_type> init,
        Compare const&                    compare,
        allocator_type const&             alloc = allocator_type()
    )
    : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue(std::initializer_list<value_type> init, allocator_type const& alloc = allocator_type())
    : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue(concurrent_priority_queue const& other)
    : mark(other.mark),
      my_size(other.my_size.load(std::memory_order_relaxed)),
      my_compare(other.my_compare),
      c(other.c) {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue(concurrent_priority_queue const& other, allocator_type const& alloc)
    : mark(other.mark),
      my_size(other.my_size.load(std::memory_order_relaxed)),
      my_compare(other.my_compare),
      c(other.c, alloc) {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue(concurrent_priority_queue&& other)
    : mark(other.mark),
      my_size(other.my_size.load(std::memory_order_relaxed)),
      my_compare(other.my_compare),
      c(std::move(other.c)) {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue(concurrent_priority_queue&& other, allocator_type const& alloc)
    : mark(other.mark),
      my_size(other.my_size.load(std::memory_order_relaxed)),
      my_compare(other.my_compare),
      c(std::move(other.c), alloc) {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=(concurrent_priority_queue const& other) {
        if (this != &other) {
            c    = other.c;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=(concurrent_priority_queue&& other) {
        if (this != &other) {
            c    = std::move(other.c);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=(std::initializer_list<value_type> init) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign(InputIterator begin, InputIterator end) {
        c.assign(begin, end);
        mark = 0;
        my_size.store(c.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign(std::initializer_list<value_type> init) { assign(init.begin(), init.end()); }

    concurrent_priority_queue& operator=(vector_type&& vec) {
        c    = std::move(vec);
        mark = 0;
        my_size.store(c.size(), std::memory_order_relaxed);
        heapify();
        return *this;
    }

    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    [[nodiscard]] bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(value_type const& value)
        requires(std::is_copy_constructible_v<value_type>)
    {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) throw std::bad_alloc{};
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(value_type&& value) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) throw std::bad_alloc{};
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace(Args&&... args) {
        push(value_type(std::forward<Args>(args)...));
    }

    // Gets a reference to and removes the highest priority element
    /* If a highest priority element was found, sets value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop(value_type& value) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

    template <class F>
    bool try_pop_if(F&& fn) {
        function_data fn_data{(void*)std::addressof(fn), +[](void* f, value_type& value) -> bool {
                                  return std::invoke(static_cast<F&&>(*((F*)f)), value);
                              }};
        cpq_operation op_data(fn_data, POP_FN_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        c.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap(concurrent_priority_queue& other) {
        if (this != &other) {
            using std::swap;
            swap(c, other.c);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return c.get_allocator(); }

private:
    enum operation_type { PUSH_OP, PUSH_RVALUE_OP, POP_OP, POP_FN_OP };
    enum operation_status { WAIT = 0, SUCCEEDED, FAILED };

    struct function_data {
        using function_type = bool(void*, value_type&);
        void*          data;
        function_type* func;

        bool operator()(value_type& val) const { return func(data, val); }
    };

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            function_data const* fn;
            value_type*          elem;
        };
        cpq_operation(value_type const& value, operation_type t) : type(t), elem(const_cast<value_type*>(&value)) {}

        cpq_operation(function_data const& f, operation_type t) : type(t), fn(&f) {}
    };

    class functor {
        concurrent_priority_queue* my_cpq;

    public:
        functor() : my_cpq(nullptr) {}
        functor(concurrent_priority_queue* cpq) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) { my_cpq->handle_operations(op_list); }
    };

    void handle_operations(cpq_operation* op_list) {
        cpq_operation *tmp, *pop_list = nullptr;

        // First pass processes all constant-time (amortized; reallocation may happen) pushes and pops.
        while (op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a release store
            // to give control back to the waiting thread in aggregator_generic::execute.

            tmp     = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);

            switch (tmp->type) {
            case PUSH_OP:
                try {
                    if constexpr (std::is_copy_constructible_v<value_type>) {
                        c.push_back(*(tmp->elem));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                } catch (...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
                break;
            case PUSH_RVALUE_OP:
                try {
                    c.push_back(std::move(*(tmp->elem)));
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                } catch (...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
                break;
            case POP_OP:
                if (mark < c.size() && my_compare(c[0], c.back())) {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(c.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    c.pop_back();
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
                break;
            case POP_FN_OP:
                if (mark < c.size() && my_compare(c[0], c.back())) {
                    // there are newly pushed elems and the last one is higher than top
                    if ((*(tmp->fn))(c.back())) {
                        my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                        tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                        c.pop_back();
                    } else {
                        tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                    }
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
                break;
            default:
                LL_UNREACHABLE;
            }
        }

        // Second pass processes pop operations
        while (pop_list) {
            tmp      = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            if (c.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                if (mark < c.size() && my_compare(c[0], c.back())) {
                    // there are newly pushed elems and the last one is higher than top
                    if (tmp->type == POP_FN_OP ? ((*(tmp->fn))(c.back()))
                                               : (*(tmp->elem) = std::move(c.back()), true)) {
                        my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                        tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                        c.pop_back();
                    } else {
                        tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                    }
                } else { // extract top and push last element down heap
                    if (tmp->type == POP_FN_OP ? ((*(tmp->fn))(c[0])) : (*(tmp->elem) = std::move(c[0]), true)) {
                        my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                        tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                        reheap();
                    } else {
                        tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                    }
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < c.size()) heapify();
    }

    // Merge unsorted elements into heap
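    // Layout invariant maintained between aggregator passes: c[0, mark) is a valid binary heap
    // (the parent of element i sits at (i - 1) / 2), while c[mark, c.size()) holds elements pushed
    // since the last heapify()/reheap(); handle_operations() above folds them back in at the end
    // of each batch.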
    void heapify() {
        if (!mark && c.size() > 0) mark = 1;
        for (; mark < c.size(); ++mark) {
            // for each unheapified element under size
            size_type  cur_pos  = mark;
            value_type to_place = std::move(c[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(c[parent], to_place)) break;
                c[cur_pos] = std::move(c[parent]);
                cur_pos    = parent;
            } while (cur_pos);
            c[cur_pos] = std::move(to_place);
        }
    }

    // Re-heapify after an extraction: push the last element down the heap from the root.
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while (child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(c[child], c[child + 1])) ++target;
            // target now has the higher priority child
            if (my_compare(c[target], c.back())) break;
            c[cur_pos] = std::move(c[target]);
            cur_pos    = target;
            child      = (cur_pos << 1) + 1;
        }
        if (cur_pos != c.size() - 1) c[cur_pos] = std::move(c.back());
        c.pop_back();
        if (mark > c.size()) mark = c.size();
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    alignas(max_nfs_size) aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type) % max_nfs_size];
    // The point at which unsorted elements begin
    size_type              mark;
    std::atomic<size_type> my_size;
    Compare                my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2 * sizeof(size_type)) - sizeof(Compare) % max_nfs_size];

protected:
    // Storage for the heap of elements in queue, plus unheapified elements
    vector_type c;

    friend bool operator==(concurrent_priority_queue const& lhs, concurrent_priority_queue const& rhs) {
        return lhs.c == rhs.c;
    }
};

} // namespace detail

using detail::concurrent_priority_queue;

} // namespace ll::data
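
A minimal usage sketch (illustrative only, not part of the header above; the include path is an assumption based on the header's own include of "ll/api/base/CompilerPredefine.h"): push, emplace and try_pop may be called concurrently from several threads, while clear, swap and the assignment operators affect the whole container and need external synchronization.

#include <thread>
#include <vector>

#include "ll/api/data/ConcurrentPriorityQueue.h" // assumed path

int main() {
    // The init_capacity constructor shown above reserves storage up front.
    ll::data::concurrent_priority_queue<int> queue(/*init_capacity=*/1024);

    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) { // four concurrent producers
        threads.emplace_back([&queue, t] {
            for (int i = 0; i < 1000; ++i) queue.push(t * 1000 + i);
        });
    }
    threads.emplace_back([&queue] { // one concurrent consumer
        int value  = 0;
        int popped = 0;
        while (popped < 100) {
            if (queue.try_pop(value)) ++popped; // value receives the current highest element
        }
    });
    for (auto& th : threads) th.join();
}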