OgreWorkQueue.h
Go to the documentation of this file.
00001 /*
00002 -----------------------------------------------------------------------------
00003 This source file is part of OGRE
00004 (Object-oriented Graphics Rendering Engine)
00005 For the latest info, see http://www.ogre3d.org/
00006 
00007 Copyright (c) 2000-2013 Torus Knot Software Ltd
00008 
00009 Permission is hereby granted, free of charge, to any person obtaining a copy
00010 of this software and associated documentation files (the "Software"), to deal
00011 in the Software without restriction, including without limitation the rights
00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00013 copies of the Software, and to permit persons to whom the Software is
00014 furnished to do so, subject to the following conditions:
00015 
00016 The above copyright notice and this permission notice shall be included in
00017 all copies or substantial portions of the Software.
00018 
00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00025 THE SOFTWARE.
00026 -----------------------------------------------------------------------------
00027 */
00028 #ifndef __OgreWorkQueue_H__
00029 #define __OgreWorkQueue_H__
00030 
00031 #include "OgrePrerequisites.h"
00032 #include "OgreAny.h"
00033 #include "OgreSharedPtr.h"
00034 #include "Threading/OgreThreadHeaders.h"
00035 #include "OgreHeaderPrefix.h"
00036 
00037 namespace Ogre
00038 {
00070     class _OgreExport WorkQueue : public UtilityAlloc
00071     {
00072     protected:
00073         typedef map<String, uint16>::type ChannelMap;
00074         ChannelMap mChannelMap;
00075         uint16 mNextChannel;
00076         OGRE_MUTEX(mChannelMapMutex);
00077     public:
00079         typedef unsigned long long int RequestID;
00080 
00083         class _OgreExport Request : public UtilityAlloc
00084         {
00085             friend class WorkQueue;
00086         protected:
00088             uint16 mChannel;
00090             uint16 mType;
00092             Any mData;
00094             uint8 mRetryCount;
00096             RequestID mID;
00098             mutable bool mAborted;
00099 
00100         public:
00102             Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
00103             ~Request();
00105             void abortRequest() const { mAborted = true; }
00107             uint16 getChannel() const { return mChannel; }
00109             uint16 getType() const { return mType; }
00111             const Any& getData() const { return mData; }
00113             uint8 getRetryCount() const { return mRetryCount; }
00115             RequestID getID() const { return mID; }
00117             bool getAborted() const { return mAborted; }
00118         };
00119 
00122         struct _OgreExport Response : public UtilityAlloc
00123         {
00125             const Request* mRequest;
00127             bool mSuccess;
00129             String mMessages;
00131             Any mData;
00132 
00133         public:
00134             Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK);
00135             ~Response();
00137             const Request* getRequest() const { return mRequest; }
00139             bool succeeded() const { return mSuccess; }
00141             const String& getMessages() const { return mMessages; }
00143             const Any& getData() const { return mData; }
00145             void abortRequest() { mRequest->abortRequest(); mData.destroy(); }
00146         };
00147 
00161         class _OgreExport RequestHandler
00162         {
00163         public:
00164             RequestHandler() {}
00165             virtual ~RequestHandler() {}
00166 
00173             virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) 
00174             { (void)srcQ; return !req->getAborted(); }
00175 
00186             virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
00187         };
00188 
00196         class _OgreExport ResponseHandler
00197         {
00198         public:
00199             ResponseHandler() {}
00200             virtual ~ResponseHandler() {}
00201 
00208             virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) 
00209             { (void)srcQ; return !res->getRequest()->getAborted(); }
00210 
00218             virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
00219         };
00220 
00221         WorkQueue() : mNextChannel(0) {}
00222         virtual ~WorkQueue() {}
00223 
00228         virtual void startup(bool forceRestart = true) = 0;
00238         virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
00240         virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
00241 
00251         virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
00253         virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
00254 
00272         virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
00273             bool forceSynchronous = false, bool idleThread = false) = 0;
00274 
00280         virtual void abortRequest(RequestID id) = 0;
00281 
00288         virtual void abortRequestsByChannel(uint16 channel) = 0;
00289 
00296         virtual void abortPendingRequestsByChannel(uint16 channel) = 0;
00297 
00302         virtual void abortAllRequests() = 0;
00303         
00309         virtual void setPaused(bool pause) = 0;
00311         virtual bool isPaused() const = 0;
00312 
00317         virtual void setRequestsAccepted(bool accept) = 0;
00319         virtual bool getRequestsAccepted() const = 0;
00320 
00329         virtual void processResponses() = 0; 
00330 
00334         virtual unsigned long getResponseProcessingTimeLimit() const = 0;
00335 
00341         virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
00342 
00345         virtual void shutdown() = 0;
00346 
00354         virtual uint16 getChannel(const String& channelName);
00355 
00356     };
00357 
00360     class _OgreExport DefaultWorkQueueBase : public WorkQueue
00361     {
00362     public:
00363 
00368         DefaultWorkQueueBase(const String& name = StringUtil::BLANK);
00369         virtual ~DefaultWorkQueueBase();
00371         const String& getName() const;
00375         virtual size_t getWorkerThreadCount() const;
00376 
00382         virtual void setWorkerThreadCount(size_t c);
00383 
00393         virtual bool getWorkersCanAccessRenderSystem() const;
00394 
00395 
00407         virtual void setWorkersCanAccessRenderSystem(bool access);
00408 
00416         virtual void _processNextRequest();
00417 
00419         virtual void _threadMain() = 0;
00420 
00422         virtual bool isShuttingDown() const { return mShuttingDown; }
00423 
00425         virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
00427         virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
00429         virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
00431         virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
00432 
00434         virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
00435             bool forceSynchronous = false, bool idleThread = false);
00437         virtual void abortRequest(RequestID id);
00439         virtual void abortRequestsByChannel(uint16 channel);
00441         virtual void abortPendingRequestsByChannel(uint16 channel);
00443         virtual void abortAllRequests();
00445         virtual void setPaused(bool pause);
00447         virtual bool isPaused() const;
00449         virtual void setRequestsAccepted(bool accept);
00451         virtual bool getRequestsAccepted() const;
00453         virtual void processResponses(); 
00455         virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; }
00457         virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; }
00458     protected:
00459         String mName;
00460         size_t mWorkerThreadCount;
00461         bool mWorkerRenderSystemAccess;
00462         bool mIsRunning;
00463         unsigned long mResposeTimeLimitMS;
00464 
00465         typedef deque<Request*>::type RequestQueue;
00466         typedef deque<Response*>::type ResponseQueue;
00467         RequestQueue mRequestQueue; // Guarded by mRequestMutex
00468         RequestQueue mProcessQueue; // Guarded by mProcessMutex
00469         ResponseQueue mResponseQueue; // Guarded by mResponseMutex
00470 
00472         struct _OgreExport WorkerFunc OGRE_THREAD_WORKER_INHERIT
00473         {
00474             DefaultWorkQueueBase* mQueue;
00475 
00476             WorkerFunc(DefaultWorkQueueBase* q) 
00477                 : mQueue(q) {}
00478 
00479             void operator()();
00480             
00481             void operator()() const;
00482 
00483             void run();
00484         };
00485         WorkerFunc* mWorkerFunc;
00486 
00491         class _OgreExport RequestHandlerHolder : public UtilityAlloc
00492         {
00493         protected:
00494             OGRE_RW_MUTEX(mRWMutex);
00495             RequestHandler* mHandler;
00496         public:
00497             RequestHandlerHolder(RequestHandler* handler)
00498                 : mHandler(handler) {}
00499 
00500             // Disconnect the handler to allow it to be destroyed
00501             void disconnectHandler()
00502             {
00503                 // write lock - must wait for all requests to finish
00504                 OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
00505                 mHandler = 0;
00506             }
00507 
00511             RequestHandler* getHandler() { return mHandler; }
00512 
00516             Response* handleRequest(const Request* req, const WorkQueue* srcQ)
00517             {
00518                 // Read mutex so that multiple requests can be processed by the
00519                 // same handler in parallel if required
00520                 OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
00521                 Response* response = 0;
00522                 if (mHandler)
00523                 {
00524                     if (mHandler->canHandleRequest(req, srcQ))
00525                     {
00526                         response = mHandler->handleRequest(req, srcQ);
00527                     }
00528                 }
00529                 return response;
00530             }
00531 
00532         };
00533         // Hold these by shared pointer so they can be copied keeping same instance
00534         typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr;
00535 
00536         typedef list<RequestHandlerHolderPtr>::type RequestHandlerList;
00537         typedef list<ResponseHandler*>::type ResponseHandlerList;
00538         typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel;
00539         typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel;
00540 
00541         RequestHandlerListByChannel mRequestHandlers;
00542         ResponseHandlerListByChannel mResponseHandlers;
00543         RequestID mRequestCount; // Guarded by mRequestMutex
00544         bool mPaused;
00545         bool mAcceptRequests;
00546         bool mShuttingDown;
00547 
00548         //NOTE: If you lock multiple mutexes at the same time, the order is important!
00549         // For example if threadA locks mIdleMutex first then tries to lock mProcessMutex,
00550         // and threadB locks mProcessMutex first, then mIdleMutex. In this case you can get livelock and the system is dead!
00551         //RULE: Lock mProcessMutex before other mutex, to prevent livelocks
00552         OGRE_MUTEX(mIdleMutex);
00553         OGRE_MUTEX(mRequestMutex);
00554         OGRE_MUTEX(mProcessMutex);
00555         OGRE_MUTEX(mResponseMutex);
00556         OGRE_RW_MUTEX(mRequestHandlerMutex);
00557 
00558 
00559         void processRequestResponse(Request* r, bool synchronous);
00560         Response* processRequest(Request* r);
00561         void processResponse(Response* r);
00563         virtual void notifyWorkers() = 0;
00565         void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
00566         
00567         RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex
00568         bool mIdleThreadRunning; // Guarded by mIdleMutex
00569         Request* mIdleProcessed; // Guarded by mProcessMutex
00570         
00571 
00572         bool processIdleRequests();
00573     };
00574 
00575 
00576 
00577 
00578 
00582 }
00583 
00584 #include "OgreHeaderSuffix.h"
00585 
00586 #endif
00587 

Copyright © 2012 Torus Knot Software Ltd
Creative Commons License
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.
Last modified Mon Jul 27 2020 13:40:48