00001 /* 00002 ----------------------------------------------------------------------------- 00003 This source file is part of OGRE 00004 (Object-oriented Graphics Rendering Engine) 00005 For the latest info, see http://www.ogre3d.org/ 00006 00007 Copyright (c) 2000-2013 Torus Knot Software Ltd 00008 00009 Permission is hereby granted, free of charge, to any person obtaining a copy 00010 of this software and associated documentation files (the "Software"), to deal 00011 in the Software without restriction, including without limitation the rights 00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00013 copies of the Software, and to permit persons to whom the Software is 00014 furnished to do so, subject to the following conditions: 00015 00016 The above copyright notice and this permission notice shall be included in 00017 all copies or substantial portions of the Software. 00018 00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00025 THE SOFTWARE. 
00026 ----------------------------------------------------------------------------- 00027 */ 00028 #ifndef __OgreWorkQueue_H__ 00029 #define __OgreWorkQueue_H__ 00030 00031 #include "OgrePrerequisites.h" 00032 #include "OgreAny.h" 00033 #include "OgreSharedPtr.h" 00034 #include "Threading/OgreThreadHeaders.h" 00035 #include "OgreHeaderPrefix.h" 00036 00037 namespace Ogre 00038 { 00070 class _OgreExport WorkQueue : public UtilityAlloc 00071 { 00072 protected: 00073 typedef map<String, uint16>::type ChannelMap; 00074 ChannelMap mChannelMap; 00075 uint16 mNextChannel; 00076 OGRE_MUTEX(mChannelMapMutex); 00077 public: 00079 typedef unsigned long long int RequestID; 00080 00083 class _OgreExport Request : public UtilityAlloc 00084 { 00085 friend class WorkQueue; 00086 protected: 00088 uint16 mChannel; 00090 uint16 mType; 00092 Any mData; 00094 uint8 mRetryCount; 00096 RequestID mID; 00098 mutable bool mAborted; 00099 00100 public: 00102 Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid); 00103 ~Request(); 00105 void abortRequest() const { mAborted = true; } 00107 uint16 getChannel() const { return mChannel; } 00109 uint16 getType() const { return mType; } 00111 const Any& getData() const { return mData; } 00113 uint8 getRetryCount() const { return mRetryCount; } 00115 RequestID getID() const { return mID; } 00117 bool getAborted() const { return mAborted; } 00118 }; 00119 00122 struct _OgreExport Response : public UtilityAlloc 00123 { 00125 const Request* mRequest; 00127 bool mSuccess; 00129 String mMessages; 00131 Any mData; 00132 00133 public: 00134 Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK); 00135 ~Response(); 00137 const Request* getRequest() const { return mRequest; } 00139 bool succeeded() const { return mSuccess; } 00141 const String& getMessages() const { return mMessages; } 00143 const Any& getData() const { return mData; } 00145 void abortRequest() { 
mRequest->abortRequest(); mData.destroy(); } 00146 }; 00147 00161 class _OgreExport RequestHandler 00162 { 00163 public: 00164 RequestHandler() {} 00165 virtual ~RequestHandler() {} 00166 00173 virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) 00174 { (void)srcQ; return !req->getAborted(); } 00175 00186 virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0; 00187 }; 00188 00196 class _OgreExport ResponseHandler 00197 { 00198 public: 00199 ResponseHandler() {} 00200 virtual ~ResponseHandler() {} 00201 00208 virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) 00209 { (void)srcQ; return !res->getRequest()->getAborted(); } 00210 00218 virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0; 00219 }; 00220 00221 WorkQueue() : mNextChannel(0) {} 00222 virtual ~WorkQueue() {} 00223 00228 virtual void startup(bool forceRestart = true) = 0; 00238 virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0; 00240 virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0; 00241 00251 virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0; 00253 virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0; 00254 00272 virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 00273 bool forceSynchronous = false, bool idleThread = false) = 0; 00274 00280 virtual void abortRequest(RequestID id) = 0; 00281 00288 virtual void abortRequestsByChannel(uint16 channel) = 0; 00289 00296 virtual void abortPendingRequestsByChannel(uint16 channel) = 0; 00297 00302 virtual void abortAllRequests() = 0; 00303 00309 virtual void setPaused(bool pause) = 0; 00311 virtual bool isPaused() const = 0; 00312 00317 virtual void setRequestsAccepted(bool accept) = 0; 00319 virtual bool getRequestsAccepted() const = 0; 00320 00329 virtual void processResponses() = 0; 00330 00334 virtual unsigned long 
getResponseProcessingTimeLimit() const = 0; 00335 00341 virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0; 00342 00345 virtual void shutdown() = 0; 00346 00354 virtual uint16 getChannel(const String& channelName); 00355 00356 }; 00357 00360 class _OgreExport DefaultWorkQueueBase : public WorkQueue 00361 { 00362 public: 00363 00368 DefaultWorkQueueBase(const String& name = StringUtil::BLANK); 00369 virtual ~DefaultWorkQueueBase(); 00371 const String& getName() const; 00375 virtual size_t getWorkerThreadCount() const; 00376 00382 virtual void setWorkerThreadCount(size_t c); 00383 00393 virtual bool getWorkersCanAccessRenderSystem() const; 00394 00395 00407 virtual void setWorkersCanAccessRenderSystem(bool access); 00408 00416 virtual void _processNextRequest(); 00417 00419 virtual void _threadMain() = 0; 00420 00422 virtual bool isShuttingDown() const { return mShuttingDown; } 00423 00425 virtual void addRequestHandler(uint16 channel, RequestHandler* rh); 00427 virtual void removeRequestHandler(uint16 channel, RequestHandler* rh); 00429 virtual void addResponseHandler(uint16 channel, ResponseHandler* rh); 00431 virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh); 00432 00434 virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 00435 bool forceSynchronous = false, bool idleThread = false); 00437 virtual void abortRequest(RequestID id); 00439 virtual void abortRequestsByChannel(uint16 channel); 00441 virtual void abortPendingRequestsByChannel(uint16 channel); 00443 virtual void abortAllRequests(); 00445 virtual void setPaused(bool pause); 00447 virtual bool isPaused() const; 00449 virtual void setRequestsAccepted(bool accept); 00451 virtual bool getRequestsAccepted() const; 00453 virtual void processResponses(); 00455 virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; } 00457 virtual void setResponseProcessingTimeLimit(unsigned long ms) { 
mResposeTimeLimitMS = ms; } 00458 protected: 00459 String mName; 00460 size_t mWorkerThreadCount; 00461 bool mWorkerRenderSystemAccess; 00462 bool mIsRunning; 00463 unsigned long mResposeTimeLimitMS; 00464 00465 typedef deque<Request*>::type RequestQueue; 00466 typedef deque<Response*>::type ResponseQueue; 00467 RequestQueue mRequestQueue; // Guarded by mRequestMutex 00468 RequestQueue mProcessQueue; // Guarded by mProcessMutex 00469 ResponseQueue mResponseQueue; // Guarded by mResponseMutex 00470 00472 struct _OgreExport WorkerFunc OGRE_THREAD_WORKER_INHERIT 00473 { 00474 DefaultWorkQueueBase* mQueue; 00475 00476 WorkerFunc(DefaultWorkQueueBase* q) 00477 : mQueue(q) {} 00478 00479 void operator()(); 00480 00481 void operator()() const; 00482 00483 void run(); 00484 }; 00485 WorkerFunc* mWorkerFunc; 00486 00491 class _OgreExport RequestHandlerHolder : public UtilityAlloc 00492 { 00493 protected: 00494 OGRE_RW_MUTEX(mRWMutex); 00495 RequestHandler* mHandler; 00496 public: 00497 RequestHandlerHolder(RequestHandler* handler) 00498 : mHandler(handler) {} 00499 00500 // Disconnect the handler to allow it to be destroyed 00501 void disconnectHandler() 00502 { 00503 // write lock - must wait for all requests to finish 00504 OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex); 00505 mHandler = 0; 00506 } 00507 00511 RequestHandler* getHandler() { return mHandler; } 00512 00516 Response* handleRequest(const Request* req, const WorkQueue* srcQ) 00517 { 00518 // Read mutex so that multiple requests can be processed by the 00519 // same handler in parallel if required 00520 OGRE_LOCK_RW_MUTEX_READ(mRWMutex); 00521 Response* response = 0; 00522 if (mHandler) 00523 { 00524 if (mHandler->canHandleRequest(req, srcQ)) 00525 { 00526 response = mHandler->handleRequest(req, srcQ); 00527 } 00528 } 00529 return response; 00530 } 00531 00532 }; 00533 // Hold these by shared pointer so they can be copied keeping same instance 00534 typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr; 00535 
00536 typedef list<RequestHandlerHolderPtr>::type RequestHandlerList; 00537 typedef list<ResponseHandler*>::type ResponseHandlerList; 00538 typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel; 00539 typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel; 00540 00541 RequestHandlerListByChannel mRequestHandlers; 00542 ResponseHandlerListByChannel mResponseHandlers; 00543 RequestID mRequestCount; // Guarded by mRequestMutex 00544 bool mPaused; 00545 bool mAcceptRequests; 00546 bool mShuttingDown; 00547 00548 //NOTE: If you lock multiple mutexes at the same time, the order is important! 00549 // For example if threadA locks mIdleMutex first then tries to lock mProcessMutex, 00550 // and threadB locks mProcessMutex first, then mIdleMutex. In this case you can get livelock and the system is dead! 00551 //RULE: Lock mProcessMutex before other mutex, to prevent livelocks 00552 OGRE_MUTEX(mIdleMutex); 00553 OGRE_MUTEX(mRequestMutex); 00554 OGRE_MUTEX(mProcessMutex); 00555 OGRE_MUTEX(mResponseMutex); 00556 OGRE_RW_MUTEX(mRequestHandlerMutex); 00557 00558 00559 void processRequestResponse(Request* r, bool synchronous); 00560 Response* processRequest(Request* r); 00561 void processResponse(Response* r); 00563 virtual void notifyWorkers() = 0; 00565 void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount); 00566 00567 RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex 00568 bool mIdleThreadRunning; // Guarded by mIdleMutex 00569 Request* mIdleProcessed; // Guarded by mProcessMutex 00570 00571 00572 bool processIdleRequests(); 00573 }; 00574 00575 00576 00577 00578 00582 } 00583 00584 #include "OgreHeaderSuffix.h" 00585 00586 #endif 00587
// Copyright © 2012 Torus Knot Software Ltd
//
// This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.
// Last modified Mon Jul 27 2020 13:40:48