Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #ifndef EXTRA_WEBSERVER_WEBSERVER_UTILS_SPWEBASYNCTASK_H_
24 : #define EXTRA_WEBSERVER_WEBSERVER_UTILS_SPWEBASYNCTASK_H_
25 :
26 : #include "SPWebHost.h"
27 :
28 : namespace STAPPLER_VERSIONIZED stappler::web {
29 :
// Forward declarations for the types referenced by the declarations below.
class Host;
class AsyncTask;
32 :
33 : class AsyncTaskGroup : public AllocBase {
34 : public:
35 : static AsyncTaskGroup *getCurrent();
36 :
37 : AsyncTaskGroup();
38 : AsyncTaskGroup(const Host &);
39 : AsyncTaskGroup(const Host &, std::function<void()> &&);
40 :
41 : void onAdded(AsyncTask *);
42 : void onPerformed(AsyncTask *);
43 :
44 : void update();
45 : void waitForAll();
46 :
47 : bool perform(const Callback<void(AsyncTask &)> &cb);
48 :
49 : Pair<size_t, size_t> getCounters() const; // <completed, added>
50 :
51 750 : Host getHost() const { return _host; }
52 :
53 : protected:
54 : Time _lastUpdate = Time::now();
55 : std::thread::id _threadId = std::this_thread::get_id();
56 : std::mutex _mutex;
57 : std::mutex _condMutex;
58 : std::condition_variable _condition;
59 :
60 : std::vector<AsyncTask *> _queue;
61 : std::atomic<size_t> _added = 0;
62 : std::atomic<size_t> _completed = 0;
63 : Host _host;
64 :
65 : std::function<void()> _notifyFn = nullptr;
66 : };
67 :
68 : class AsyncTask : public AllocBase {
69 : public:
70 : static constexpr uint8_t PriorityLowest = config::PriorityLowest;
71 : static constexpr uint8_t PriorityLow = config::PriorityLow;
72 : static constexpr uint8_t PriorityNormal = config::PriorityNormal;
73 : static constexpr uint8_t PriorityHigh = config::PriorityHigh;
74 : static constexpr uint8_t PriorityHighest = config::PriorityHighest;
75 :
76 : using ExecuteCallback = Function<bool(const AsyncTask &)>;
77 : using CompleteCallback = Function<void(const AsyncTask &, bool)>;
78 :
79 : // usage:
80 : // auto newTask = Task::prepare([&] (Task &task) {
81 : // // do configuration in Task's memory pool context
82 : // });
83 : //
84 : static AsyncTask *prepare(pool_t *rootPool, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup * = nullptr);
85 : static AsyncTask *prepare(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup * = nullptr);
86 :
87 : static bool perform(const Host &, const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup * = nullptr);
88 : static bool perform(const Callback<void(AsyncTask &)> &cb, AsyncTaskGroup * = nullptr);
89 :
90 : static void destroy(AsyncTask *);
91 :
92 : static void run(AsyncTask *);
93 :
94 : static AsyncTask *getCurrent();
95 :
96 : void addExecuteFn(const ExecuteCallback &);
97 : void addExecuteFn(ExecuteCallback &&);
98 :
99 : void addCompleteFn(const CompleteCallback &);
100 : void addCompleteFn(CompleteCallback &&);
101 :
102 : /* set default task priority */
103 : void setPriority(uint8_t priority) { _priority = priority; }
104 :
105 : /* get task priority */
106 100 : uint8_t getPriority() const { return _priority; }
107 :
108 : /* used by task manager to set success state */
109 100 : void setSuccessful(bool value) { _isSuccessful = value; }
110 :
111 : /* if task execution was successful */
112 : bool isSuccessful() const { return _isSuccessful; }
113 :
114 100 : void setHost(const Host &serv) { _host = serv; }
115 300 : const Host &getHost() const { return _host; }
116 :
117 : void setScheduled(Time t) { _scheduled = t; }
118 : Time getScheduled() const { return _scheduled; }
119 :
120 400 : AsyncTaskGroup *getGroup() const { return _group; }
121 :
122 : void performWithStorage(const Callback<void(const db::Transaction &)> &) const;
123 :
124 : bool execute();
125 : void onComplete();
126 :
127 325 : pool_t *pool() const { return _pool; }
128 :
129 : protected:
130 : AsyncTask(pool_t *, AsyncTaskGroup *);
131 :
132 : pool_t *_pool = nullptr;
133 : uint8_t _priority = PriorityNormal;
134 : Time _scheduled;
135 : bool _isSuccessful = false;
136 :
137 : Host _host;
138 : Vector<ExecuteCallback> _execute;
139 : Vector<CompleteCallback> _complete;
140 :
141 : AsyncTaskGroup *_group = nullptr;
142 : };
143 :
// Selects how Shared<T>::create(SharedMode, ...) obtains memory for the object.
enum class SharedMode {
	// A new pool is created with pool::create(nullptr); no separate allocator is created.
	Pool,

	// A dedicated allocator is created first and the pool is created on top of it;
	// both are destroyed together with the Shared object.
	Allocator,
};
148 :
149 : template <typename T>
150 : class Shared : public RefBase<memory::PoolInterface> {
151 : public:
152 : template <typename ...Args>
153 : static Shared *create(Args && ...);
154 :
155 : template <typename ...Args>
156 : static Shared *create(pool_t *, Args && ...);
157 :
158 : template <typename ...Args>
159 : static Shared *create(SharedMode, Args && ...);
160 :
161 : virtual ~Shared();
162 :
163 : template <typename Callback>
164 25 : void perform(Callback &&cb) {
165 25 : web::perform([&, this] {
166 25 : cb(_shared);
167 : }, _pool);
168 25 : }
169 :
170 75 : inline T *get() const { return _shared; }
171 :
172 : inline operator T * () const { return get(); }
173 : inline T * operator->() const { return get(); }
174 :
175 : inline explicit operator bool () const { return _shared != nullptr; }
176 :
177 : pool_t *getPool() const { return _pool; }
178 : allocator_t *getAllocator() const { return _allocator; }
179 :
180 : protected:
181 : Shared(SharedMode m, allocator_t *, pool_t *, T *);
182 :
183 : allocator_t *_allocator = nullptr;
184 : pool_t *_pool = nullptr;
185 : T *_shared = nullptr;
186 : SharedMode _mode = SharedMode::Pool;
187 : };
188 :
189 :
190 : template <typename _Base>
191 : class SharedRc {
192 : public:
193 : using Base = typename std::remove_cv<_Base>::type;
194 : using Type = Shared<Base>;
195 : using Self = SharedRc<Base>;
196 : using Pointer = Type *;
197 :
198 : template <class... Args>
199 : static inline Self create(Args && ... args) {
200 : auto pRet = Type::create();
201 : Self ret(nullptr);
202 : pRet->perform([&] (Base *base) {
203 : if (base->init(std::forward<Args>(args)...)) {
204 : ret = Self(pRet, true); // unsafe assignment
205 : }
206 : });
207 : if (!ret) {
208 : delete pRet;
209 : }
210 : return ret;
211 : }
212 :
213 : template <class... Args>
214 : static inline Self create(pool_t *pool, Args && ... args) {
215 : auto pRet = Type::create(pool);
216 : Self ret(nullptr);
217 : pRet->perform([&] (Base *base) {
218 : if (base->init(std::forward<Args>(args)...)) {
219 : ret = Self(pRet, true); // unsafe assignment
220 : }
221 : });
222 : if (!ret) {
223 : delete pRet;
224 : }
225 : return ret;
226 : }
227 :
228 : template <class... Args>
229 25 : static inline Self create(SharedMode mode, Args && ... args) {
230 25 : auto pRet = Type::create(mode);
231 25 : Self ret(nullptr);
232 50 : pRet->perform([&] (Base *base) {
233 25 : if (base->init(std::forward<Args>(args)...)) {
234 25 : ret = Self(pRet, true); // unsafe assignment
235 : }
236 : });
237 25 : if (!ret) {
238 0 : delete pRet;
239 : }
240 50 : return ret;
241 0 : }
242 :
243 : static inline Self alloc() {
244 : return Self(Type::create(), true);
245 : }
246 :
247 : template <class... Args>
248 : static inline Self alloc(Args && ... args) {
249 : return Self(Type::create(std::forward<Args>(args)...), true);
250 : }
251 :
252 : template <class... Args>
253 : static inline Self alloc(pool_t *pool, Args && ... args) {
254 : return Self(Type::create(pool, std::forward<Args>(args)...), true);
255 : }
256 :
257 : template <class... Args>
258 : static inline Self alloc(SharedMode mode, Args && ... args) {
259 : return Self(Type::create(mode, std::forward<Args>(args)...), true);
260 : }
261 :
262 : inline SharedRc() : _ptr(nullptr) { }
263 25 : inline SharedRc(const nullptr_t &) : _ptr(nullptr) { }
264 : inline SharedRc(const Pointer &value) : _ptr(value) { doRetain(); }
265 : inline SharedRc(const Self &v) { _ptr = v._ptr; doRetain(); }
266 : inline SharedRc(Self &&v) {
267 : _ptr = v._ptr; v._ptr = nullptr;
268 : }
269 :
270 25 : inline SharedRc & operator = (const nullptr_t &) {
271 25 : doRelease();
272 25 : _ptr = nullptr;
273 25 : return *this;
274 : }
275 :
276 : inline SharedRc & operator = (const Pointer &value) { set(value); return *this; }
277 : inline SharedRc & operator = (const Self &v) { set(v._ptr); return *this; }
278 25 : inline SharedRc & operator = (Self &&v) {
279 25 : doRelease();
280 25 : _ptr = v._ptr; v._ptr = nullptr;
281 25 : return *this;
282 : }
283 :
284 50 : inline ~SharedRc() { doRelease(); _ptr = nullptr; }
285 :
286 : inline void set(const Pointer &value) {
287 : _ptr = doSwap(value);
288 : }
289 :
290 25 : inline Base *get() const {
291 25 : return _ptr ? _ptr->get() : nullptr;
292 : }
293 :
294 25 : inline operator Base * () const { return get(); }
295 25 : inline explicit operator bool () const { return _ptr && _ptr->get() != nullptr; }
296 :
297 : inline void swap(Self & v) { auto ptr = _ptr; _ptr = v._ptr; v._ptr = ptr; }
298 :
299 25 : inline Base * operator->() const { return _ptr ? _ptr->get() : nullptr; }
300 :
301 : template <typename Target>
302 : inline RcBase<Target> cast() const {
303 : if (auto v = dynamic_cast<Target *>(_ptr)) {
304 : return RcBase<Target>(v);
305 : }
306 : return RcBase<Target>(nullptr);
307 : }
308 :
309 : inline bool operator == (const Self & other) const { return _ptr == other._ptr; }
310 : inline bool operator == (const Base * & other) const { return _ptr->get() == other; }
311 : inline bool operator == (const std::nullptr_t other) const { return _ptr == other; }
312 :
313 : inline bool operator != (const Self & other) const { return _ptr != other._ptr; }
314 : inline bool operator != (const Base * & other) const { return _ptr->get() != other; }
315 : inline bool operator != (const std::nullptr_t other) const { return _ptr != other; }
316 :
317 : inline bool operator > (const Self & other) const { return _ptr > other._ptr; }
318 : inline bool operator > (const Base * other) const { return _ptr->get() > other; }
319 : inline bool operator > (const std::nullptr_t other) const { return _ptr > other; }
320 :
321 : inline bool operator < (const Self & other) const { return _ptr < other._ptr; }
322 : inline bool operator < (const Base * other) const { return _ptr->get() < other; }
323 : inline bool operator < (const std::nullptr_t other) const { return _ptr < other; }
324 :
325 : inline bool operator >= (const Self & other) const { return _ptr >= other._ptr; }
326 : inline bool operator >= (const Base * other) const { return _ptr->get() >= other; }
327 : inline bool operator >= (const std::nullptr_t other) const { return _ptr >= other; }
328 :
329 : inline bool operator <= (const Self & other) const { return _ptr <= other._ptr; }
330 : inline bool operator <= (const Base * other) const { return _ptr->get() <= other; }
331 : inline bool operator <= (const std::nullptr_t other) const { return _ptr <= other; }
332 :
333 : #if SP_REF_DEBUG
334 : uint64_t getId() const { return _id; }
335 : #endif
336 : private:
337 : inline void doRetain() {
338 : #if SP_REF_DEBUG
339 : if (_ptr) { _id = _ptr->retain(); }
340 : #else
341 : if (_ptr) { _ptr->retain(); }
342 : #endif
343 : }
344 :
345 100 : inline void doRelease() {
346 : #if SP_REF_DEBUG
347 : if (_ptr) { _ptr->release(_id); }
348 : #else
349 100 : if (_ptr) { _ptr->release(0); }
350 : #endif
351 100 : }
352 :
353 : inline Pointer doSwap(Pointer value) {
354 : #if SP_REF_DEBUG
355 : uint64_t id = 0;
356 : if (value) { id = value->retain(); }
357 : if (_ptr) { _ptr->release(_id); }
358 : _id = id;
359 : return value;
360 : #else
361 : if (value) { value->retain(); }
362 : if (_ptr) { _ptr->release(0); }
363 : return value;
364 : #endif
365 : }
366 :
367 : // unsafe
368 25 : inline SharedRc(Pointer value, bool v) : _ptr(value) { }
369 :
370 : Pointer _ptr = nullptr;
371 : #if SP_REF_DEBUG
372 : uint64_t _id = 0;
373 : #endif
374 : };
375 :
376 : template <typename T>
377 : template <typename ...Args>
378 : auto Shared<T>::create(Args && ... args) -> Shared * {
379 : auto pool = pool::create((pool_t *)nullptr);
380 :
381 : Shared *shared = nullptr;
382 : web::perform([&] {
383 : shared = new (pool) Shared(SharedMode::Pool, nullptr, pool,
384 : new (pool) T(pool, std::forward<Args>(args)...));
385 : }, pool);
386 : return shared;
387 : }
388 :
389 : template <typename T>
390 : template <typename ...Args>
391 : auto Shared<T>::create(pool_t *p, Args && ... args) -> Shared * {
392 : auto pool = pool::create(p);
393 :
394 : Shared *shared = nullptr;
395 : web::perform([&] {
396 : shared = new (pool) Shared(SharedMode::Pool, nullptr, pool,
397 : new (pool) T(pool, std::forward<Args>(args)...));
398 : }, pool);
399 : return shared;
400 : }
401 :
402 : template <typename T>
403 : template <typename ...Args>
404 25 : auto Shared<T>::create(SharedMode mode, Args && ... args) -> Shared * {
405 25 : allocator_t *alloc = nullptr;
406 25 : pool_t *pool = nullptr;
407 :
408 25 : switch (mode) {
409 0 : case SharedMode::Pool:
410 0 : pool = pool::create((pool_t *)nullptr);
411 0 : break;
412 25 : case SharedMode::Allocator:
413 25 : alloc = allocator::create();
414 25 : pool = pool::create(alloc);
415 25 : break;
416 : }
417 :
418 25 : Shared *shared = nullptr;
419 25 : web::perform([&] {
420 25 : shared = new (pool) Shared(mode, alloc, pool,
421 25 : new (pool) T(pool, std::forward<Args>(args)...));
422 : }, pool);
423 25 : return shared;
424 : }
425 :
426 : template <typename T>
427 50 : Shared<T>::~Shared() {
428 25 : if (_shared) {
429 75 : web::perform([&, this] {
430 25 : delete _shared;
431 : }, _pool);
432 25 : _shared = nullptr;
433 : }
434 :
435 25 : auto pool = _pool;
436 25 : auto allocator = _allocator;
437 :
438 25 : _pool = nullptr;
439 25 : _allocator = nullptr;
440 :
441 25 : if (pool) {
442 25 : pool::destroy(pool);
443 : }
444 25 : if (allocator) {
445 25 : allocator::destroy(allocator);
446 : }
447 75 : }
448 :
449 : template <typename T>
450 25 : Shared<T>::Shared(SharedMode m, allocator_t *alloc, pool_t *pool, T *obj)
451 25 : : _allocator(alloc), _pool(pool), _shared(obj), _mode(m) { }
452 :
453 : }
454 :
455 : #endif /* EXTRA_WEBSERVER_WEBSERVER_UTILS_SPWEBASYNCTASK_H_ */
|