Line data Source code
1 : /**
2 : Copyright (c) 2021-2022 Roman Katuntsev <sbkarr@stappler.org>
3 : Copyright (c) 2023 Stappler LLC <admin@stappler.dev>
4 :
5 : Permission is hereby granted, free of charge, to any person obtaining a copy
6 : of this software and associated documentation files (the "Software"), to deal
7 : in the Software without restriction, including without limitation the rights
8 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 : copies of the Software, and to permit persons to whom the Software is
10 : furnished to do so, subject to the following conditions:
11 :
12 : The above copyright notice and this permission notice shall be included in
13 : all copies or substantial portions of the Software.
14 :
15 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 : THE SOFTWARE.
22 : **/
23 :
24 : #ifndef STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_
25 : #define STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_
26 :
27 : #include "SPMemFunction.h"
28 :
29 : #define SP_PRIORITY_QUEUE_RANGE_DEBUG 0
30 :
31 : namespace STAPPLER_VERSIONIZED stappler::memory {
32 :
33 : void PriorityQueue_lock_noOp(void *);
34 : void PriorityQueue_lock_std_mutex(void *);
35 : void PriorityQueue_unlock_std_mutex(void *);
36 :
37 : // Real-time task priority queue
38 : // It is designed for a relatively small number of pending tasks (below PreallocatedNodes),
39 : // of which relatively few have a priority different from zero
40 : template <typename Value>
41 : class PriorityQueue {
42 : public:
43 : static constexpr size_t PreallocatedNodes = 8;
44 : static constexpr size_t StorageNodes = 64;
45 :
46 : using LockFnPtr = void (*) (void *);
47 : using PriorityType = int32_t;
48 :
49 : struct StorageBlock;
50 :
51 : struct alignas(Value) AlignedStorage {
52 : uint8_t buffer[sizeof(Value)];
53 : };
54 :
55 : // Nodes will be sequentially placed in continuous region of memory, so, they should have proper alignment
56 : struct Node {
57 : AlignedStorage storage;
58 : Node *next;
59 : StorageBlock *block;
60 : PriorityType priority;
61 : };
62 :
63 : struct StorageBlock {
64 : std::array<Node, StorageNodes> nodes;
65 : uint32_t used = 0;
66 : };
67 :
68 : struct LockInterface {
69 : void *lockPtr = nullptr;
70 : LockFnPtr lockFn = &PriorityQueue_lock_noOp;
71 : LockFnPtr unlockFn = &PriorityQueue_lock_noOp;
72 :
73 894 : void clear() {
74 894 : lockPtr = nullptr;
75 894 : lockFn = &PriorityQueue_lock_noOp;
76 894 : unlockFn = &PriorityQueue_lock_noOp;
77 894 : }
78 :
79 219174995 : void lock() {
80 219174995 : lockFn(lockPtr);
81 219665890 : }
82 :
83 219631210 : void unlock() {
84 219631210 : unlockFn(lockPtr);
85 219708742 : }
86 :
87 : bool operator==(const LockInterface &other) const {
88 : return lockPtr == other.lockPtr && lockFn == other.lockFn && unlockFn == other.unlockFn;
89 : }
90 :
91 260 : bool operator!=(const LockInterface &other) const {
92 260 : return lockPtr != other.lockPtr || lockFn != other.lockFn || unlockFn != other.unlockFn;
93 : }
94 : };
95 :
96 : struct NodeInterface {
97 : Node *first = nullptr;
98 : Node *last = nullptr;
99 : LockInterface lock;
100 : };
101 :
102 338 : PriorityQueue() {
103 338 : initNodes(&_preallocated[0], &_preallocated[_preallocated.size() - 1], nullptr);
104 338 : _free.first = &_preallocated[0];
105 338 : _free.last = &_preallocated[_preallocated.size() - 1];
106 338 : }
107 :
108 317 : ~PriorityQueue() {
109 : // disable locking
110 317 : _queue.lock.clear();
111 317 : _free.lock.clear();
112 :
113 317 : auto n = _queue.first;
114 407 : while (n) {
115 90 : Value * val = (Value *)(n->storage.buffer);
116 90 : val->~Value();
117 90 : freeNode(n);
118 90 : n = n->next;
119 : }
120 317 : }
121 :
122 : PriorityQueue(const PriorityQueue &) = delete;
123 : PriorityQueue &operator=(const PriorityQueue &) = delete;
124 :
125 : PriorityQueue(PriorityQueue &&) = delete;
126 : PriorityQueue &operator=(PriorityQueue &&) = delete;
127 :
128 210 : size_t capacity() const {
129 210 : return _capacity;
130 : }
131 :
132 240 : size_t free_capacity() {
133 240 : std::unique_lock<LockInterface> lock(_free.lock);
134 240 : size_t ret = 0;
135 240 : auto node = _free.first;
136 5280 : while (node) {
137 5040 : ++ ret;
138 5040 : node = node->next;
139 : }
140 240 : return ret;
141 240 : }
142 :
143 : void setQueueLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
144 : _queue.lock.lockFn = lockFn;
145 : _queue.lock.unlockFn = unlockFn;
146 : _queue.lock.lockPtr = ptr;
147 : }
148 :
149 : void setFreeLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
150 : _free.lock.lockFn = lockFn;
151 : _free.lock.unlockFn = unlockFn;
152 : _free.lock.lockPtr = ptr;
153 : }
154 :
155 : void setLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
156 : setQueueLocking(lockFn, unlockFn, ptr);
157 : setFreeLocking(lockFn, unlockFn, ptr);
158 : }
159 :
160 188 : void setQueueLocking(std::mutex &mutex) {
161 188 : _queue.lock.lockFn = PriorityQueue_lock_std_mutex;
162 188 : _queue.lock.unlockFn = PriorityQueue_unlock_std_mutex;
163 188 : _queue.lock.lockPtr = &mutex;
164 188 : }
165 :
166 188 : void setFreeLocking(std::mutex &mutex) {
167 188 : _free.lock.lockFn = PriorityQueue_lock_std_mutex;
168 188 : _free.lock.unlockFn = PriorityQueue_unlock_std_mutex;
169 188 : _free.lock.lockPtr = &mutex;
170 188 : }
171 :
172 : void setLocking(std::mutex &mutex) {
173 : setQueueLocking(mutex);
174 : setFreeLocking(mutex);
175 : }
176 :
177 130 : void clear() {
178 130 : auto tmpFreeLock = _free.lock;
179 130 : auto tmpQueueLock = _queue.lock;
180 :
181 130 : _free.lock.clear();
182 130 : _queue.lock.clear();
183 :
184 130 : if (tmpFreeLock != tmpQueueLock) {
185 100 : tmpFreeLock.lock();
186 100 : tmpQueueLock.lock();
187 : } else {
188 30 : tmpQueueLock.lock();
189 : }
190 :
191 4210 : while (auto node = popNode()) {
192 4080 : Value * val = (Value *)(node->storage.buffer);
193 4080 : val->~Value();
194 4080 : freeNode(node);
195 : }
196 :
197 130 : if (tmpFreeLock != tmpQueueLock) {
198 100 : tmpFreeLock.unlock();
199 100 : tmpQueueLock.unlock();
200 : } else {
201 30 : tmpQueueLock.unlock();
202 : }
203 :
204 130 : _free.lock = tmpFreeLock;
205 130 : _queue.lock = tmpQueueLock;
206 130 : }
207 :
208 21 : bool empty() {
209 21 : std::unique_lock<LockInterface> lock(_queue.lock);
210 42 : return empty(lock);
211 21 : }
212 :
213 : // inform queue that lock already acquired
214 : template <class T>
215 627 : bool empty(std::unique_lock<T> &lock) {
216 627 : return _queue.first == nullptr;
217 : }
218 :
219 : template <typename ... Args>
220 1252761 : void push(PriorityType p, bool insertFirst, Args && ... args) {
221 1252761 : auto node = allocateNode();
222 1252767 : node->priority = p;
223 1252767 : new (node->storage.buffer) Value(std::forward<Args>(args)...);
224 1252768 : pushNode(node, insertFirst);
225 1252768 : }
226 :
227 : // pop node, move value into temporary, then free node, then call callback
228 : // optimized for long callbacks and simple move constructor
229 : bool pop_prefix(std::unique_lock<LockInterface> &lock, const callback<void(PriorityType, Value &&)> &cb) {
230 : if (auto node = popNode(lock)) {
231 : auto p = node->priority;
232 : Value * val = (Value *)(node->storage.buffer);
233 : Value tmp(move(*val));
234 : val->~Value();
235 : freeNode(node);
236 : cb(p, move(tmp));
237 : return true;
238 : }
239 : return false;
240 : }
241 :
242 : bool pop_prefix(const callback<void(PriorityType, Value &&)> &cb) {
243 : if (auto node = popNode()) {
244 : auto p = node->priority;
245 : Value * val = (Value *)(node->storage.buffer);
246 : Value tmp(move(*val));
247 : val->~Value();
248 : freeNode(node);
249 : cb(p, move(tmp));
250 : return true;
251 : }
252 : return false;
253 : }
254 :
255 : // pop node, run callback on value, directly stored in node, then free node
256 : // no additional move, but with extra cost for detached node, that blocked until callback ends
257 : bool pop_direct(std::unique_lock<LockInterface> &lock, const callback<void(PriorityType, Value &&)> &cb) {
258 : if (auto node = popNode(lock)) {
259 : Value * val = (Value *)(node->storage.buffer);
260 : cb(node->priority, move(*val));
261 : val->~Value();
262 : freeNode(node);
263 : return true;
264 : }
265 : return false;
266 : }
267 :
268 215540070 : bool pop_direct(const callback<void(PriorityType, Value &&)> &cb) {
269 215540070 : if (auto node = popNode()) {
270 1229958 : Value * val = (Value *)(node->storage.buffer);
271 1229958 : cb(node->priority, move(*val));
272 1243957 : val->~Value();
273 1243929 : freeNode(node);
274 1244093 : return true;
275 : }
276 214725862 : return false;
277 : }
278 :
279 190 : void foreach(const callback<void(PriorityType, const Value &)> &cb) {
280 190 : std::unique_lock<LockInterface> lock(_queue.lock);
281 :
282 190 : auto node = _queue.first;
283 4750 : while (node) {
284 4560 : cb(node->priority, *(Value *)(node->storage.buffer));
285 4560 : node = node->next;
286 : }
287 190 : }
288 :
289 : protected:
290 2349 : void initNodes(Node *first, Node *last, StorageBlock *block) {
291 : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
292 : std::unique_lock rangesLock(_rangesLock);
293 : _ranges.emplace_back(uintptr_t(first), uintptr_t(last));
294 : rangesLock.unlock();
295 : #endif
296 :
297 : // make linked-list from continuous region of memory
298 131408 : while (first != last) {
299 129059 : first->next = first + 1;
300 129059 : first->block = block;
301 129059 : first = first->next;
302 : }
303 2349 : last->next = nullptr;
304 2349 : last->block = block;
305 2349 : }
306 :
307 : // node lifecycle:
308 : // (on producer thread)
309 : // - allocate (blocking)
310 : // - fill (app-side)
311 : // - push (blocking)
312 : //
313 : // (on consumer thread)
314 : // - pop (blocking)
315 : // - dispose (app-side)
316 : // - free (blocking)
317 :
318 215476723 : Node *popNode() {
319 215476723 : std::unique_lock<LockInterface> lock(_queue.lock);
320 431898024 : return popNode(lock);
321 215909354 : }
322 :
323 215910203 : Node *popNode(std::unique_lock<LockInterface> &lock) {
324 215910203 : Node *ret = nullptr;
325 215910203 : if (_queue.first) {
326 1248197 : ret = _queue.first;
327 1248197 : if (_queue.first == _queue.last) {
328 961761 : _queue.first = _queue.last = nullptr;
329 : } else {
330 286436 : _queue.first = ret->next;
331 : }
332 1248197 : ret->next = nullptr;
333 : }
334 215910203 : return ret;
335 : }
336 :
337 1252768 : void pushNode(Node *node, bool insertFirst) {
338 1252768 : std::unique_lock<LockInterface> lock(_queue.lock);
339 1252768 : node->next = nullptr;
340 1252768 : if (!_queue.first) {
341 961853 : _queue.last = _queue.first = node;
342 : } else {
343 290915 : if (insertFirst) {
344 2130 : if (node->priority <= _queue.first->priority) {
345 176 : node->next = _queue.first;
346 176 : _queue.first = node;
347 1954 : } else if (_queue.last->priority < node->priority) {
348 56 : _queue.last->next = node;
349 56 : _queue.last = node;
350 : } else {
351 1898 : Node *n = _queue.first;
352 37756 : while (n->next && n->next->priority < node->priority) {
353 35858 : n = n->next;
354 : }
355 1898 : node->next = n->next;
356 1898 : n->next = node;
357 : }
358 : } else {
359 288785 : if (node->priority < _queue.first->priority) {
360 284 : node->next = _queue.first;
361 284 : _queue.first = node;
362 288501 : } else if (_queue.last->priority <= node->priority) {
363 275011 : _queue.last->next = node;
364 275011 : _queue.last = node;
365 : } else {
366 13490 : Node *n = _queue.first;
367 433658 : while (n->next && n->next->priority <= node->priority) {
368 420168 : n = n->next;
369 : }
370 13490 : node->next = n->next;
371 13490 : n->next = node;
372 : }
373 : }
374 : }
375 1252768 : }
376 :
377 1252759 : Node *allocateNode() {
378 1252759 : Node *ret = nullptr;
379 1252759 : std::unique_lock<LockInterface> lock(_free.lock);
380 1252767 : if (_free.first) {
381 1250756 : ret = _free.first;
382 1250756 : if (_free.first == _free.last) {
383 2744 : _free.first = _free.last = nullptr;
384 : } else {
385 1248012 : _free.first = ret->next;
386 : }
387 : } else {
388 2011 : auto block = allocateBlock(lock);
389 :
390 : // append others
391 2011 : if (_free.last) {
392 0 : _free.last->next = &block->nodes[1];
393 : } else {
394 2011 : _free.first = &block->nodes[1];
395 : }
396 2011 : _free.last = &block->nodes[block->nodes.size() - 1];
397 :
398 : // return first new node
399 2011 : ret = &block->nodes[0];
400 : }
401 1252766 : ret->next = nullptr;
402 1252766 : if (ret->block) {
403 60643 : ++ ret->block->used;
404 : }
405 1252768 : return ret;
406 1252766 : }
407 :
408 1248087 : void freeNode(Node *node) {
409 1248087 : if (node->block) {
410 : // add to the end of the list
411 56847 : std::unique_lock<LockInterface> lock(_free.lock);
412 56863 : node->next = nullptr;
413 :
414 56863 : -- node->block->used;
415 56863 : if (node->block->used == 0) {
416 1951 : auto blockToRemove = node->block;
417 : // remove all nodes from this block from free list
418 1951 : Node *n = _free.first;
419 1951 : Node *last = nullptr;
420 :
421 : // skip preallocated nodes, that should be first in free list
422 16927 : while (n && !n->block) {
423 14976 : last = n;
424 14976 : n = n->next;
425 : }
426 :
427 : // last points to last preallocated node in list
428 : // n points to first block node in list
429 :
430 : do {
431 : // read nodes from target block
432 127522 : while (n && n->block == blockToRemove) {
433 122913 : n = n->next;
434 : }
435 :
436 : // n points to first foreign block
437 4609 : if (last) {
438 4608 : last->next = n;
439 : } else {
440 1 : _free.first = n;
441 : }
442 :
443 30635 : while (n && n->block != blockToRemove) {
444 26026 : last = n;
445 26026 : n = n->next;
446 : }
447 4609 : if (!n) {
448 1951 : break;
449 : }
450 : } while (1);
451 :
452 1951 : if (last) {
453 1950 : _free.last = last;
454 : } else {
455 1 : _free.last = _free.first = nullptr;
456 : }
457 :
458 1951 : deallocateBlock(lock, blockToRemove);
459 : } else {
460 54912 : if (_free.last) {
461 54814 : _free.last->next = node;
462 54814 : _free.last = node;
463 : } else {
464 98 : _free.last = _free.first = node;
465 : }
466 : }
467 56863 : } else {
468 : // add to the front of list, so, it's more likely that extra nodes will be unused
469 : // and extra block will be deallocated
470 1191240 : std::unique_lock<LockInterface> lock(_free.lock);
471 1191429 : node->next = _free.first;
472 1191429 : _free.first = node;
473 1191429 : if (!_free.last) {
474 636 : _free.last = _free.first;
475 : }
476 1191429 : }
477 : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
478 : isNodeInRange(_free.first);
479 : isNodeInRange(_free.last);
480 : #endif
481 1248273 : }
482 :
483 2011 : StorageBlock *allocateBlock(std::unique_lock<LockInterface> &lock) {
484 2011 : auto block = new StorageBlock();
485 2011 : initNodes(&block->nodes[0], &block->nodes[block->nodes.size() - 1], block);
486 2011 : _capacity += block->nodes.size();
487 2011 : return block;
488 : }
489 :
490 1951 : void deallocateBlock(std::unique_lock<LockInterface> &lock, StorageBlock *block) {
491 : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
492 : std::unique_lock rangesLock(_rangesLock);
493 : _ranges.emplace_back(uintptr_t(&block->nodes.front()), uintptr_t(&block->nodes.back()));
494 : rangesLock.unlock();
495 : #endif
496 :
497 1951 : _capacity -= block->nodes.size();
498 1951 : delete block;
499 1951 : }
500 :
501 : std::array<Node, PreallocatedNodes> _preallocated;
502 :
503 : NodeInterface _queue;
504 : NodeInterface _free;
505 :
506 : size_t _capacity = PreallocatedNodes;
507 :
508 : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
509 : void isNodeInRange(Node *ptr) const {
510 : if (!ptr) {
511 : return;
512 : }
513 : std::unique_lock rangesLock(_rangesLock);
514 : auto i = (uintptr_t)ptr;
515 : for (auto &it : _ranges) {
516 : if (i >= it.first && i <= it.second) {
517 : return;
518 : }
519 : }
520 : abort();
521 : }
522 :
523 : mutable std::mutex _rangesLock;
524 : std::vector<std::pair<uintptr_t, uintptr_t>> _ranges;
525 : #endif
526 : };
527 :
528 : }
529 :
530 : #endif /* STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_ */
|