Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #include "SPWebAsyncTask.h"
24 :
25 : namespace STAPPLER_VERSIONIZED stappler::web {
26 :
27 : thread_local AsyncTaskGroup *tl_currentGroup = nullptr;
28 : thread_local AsyncTask *tl_currentTask = nullptr;
29 :
30 0 : AsyncTaskGroup *AsyncTaskGroup::getCurrent() {
31 0 : return tl_currentGroup;
32 : }
33 :
34 0 : AsyncTaskGroup::AsyncTaskGroup() : _host(Host::getCurrent()) { }
35 :
36 0 : AsyncTaskGroup::AsyncTaskGroup(const Host &serv) : _host(serv) { }
37 :
38 25 : AsyncTaskGroup::AsyncTaskGroup(const Host &serv, std::function<void()> &&fn)
39 25 : : _host(serv), _notifyFn(move(fn)) { }
40 :
41 0 : void AsyncTaskGroup::onAdded(AsyncTask *task) {
42 0 : ++ _added;
43 :
44 0 : if (std::this_thread::get_id() == _threadId) {
45 0 : if (Time::now() - _lastUpdate > TimeInterval::microseconds(1000 * 50)) {
46 0 : update();
47 : }
48 : }
49 0 : }
50 :
51 0 : void AsyncTaskGroup::onPerformed(AsyncTask *task) {
52 0 : _mutex.lock();
53 0 : _queue.push_back(task);
54 0 : _mutex.unlock();
55 :
56 0 : std::function<void()> tmp;
57 0 : if (_notifyFn) {
58 0 : tmp = _notifyFn;
59 : }
60 :
61 0 : _condition.notify_one();
62 :
63 0 : if (tmp) {
64 0 : tmp();
65 : }
66 0 : }
67 :
68 25 : void AsyncTaskGroup::update() {
69 25 : _mutex.lock();
70 :
71 25 : std::vector<AsyncTask *> stack;
72 25 : stack.swap(_queue);
73 :
74 25 : _mutex.unlock();
75 :
76 25 : if (stack.empty()) {
77 25 : return;
78 : }
79 :
80 0 : for (auto task : stack) {
81 0 : task->onComplete();
82 0 : ++ _completed;
83 :
84 0 : AsyncTask::destroy(task);
85 : }
86 :
87 0 : _lastUpdate = Time::now();
88 25 : }
89 :
90 25 : void AsyncTaskGroup::waitForAll() {
91 25 : update();
92 25 : while (_added != _completed) {
93 0 : std::unique_lock<std::mutex> lock(_condMutex);
94 0 : _condition.wait_for(lock, std::chrono::microseconds(1000 * 50));
95 0 : update();
96 0 : }
97 25 : }
98 :
99 0 : bool AsyncTaskGroup::perform(const Callback<void(AsyncTask &)> &cb) {
100 0 : return AsyncTask::perform(_host, cb, this);
101 : }
102 :
103 0 : Pair<size_t, size_t> AsyncTaskGroup::getCounters() const {
104 0 : return stappler::pair(_completed.load(), _added.load());
105 : }
106 :
107 100 : AsyncTask *AsyncTask::prepare(pool_t *rootPool, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
108 100 : if (rootPool) {
109 100 : if (auto p = pool::create(rootPool)) {
110 100 : AsyncTask * task = nullptr;
111 100 : web::perform([&] {
112 100 : task = new (p) AsyncTask(p, g);
113 100 : cb(*task);
114 100 : }, p);
115 100 : return task;
116 : }
117 : }
118 0 : return nullptr;
119 : }
120 :
121 0 : AsyncTask *AsyncTask::prepare(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
122 0 : if (auto serv = Host::getCurrent()) {
123 0 : return prepare(serv.getThreadPool(), cb, g);
124 : }
125 0 : return nullptr;
126 : }
127 :
128 100 : bool AsyncTask::perform(const Host &serv, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
129 100 : if (serv) {
130 100 : if (auto t = prepare(serv.getThreadPool(), cb, g)) {
131 100 : return serv.performTask(t);
132 : }
133 : }
134 0 : return false;
135 : }
136 :
137 0 : bool AsyncTask::perform(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup *g) {
138 0 : if (auto serv = Host::getCurrent()) {
139 0 : return perform(Host(serv), cb, g);
140 : }
141 0 : return false;
142 : }
143 :
144 100 : void AsyncTask::destroy(AsyncTask *t) {
145 100 : auto p = t->pool();
146 100 : delete t;
147 100 : pool::destroy(p);
148 100 : }
149 :
150 100 : void AsyncTask::run(AsyncTask *t) {
151 100 : tl_currentTask = t;
152 100 : if (auto g = t->getGroup()) {
153 0 : tl_currentGroup = g;
154 : }
155 100 : web::perform([&] {
156 100 : t->setSuccessful(t->execute());
157 100 : if (!t->getGroup()) {
158 100 : t->onComplete();
159 : }
160 100 : }, t->pool(), config::TAG_HOST, t->getHost().getController());
161 100 : tl_currentTask = nullptr;
162 100 : }
163 :
164 0 : AsyncTask *AsyncTask::getCurrent() {
165 0 : return tl_currentTask;
166 : }
167 :
168 0 : void AsyncTask::addExecuteFn(const ExecuteCallback &cb) {
169 0 : web::perform([&, this] {
170 0 : _execute.push_back(cb);
171 0 : }, _pool);
172 0 : }
173 :
174 100 : void AsyncTask::addExecuteFn(ExecuteCallback &&cb) {
175 100 : web::perform([&, this] {
176 100 : _execute.push_back(std::move(cb));
177 100 : }, _pool);
178 100 : }
179 :
180 0 : void AsyncTask::addCompleteFn(const CompleteCallback &cb) {
181 0 : web::perform([&, this] {
182 0 : _complete.push_back(cb);
183 0 : }, _pool);
184 0 : }
185 0 : void AsyncTask::addCompleteFn(CompleteCallback &&cb) {
186 0 : web::perform([&, this] {
187 0 : _complete.push_back(std::move(cb));
188 0 : }, _pool);
189 0 : }
190 :
191 75 : void AsyncTask::performWithStorage(const Callback<void(const db::Transaction &)> &cb) const {
192 75 : _host.performWithStorage(cb);
193 75 : }
194 :
195 100 : bool AsyncTask::execute() {
196 100 : bool success = true;
197 200 : for (auto &it : _execute) {
198 100 : if (!it(*this)) {
199 0 : success = false;
200 : }
201 : }
202 100 : return success;
203 : }
204 100 : void AsyncTask::onComplete() {
205 100 : for (auto &it : _complete) {
206 0 : it(*this, _isSuccessful);
207 : }
208 100 : }
209 :
210 100 : AsyncTask::AsyncTask(pool_t *p, AsyncTaskGroup *g) : _pool(p), _group(g) {
211 100 : if (!p) {
212 0 : abort();
213 : }
214 100 : }
215 :
216 : }
|