LCOV - code coverage report
Current view: top level - extra/webserver/webserver/utils - SPWebAsyncTask.cc (source / functions) Hit Total Coverage
Test: coverage.info Lines: 61 126 48.4 %
Date: 2024-05-12 00:16:13 Functions: 15 31 48.4 %

          Line data    Source code
       1             : /**
       2             :  Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
       3             : 
       4             :  Permission is hereby granted, free of charge, to any person obtaining a copy
       5             :  of this software and associated documentation files (the "Software"), to deal
       6             :  in the Software without restriction, including without limitation the rights
       7             :  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       8             :  copies of the Software, and to permit persons to whom the Software is
       9             :  furnished to do so, subject to the following conditions:
      10             : 
      11             :  The above copyright notice and this permission notice shall be included in
      12             :  all copies or substantial portions of the Software.
      13             : 
      14             :  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      15             :  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      16             :  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      17             :  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      18             :  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      19             :  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      20             :  THE SOFTWARE.
      21             :  **/
      22             : 
      23             : #include "SPWebAsyncTask.h"
      24             : 
      25             : namespace STAPPLER_VERSIONIZED stappler::web {
      26             : 
      27             : thread_local AsyncTaskGroup *tl_currentGroup = nullptr;
      28             : thread_local AsyncTask *tl_currentTask = nullptr;
      29             : 
      30           0 : AsyncTaskGroup *AsyncTaskGroup::getCurrent() {
      31           0 :         return tl_currentGroup;
      32             : }
      33             : 
      34           0 : AsyncTaskGroup::AsyncTaskGroup() : _host(Host::getCurrent()) { }
      35             : 
      36           0 : AsyncTaskGroup::AsyncTaskGroup(const Host &serv) : _host(serv) { }
      37             : 
      38          25 : AsyncTaskGroup::AsyncTaskGroup(const Host &serv, std::function<void()> &&fn)
      39          25 : : _host(serv), _notifyFn(move(fn)) { }
      40             : 
      41           0 : void AsyncTaskGroup::onAdded(AsyncTask *task) {
      42           0 :         ++ _added;
      43             : 
      44           0 :         if (std::this_thread::get_id() == _threadId) {
      45           0 :                 if (Time::now() - _lastUpdate > TimeInterval::microseconds(1000 * 50)) {
      46           0 :                         update();
      47             :                 }
      48             :         }
      49           0 : }
      50             : 
      51           0 : void AsyncTaskGroup::onPerformed(AsyncTask *task) {
      52           0 :         _mutex.lock();
      53           0 :         _queue.push_back(task);
      54           0 :         _mutex.unlock();
      55             : 
      56           0 :         std::function<void()> tmp;
      57           0 :         if (_notifyFn) {
      58           0 :                 tmp = _notifyFn;
      59             :         }
      60             : 
      61           0 :         _condition.notify_one();
      62             : 
      63           0 :         if (tmp) {
      64           0 :                 tmp();
      65             :         }
      66           0 : }
      67             : 
      68          25 : void AsyncTaskGroup::update() {
      69          25 :         _mutex.lock();
      70             : 
      71          25 :         std::vector<AsyncTask *> stack;
      72          25 :         stack.swap(_queue);
      73             : 
      74          25 :         _mutex.unlock();
      75             : 
      76          25 :         if (stack.empty()) {
      77          25 :                 return;
      78             :         }
      79             : 
      80           0 :         for (auto task : stack) {
      81           0 :                 task->onComplete();
      82           0 :                 ++ _completed;
      83             : 
      84           0 :                 AsyncTask::destroy(task);
      85             :         }
      86             : 
      87           0 :         _lastUpdate = Time::now();
      88          25 : }
      89             : 
      90          25 : void AsyncTaskGroup::waitForAll() {
      91          25 :         update();
      92          25 :         while (_added != _completed) {
      93           0 :                 std::unique_lock<std::mutex> lock(_condMutex);
      94           0 :                 _condition.wait_for(lock, std::chrono::microseconds(1000 * 50));
      95           0 :                 update();
      96           0 :         }
      97          25 : }
      98             : 
      99           0 : bool AsyncTaskGroup::perform(const Callback<void(AsyncTask &)> &cb) {
     100           0 :         return AsyncTask::perform(_host, cb, this);
     101             : }
     102             : 
     103           0 : Pair<size_t, size_t> AsyncTaskGroup::getCounters() const {
     104           0 :         return stappler::pair(_completed.load(), _added.load());
     105             : }
     106             : 
     107         100 : AsyncTask *AsyncTask::prepare(pool_t *rootPool, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
     108         100 :         if (rootPool) {
     109         100 :                 if (auto p = pool::create(rootPool)) {
     110         100 :                         AsyncTask * task = nullptr;
     111         100 :                         web::perform([&] {
     112         100 :                                 task = new (p) AsyncTask(p, g);
     113         100 :                                 cb(*task);
     114         100 :                         }, p);
     115         100 :                         return task;
     116             :                 }
     117             :         }
     118           0 :         return nullptr;
     119             : }
     120             : 
     121           0 : AsyncTask *AsyncTask::prepare(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
     122           0 :         if (auto serv = Host::getCurrent()) {
     123           0 :                 return prepare(serv.getThreadPool(), cb, g);
     124             :         }
     125           0 :         return nullptr;
     126             : }
     127             : 
     128         100 : bool AsyncTask::perform(const Host &serv, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
     129         100 :         if (serv) {
     130         100 :                 if (auto t = prepare(serv.getThreadPool(), cb, g)) {
     131         100 :                         return serv.performTask(t);
     132             :                 }
     133             :         }
     134           0 :         return false;
     135             : }
     136             : 
     137           0 : bool AsyncTask::perform(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
     138           0 :         if (auto serv = Host::getCurrent()) {
     139           0 :                 return perform(Host(serv), cb, g);
     140             :         }
     141           0 :         return false;
     142             : }
     143             : 
     144         100 : void AsyncTask::destroy(AsyncTask *t) {
     145         100 :         auto p = t->pool();
     146         100 :         delete t;
     147         100 :         pool::destroy(p);
     148         100 : }
     149             : 
     150         100 : void AsyncTask::run(AsyncTask *t) {
     151         100 :         tl_currentTask = t;
     152         100 :         if (auto g = t->getGroup()) {
     153           0 :                 tl_currentGroup = g;
     154             :         }
     155         100 :         web::perform([&] {
     156         100 :                 t->setSuccessful(t->execute());
     157         100 :                 if (!t->getGroup()) {
     158         100 :                         t->onComplete();
     159             :                 }
     160         100 :         }, t->pool(), config::TAG_HOST, t->getHost().getController());
     161         100 :         tl_currentTask = nullptr;
     162         100 : }
     163             : 
     164           0 : AsyncTask *AsyncTask::getCurrent() {
     165           0 :         return tl_currentTask;
     166             : }
     167             : 
     168           0 : void AsyncTask::addExecuteFn(const ExecuteCallback &cb) {
     169           0 :         web::perform([&, this] {
     170           0 :                 _execute.push_back(cb);
     171           0 :         }, _pool);
     172           0 : }
     173             : 
     174         100 : void AsyncTask::addExecuteFn(ExecuteCallback &&cb) {
     175         100 :         web::perform([&, this] {
     176         100 :                 _execute.push_back(std::move(cb));
     177         100 :         }, _pool);
     178         100 : }
     179             : 
     180           0 : void AsyncTask::addCompleteFn(const CompleteCallback &cb) {
     181           0 :         web::perform([&, this] {
     182           0 :                 _complete.push_back(cb);
     183           0 :         }, _pool);
     184           0 : }
     185           0 : void AsyncTask::addCompleteFn(CompleteCallback &&cb) {
     186           0 :         web::perform([&, this] {
     187           0 :                 _complete.push_back(std::move(cb));
     188           0 :         }, _pool);
     189           0 : }
     190             : 
     191          75 : void AsyncTask::performWithStorage(const Callback<void(const db::Transaction &)> &cb) const {
     192          75 :         _host.performWithStorage(cb);
     193          75 : }
     194             : 
     195         100 : bool AsyncTask::execute() {
     196         100 :         bool success = true;
     197         200 :         for (auto &it : _execute) {
     198         100 :                 if (!it(*this)) {
     199           0 :                         success = false;
     200             :                 }
     201             :         }
     202         100 :         return success;
     203             : }
     204         100 : void AsyncTask::onComplete() {
     205         100 :         for (auto &it : _complete) {
     206           0 :                 it(*this, _isSuccessful);
     207             :         }
     208         100 : }
     209             : 
     210         100 : AsyncTask::AsyncTask(pool_t *p, AsyncTaskGroup *g) : _pool(p), _group(g) {
     211         100 :         if (!p) {
     212           0 :                 abort();
     213             :         }
     214         100 : }
     215             : 
     216             : }

Generated by: LCOV version 1.14