LCOV - code coverage report
Current view: top level - core/core/memory - SPMemPriorityQueue.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 211 212 99.5 %
Date: 2024-05-12 00:16:13 Functions: 90 95 94.7 %

          Line data    Source code
       1             : /**
       2             :  Copyright (c) 2021-2022 Roman Katuntsev <sbkarr@stappler.org>
       3             :  Copyright (c) 2023 Stappler LLC <admin@stappler.dev>
       4             : 
       5             :  Permission is hereby granted, free of charge, to any person obtaining a copy
       6             :  of this software and associated documentation files (the "Software"), to deal
       7             :  in the Software without restriction, including without limitation the rights
       8             :  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       9             :  copies of the Software, and to permit persons to whom the Software is
      10             :  furnished to do so, subject to the following conditions:
      11             : 
      12             :  The above copyright notice and this permission notice shall be included in
      13             :  all copies or substantial portions of the Software.
      14             : 
      15             :  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      16             :  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      17             :  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      18             :  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      19             :  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      20             :  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      21             :  THE SOFTWARE.
      22             :  **/
      23             : 
      24             : #ifndef STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_
      25             : #define STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_
      26             : 
      27             : #include "SPMemFunction.h"
      28             : 
      29             : #define SP_PRIORITY_QUEUE_RANGE_DEBUG 0
      30             : 
      31             : namespace STAPPLER_VERSIONIZED stappler::memory {
      32             : 
      33             : void PriorityQueue_lock_noOp(void *);
      34             : void PriorityQueue_lock_std_mutex(void *);
      35             : void PriorityQueue_unlock_std_mutex(void *);
      36             : 
      37             : // Real-time task priority queue
      38             : // It's designed for relatively low pending tasks (below PreallocatedNodes),
      39             : // with relatively low tasks with priority different from zero
      40             : template <typename Value>
      41             : class PriorityQueue {
      42             : public:
      43             :         static constexpr size_t PreallocatedNodes = 8;
      44             :         static constexpr size_t StorageNodes = 64;
      45             : 
      46             :         using LockFnPtr = void (*) (void *);
      47             :         using PriorityType = int32_t;
      48             : 
      49             :         struct StorageBlock;
      50             : 
      51             :         struct alignas(Value) AlignedStorage {
      52             :                 uint8_t buffer[sizeof(Value)];
      53             :         };
      54             : 
      55             :         // Nodes will be sequentially placed in continuous region of memory, so, they should have proper alignment
      56             :         struct Node {
      57             :                 AlignedStorage storage;
      58             :                 Node *next;
      59             :                 StorageBlock *block;
      60             :                 PriorityType priority;
      61             :         };
      62             : 
      63             :         struct StorageBlock {
      64             :                 std::array<Node, StorageNodes> nodes;
      65             :                 uint32_t used = 0;
      66             :         };
      67             : 
      68             :         struct LockInterface {
      69             :                 void *lockPtr = nullptr;
      70             :                 LockFnPtr lockFn = &PriorityQueue_lock_noOp;
      71             :                 LockFnPtr unlockFn = &PriorityQueue_lock_noOp;
      72             : 
      73         894 :                 void clear() {
      74         894 :                         lockPtr = nullptr;
      75         894 :                         lockFn = &PriorityQueue_lock_noOp;
      76         894 :                         unlockFn = &PriorityQueue_lock_noOp;
      77         894 :                 }
      78             : 
      79   219174995 :                 void lock() {
      80   219174995 :                         lockFn(lockPtr);
      81   219665890 :                 }
      82             : 
      83   219631210 :                 void unlock() {
      84   219631210 :                         unlockFn(lockPtr);
      85   219708742 :                 }
      86             : 
      87             :                 bool operator==(const LockInterface &other) const {
      88             :                         return lockPtr == other.lockPtr && lockFn == other.lockFn && unlockFn == other.unlockFn;
      89             :                 }
      90             : 
      91         260 :                 bool operator!=(const LockInterface &other) const {
      92         260 :                         return lockPtr != other.lockPtr || lockFn != other.lockFn || unlockFn != other.unlockFn;
      93             :                 }
      94             :         };
      95             : 
      96             :         struct NodeInterface {
      97             :                 Node *first = nullptr;
      98             :                 Node *last = nullptr;
      99             :                 LockInterface lock;
     100             :         };
     101             : 
     102         338 :         PriorityQueue() {
     103         338 :                 initNodes(&_preallocated[0], &_preallocated[_preallocated.size() - 1], nullptr);
     104         338 :                 _free.first = &_preallocated[0];
     105         338 :                 _free.last = &_preallocated[_preallocated.size() - 1];
     106         338 :         }
     107             : 
     108         317 :         ~PriorityQueue() {
     109             :                 // disable locking
     110         317 :                 _queue.lock.clear();
     111         317 :                 _free.lock.clear();
     112             : 
     113         317 :                 auto n = _queue.first;
     114         407 :                 while (n) {
     115          90 :                         Value * val = (Value *)(n->storage.buffer);
     116          90 :                         val->~Value();
     117          90 :                         freeNode(n);
     118          90 :                         n = n->next;
     119             :                 }
     120         317 :         }
     121             : 
     122             :         PriorityQueue(const PriorityQueue &) = delete;
     123             :         PriorityQueue &operator=(const PriorityQueue &) = delete;
     124             : 
     125             :         PriorityQueue(PriorityQueue &&) = delete;
     126             :         PriorityQueue &operator=(PriorityQueue &&) = delete;
     127             : 
     128         210 :         size_t capacity() const {
     129         210 :                 return _capacity;
     130             :         }
     131             : 
     132         240 :         size_t free_capacity() {
     133         240 :                 std::unique_lock<LockInterface> lock(_free.lock);
     134         240 :                 size_t ret = 0;
     135         240 :                 auto node = _free.first;
     136        5280 :                 while (node) {
     137        5040 :                         ++ ret;
     138        5040 :                         node = node->next;
     139             :                 }
     140         240 :                 return ret;
     141         240 :         }
     142             : 
     143             :         void setQueueLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
     144             :                 _queue.lock.lockFn = lockFn;
     145             :                 _queue.lock.unlockFn = unlockFn;
     146             :                 _queue.lock.lockPtr = ptr;
     147             :         }
     148             : 
     149             :         void setFreeLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
     150             :                 _free.lock.lockFn = lockFn;
     151             :                 _free.lock.unlockFn = unlockFn;
     152             :                 _free.lock.lockPtr = ptr;
     153             :         }
     154             : 
     155             :         void setLocking(LockFnPtr lockFn, LockFnPtr unlockFn, void *ptr) {
     156             :                 setQueueLocking(lockFn, unlockFn, ptr);
     157             :                 setFreeLocking(lockFn, unlockFn, ptr);
     158             :         }
     159             : 
     160         188 :         void setQueueLocking(std::mutex &mutex) {
     161         188 :                 _queue.lock.lockFn = PriorityQueue_lock_std_mutex;
     162         188 :                 _queue.lock.unlockFn = PriorityQueue_unlock_std_mutex;
     163         188 :                 _queue.lock.lockPtr = &mutex;
     164         188 :         }
     165             : 
     166         188 :         void setFreeLocking(std::mutex &mutex) {
     167         188 :                 _free.lock.lockFn = PriorityQueue_lock_std_mutex;
     168         188 :                 _free.lock.unlockFn = PriorityQueue_unlock_std_mutex;
     169         188 :                 _free.lock.lockPtr = &mutex;
     170         188 :         }
     171             : 
     172             :         void setLocking(std::mutex &mutex) {
     173             :                 setQueueLocking(mutex);
     174             :                 setFreeLocking(mutex);
     175             :         }
     176             : 
     177         130 :         void clear() {
     178         130 :                 auto tmpFreeLock = _free.lock;
     179         130 :                 auto tmpQueueLock = _queue.lock;
     180             : 
     181         130 :                 _free.lock.clear();
     182         130 :                 _queue.lock.clear();
     183             : 
     184         130 :                 if (tmpFreeLock != tmpQueueLock) {
     185         100 :                         tmpFreeLock.lock();
     186         100 :                         tmpQueueLock.lock();
     187             :                 } else {
     188          30 :                         tmpQueueLock.lock();
     189             :                 }
     190             : 
     191        4210 :                 while (auto node = popNode()) {
     192        4080 :                         Value * val = (Value *)(node->storage.buffer);
     193        4080 :                         val->~Value();
     194        4080 :                         freeNode(node);
     195             :                 }
     196             : 
     197         130 :                 if (tmpFreeLock != tmpQueueLock) {
     198         100 :                         tmpFreeLock.unlock();
     199         100 :                         tmpQueueLock.unlock();
     200             :                 } else {
     201          30 :                         tmpQueueLock.unlock();
     202             :                 }
     203             : 
     204         130 :                 _free.lock = tmpFreeLock;
     205         130 :                 _queue.lock = tmpQueueLock;
     206         130 :         }
     207             : 
     208          21 :         bool empty() {
     209          21 :                 std::unique_lock<LockInterface> lock(_queue.lock);
     210          42 :                 return empty(lock);
     211          21 :         }
     212             : 
     213             :         // inform queue that lock already acquired
     214             :         template <class T>
     215         627 :         bool empty(std::unique_lock<T> &lock) {
     216         627 :                 return _queue.first == nullptr;
     217             :         }
     218             : 
     219             :         template <typename ... Args>
     220     1252761 :         void push(PriorityType p, bool insertFirst, Args && ... args) {
     221     1252761 :                 auto node = allocateNode();
     222     1252767 :                 node->priority = p;
     223     1252767 :                 new (node->storage.buffer) Value(std::forward<Args>(args)...);
     224     1252768 :                 pushNode(node, insertFirst);
     225     1252768 :         }
     226             : 
     227             :         // pop node, move value into temporary, then free node, then call callback
     228             :         // optimized for long callbacks and simple move constructor
     229             :         bool pop_prefix(std::unique_lock<LockInterface> &lock, const callback<void(PriorityType, Value &&)> &cb) {
     230             :                 if (auto node = popNode(lock)) {
     231             :                         auto p = node->priority;
     232             :                         Value * val = (Value *)(node->storage.buffer);
     233             :                         Value tmp(move(*val));
     234             :                         val->~Value();
     235             :                         freeNode(node);
     236             :                         cb(p, move(tmp));
     237             :                         return true;
     238             :                 }
     239             :                 return false;
     240             :         }
     241             : 
     242             :         bool pop_prefix(const callback<void(PriorityType, Value &&)> &cb) {
     243             :                 if (auto node = popNode()) {
     244             :                         auto p = node->priority;
     245             :                         Value * val = (Value *)(node->storage.buffer);
     246             :                         Value tmp(move(*val));
     247             :                         val->~Value();
     248             :                         freeNode(node);
     249             :                         cb(p, move(tmp));
     250             :                         return true;
     251             :                 }
     252             :                 return false;
     253             :         }
     254             : 
     255             :         // pop node, run callback on value, directly stored in node, then free node
     256             :         // no additional move, but with extra cost for detached node, that blocked until callback ends
     257             :         bool pop_direct(std::unique_lock<LockInterface> &lock, const callback<void(PriorityType, Value &&)> &cb) {
     258             :                 if (auto node = popNode(lock)) {
     259             :                         Value * val = (Value *)(node->storage.buffer);
     260             :                         cb(node->priority, move(*val));
     261             :                         val->~Value();
     262             :                         freeNode(node);
     263             :                         return true;
     264             :                 }
     265             :                 return false;
     266             :         }
     267             : 
     268   215540070 :         bool pop_direct(const callback<void(PriorityType, Value &&)> &cb) {
     269   215540070 :                 if (auto node = popNode()) {
     270     1229958 :                         Value * val = (Value *)(node->storage.buffer);
     271     1229958 :                         cb(node->priority, move(*val));
     272     1243957 :                         val->~Value();
     273     1243929 :                         freeNode(node);
     274     1244093 :                         return true;
     275             :                 }
     276   214725862 :                 return false;
     277             :         }
     278             : 
     279         190 :         void foreach(const callback<void(PriorityType, const Value &)> &cb) {
     280         190 :                 std::unique_lock<LockInterface> lock(_queue.lock);
     281             : 
     282         190 :                 auto node = _queue.first;
     283        4750 :                 while (node) {
     284        4560 :                         cb(node->priority, *(Value *)(node->storage.buffer));
     285        4560 :                         node = node->next;
     286             :                 }
     287         190 :         }
     288             : 
     289             : protected:
     290        2349 :         void initNodes(Node *first, Node *last, StorageBlock *block) {
     291             : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
     292             :                 std::unique_lock rangesLock(_rangesLock);
     293             :                 _ranges.emplace_back(uintptr_t(first), uintptr_t(last));
     294             :                 rangesLock.unlock();
     295             : #endif
     296             : 
     297             :                 // make linked-list from continuous region of memory
     298      131408 :                 while (first != last) {
     299      129059 :                         first->next = first + 1;
     300      129059 :                         first->block = block;
     301      129059 :                         first = first->next;
     302             :                 }
     303        2349 :                 last->next = nullptr;
     304        2349 :                 last->block = block;
     305        2349 :         }
     306             : 
     307             :         // node lifecycle:
     308             :         // (on producer thread)
     309             :         // - allocate (blocking)
     310             :         // - fill (app-side)
     311             :         // - push (blocking)
     312             :         //
     313             :         // (on consumer thread)
     314             :         // - pop (blocking)
     315             :         // - dispose (app-size)
     316             :         // - free (blocking)
     317             : 
     318   215476723 :         Node *popNode() {
     319   215476723 :                 std::unique_lock<LockInterface> lock(_queue.lock);
     320   431898024 :                 return popNode(lock);
     321   215909354 :         }
     322             : 
     323   215910203 :         Node *popNode(std::unique_lock<LockInterface> &lock) {
     324   215910203 :                 Node *ret = nullptr;
     325   215910203 :                 if (_queue.first) {
     326     1248197 :                         ret = _queue.first;
     327     1248197 :                         if (_queue.first == _queue.last) {
     328      961761 :                                 _queue.first = _queue.last = nullptr;
     329             :                         } else {
     330      286436 :                                 _queue.first = ret->next;
     331             :                         }
     332     1248197 :                         ret->next = nullptr;
     333             :                 }
     334   215910203 :                 return ret;
     335             :         }
     336             : 
     337     1252768 :         void pushNode(Node *node, bool insertFirst) {
     338     1252768 :                 std::unique_lock<LockInterface> lock(_queue.lock);
     339     1252768 :                 node->next = nullptr;
     340     1252768 :                 if (!_queue.first) {
     341      961853 :                         _queue.last = _queue.first = node;
     342             :                 } else {
     343      290915 :                         if (insertFirst) {
     344        2130 :                                 if (node->priority <= _queue.first->priority) {
     345         176 :                                         node->next = _queue.first;
     346         176 :                                         _queue.first = node;
     347        1954 :                                 } else if (_queue.last->priority < node->priority) {
     348          56 :                                         _queue.last->next = node;
     349          56 :                                         _queue.last = node;
     350             :                                 } else {
     351        1898 :                                         Node *n = _queue.first;
     352       37756 :                                         while (n->next && n->next->priority < node->priority) {
     353       35858 :                                                 n = n->next;
     354             :                                         }
     355        1898 :                                         node->next = n->next;
     356        1898 :                                         n->next = node;
     357             :                                 }
     358             :                         } else {
     359      288785 :                                 if (node->priority < _queue.first->priority) {
     360         284 :                                         node->next = _queue.first;
     361         284 :                                         _queue.first = node;
     362      288501 :                                 } else if (_queue.last->priority <= node->priority) {
     363      275011 :                                         _queue.last->next = node;
     364      275011 :                                         _queue.last = node;
     365             :                                 } else {
     366       13490 :                                         Node *n = _queue.first;
     367      433658 :                                         while (n->next && n->next->priority <= node->priority) {
     368      420168 :                                                 n = n->next;
     369             :                                         }
     370       13490 :                                         node->next = n->next;
     371       13490 :                                         n->next = node;
     372             :                                 }
     373             :                         }
     374             :                 }
     375     1252768 :         }
     376             : 
     377     1252759 :         Node *allocateNode() {
     378     1252759 :                 Node *ret = nullptr;
     379     1252759 :                 std::unique_lock<LockInterface> lock(_free.lock);
     380     1252767 :                 if (_free.first) {
     381     1250756 :                         ret = _free.first;
     382     1250756 :                         if (_free.first == _free.last) {
     383        2744 :                                 _free.first = _free.last = nullptr;
     384             :                         } else {
     385     1248012 :                                 _free.first = ret->next;
     386             :                         }
     387             :                 } else {
     388        2011 :                         auto block = allocateBlock(lock);
     389             : 
     390             :                         // append others
     391        2011 :                         if (_free.last) {
     392           0 :                                 _free.last->next = &block->nodes[1];
     393             :                         } else {
     394        2011 :                                 _free.first = &block->nodes[1];
     395             :                         }
     396        2011 :                         _free.last = &block->nodes[block->nodes.size() - 1];
     397             : 
     398             :                         // return first new node
     399        2011 :                         ret = &block->nodes[0];
     400             :                 }
     401     1252766 :                 ret->next = nullptr;
     402     1252766 :                 if (ret->block) {
     403       60643 :                         ++ ret->block->used;
     404             :                 }
     405     1252768 :                 return ret;
     406     1252766 :         }
     407             : 
     408     1248087 :         void freeNode(Node *node) {
     409     1248087 :                 if (node->block) {
     410             :                         // add to the end of the list
     411       56847 :                         std::unique_lock<LockInterface> lock(_free.lock);
     412       56863 :                         node->next = nullptr;
     413             : 
     414       56863 :                         -- node->block->used;
     415       56863 :                         if (node->block->used == 0) {
     416        1951 :                                 auto blockToRemove = node->block;
     417             :                                 // remove all nodes from this block from free list
     418        1951 :                                 Node *n = _free.first;
     419        1951 :                                 Node *last = nullptr;
     420             : 
     421             :                                 // skip preallocated nodes, that should be first in free list
     422       16927 :                                 while (n && !n->block) {
     423       14976 :                                         last = n;
     424       14976 :                                         n = n->next;
     425             :                                 }
     426             : 
     427             :                                 // last points to last preallocated node in list
     428             :                                 // n points to first block node in list
     429             : 
     430             :                                 do {
     431             :                                         // read nodes from target block
     432      127522 :                                         while (n && n->block == blockToRemove) {
     433      122913 :                                                 n = n->next;
     434             :                                         }
     435             : 
     436             :                                         // n points to first foreign block
     437        4609 :                                         if (last) {
     438        4608 :                                                 last->next = n;
     439             :                                         } else {
     440           1 :                                                 _free.first = n;
     441             :                                         }
     442             : 
     443       30635 :                                         while (n && n->block != blockToRemove) {
     444       26026 :                                                 last = n;
     445       26026 :                                                 n = n->next;
     446             :                                         }
     447        4609 :                                         if (!n) {
     448        1951 :                                                 break;
     449             :                                         }
     450             :                                 } while (1);
     451             : 
     452        1951 :                                 if (last) {
     453        1950 :                                         _free.last = last;
     454             :                                 } else {
     455           1 :                                         _free.last = _free.first = nullptr;
     456             :                                 }
     457             : 
     458        1951 :                                 deallocateBlock(lock, blockToRemove);
     459             :                         } else {
     460       54912 :                                 if (_free.last) {
     461       54814 :                                         _free.last->next = node;
     462       54814 :                                         _free.last = node;
     463             :                                 } else {
     464          98 :                                         _free.last = _free.first = node;
     465             :                                 }
     466             :                         }
     467       56863 :                 } else {
     468             :                         // add to the front of list, so, it's more likely that extra nodes will be unused
     469             :                         // and extra block will be deallocated
     470     1191240 :                         std::unique_lock<LockInterface> lock(_free.lock);
     471     1191429 :                         node->next = _free.first;
     472     1191429 :                         _free.first = node;
     473     1191429 :                         if (!_free.last) {
     474         636 :                                 _free.last = _free.first;
     475             :                         }
     476     1191429 :                 }
     477             : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
     478             :                 isNodeInRange(_free.first);
     479             :                 isNodeInRange(_free.last);
     480             : #endif
     481     1248273 :         }
     482             : 
     483        2011 :         StorageBlock *allocateBlock(std::unique_lock<LockInterface> &lock) {
     484        2011 :                 auto block = new StorageBlock();
     485        2011 :                 initNodes(&block->nodes[0], &block->nodes[block->nodes.size() - 1], block);
     486        2011 :                 _capacity += block->nodes.size();
     487        2011 :                 return block;
     488             :         }
     489             : 
     490        1951 :         void deallocateBlock(std::unique_lock<LockInterface> &lock, StorageBlock *block) {
     491             : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
     492             :                 std::unique_lock rangesLock(_rangesLock);
     493             :                 _ranges.emplace_back(uintptr_t(&block->nodes.front()), uintptr_t(&block->nodes.back()));
     494             :                 rangesLock.unlock();
     495             : #endif
     496             : 
     497        1951 :                 _capacity -= block->nodes.size();
     498        1951 :                 delete block;
     499        1951 :         }
     500             : 
     501             :         std::array<Node, PreallocatedNodes> _preallocated;
     502             : 
     503             :         NodeInterface _queue;
     504             :         NodeInterface _free;
     505             : 
     506             :         size_t _capacity = PreallocatedNodes;
     507             : 
     508             : #if SP_PRIORITY_QUEUE_RANGE_DEBUG
     509             :         void isNodeInRange(Node *ptr) const {
     510             :                 if (!ptr) {
     511             :                         return;
     512             :                 }
     513             :                 std::unique_lock rangesLock(_rangesLock);
     514             :                 auto i = (uintptr_t)ptr;
     515             :                 for (auto &it : _ranges) {
     516             :                         if (i >= it.first && i <= it.second) {
     517             :                                 return;
     518             :                         }
     519             :                 }
     520             :                 abort();
     521             :         }
     522             : 
     523             :         mutable std::mutex _rangesLock;
     524             :         std::vector<std::pair<uintptr_t, uintptr_t>> _ranges;
     525             : #endif
     526             : };
     527             : 
     528             : }
     529             : 
     530             : #endif /* STAPPLER_CORE_MEMORY_SPMEMPRIORITYQUEUE_H_ */

Generated by: LCOV version 1.14