UniSet 1.7.0
include/UNetReceiver.h
00001 #ifndef UNetReceiver_H_
00002 #define UNetReceiver_H_
00003 // -----------------------------------------------------------------------------
00004 #include <ostream>
00005 #include <string>
00006 #include <queue>
00007 #include <cc++/socket.h>
00008 #include <sigc++/sigc++.h>
00009 #include "UniSetObject_LT.h"
00010 #include "Trigger.h"
00011 #include "Mutex.h"
00012 #include "SMInterface.h"
00013 #include "SharedMemory.h"
00014 #include "ThreadCreator.h"
00015 #include "UDPPacket.h"
00016 // -----------------------------------------------------------------------------
00017 /*  Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
00018  * ===============
00019  * Собственно реализация сделана так:
00020  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
00021  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
00022  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
00023  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
00024  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
00025  * Всё это реализовано в функции UNetReceiver::real_update()
00026  *
00027  * КЭШ
00028  * ===
00029  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
00030  * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
00031  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
00032  * Порядковый номер данных в пакете является индексом в кэше.
00033  * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
00034  * ID который пришёл в пакете - элемент кэша обновляется.
00035  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
00036  *
00037  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
00038  * =========================================================================
00039  * Для защиты от сбоя счётика сделана следующая логика:
00040  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
00041  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
00042  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
00043  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
00044  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
00045  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
00046  * затирают старые, если их не успели вынуть и обработать.
00047  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
00048 */
00049 // -----------------------------------------------------------------------------
00050 class UNetReceiver
00051 {
00052     public:
00053         UNetReceiver( const std::string& host, const ost::tpport_t port, SMInterface* smi );
00054         ~UNetReceiver();
00055 
00056          void start();
00057          void stop();
00058 
00059          void receive();
00060          void update();
00061 
00062          inline std::string getName(){ return myname; }
00063 
00064          // блокировать сохранение данный в SM
00065          void setLockUpdate( bool st );
00066 
00067          void resetTimeout();
00068 
00069          inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); }
00070          inline unsigned long getLostPacketsNum(){ return lostPackets; }
00071 
00072          void setReceiveTimeout( timeout_t msec );
00073          void setReceivePause( timeout_t msec );
00074          void setUpdatePause( timeout_t msec );
00075          void setLostTimeout( timeout_t msec );
00076          void setPrepareTime( timeout_t msec );
00077          void setMaxDifferens( unsigned long set );
00078 
00079          void setRespondID( UniSetTypes::ObjectId id, bool invert=false );
00080          void setLostPacketsID( UniSetTypes::ObjectId id );
00081 
00082          void setMaxProcessingCount( int set );
00083 
00084          inline ost::IPV4Address getAddress(){ return addr; }
00085          inline ost::tpport_t getPort(){ return port; }
00086 
00088          enum Event
00089          {
00090              evOK,      
00091              evTimeout  
00092          };
00093 
00094          typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
00095          void connectEvent( EventSlot sl );
00096 
00100          void setIgnore( UniSetTypes::ObjectId id=UniSetTypes::DefaultObjectId, bool set=true );
00101 
00102     protected:
00103         UNetReceiver();
00104 
00105         SMInterface* shm;
00106 
00107         bool recv();
00108         void step();
00109         virtual void real_update();
00110 
00111         std::string myname;
00112         
00113         struct ItemInfo
00114         {
00115             long id;
00116             IOController::AIOStateList::iterator ait;
00117             IOController::DIOStateList::iterator dit;
00118             UniversalIO::IOTypes iotype;
00119             bool ignore; 
00121             ItemInfo():
00122                 id(UniSetTypes::DefaultObjectId),
00123                 iotype(UniversalIO::UnknownIOType),
00124                 ignore(false){}
00125         };
00126         virtual void updateDItem( ItemInfo& ii, const long& id, bool val );
00127         virtual void updateAItem( ItemInfo& ii, const UniSetUDP::UDPAData& d );
00128 
00129         void initIterators();
00130 
00131         typedef std::vector<ItemInfo> ItemVec;
00132         ItemVec d_icache;   
00133         ItemVec a_icache;   
00135         bool d_cache_init_ok;
00136         bool a_cache_init_ok;
00137     private:
00138 
00139         int recvpause;      
00140         int updatepause;    
00142         ost::UDPReceive* udp;
00143         ost::IPV4Address addr;
00144         ost::tpport_t port;
00145 
00146         UniSetTypes::uniset_mutex pollMutex;
00147         PassiveTimer ptRecvTimeout;
00148         PassiveTimer ptPrepare;
00149         timeout_t recvTimeout;
00150         timeout_t prepareTime;
00151         timeout_t lostTimeout;
00152         PassiveTimer ptLostTimeout;
00153         unsigned long lostPackets; 
00155         UniSetTypes::ObjectId sidRespond;
00156         IOController::DIOStateList::iterator ditRespond;
00157         bool respondInvert;
00158         UniSetTypes::ObjectId sidLostPackets;
00159         IOController::AIOStateList::iterator aitLostPackets;
00160 
00161         bool activated;
00162         
00163         ThreadCreator<UNetReceiver>* r_thr;     // receive thread
00164         ThreadCreator<UNetReceiver>* u_thr;     // update thread
00165 
00166         // функция определения приоритетного сообщения для обработки
00167         struct PacketCompare:
00168         public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
00169         {
00170             inline bool operator()(const UniSetUDP::UDPMessage& lhs,
00171                             const UniSetUDP::UDPMessage& rhs) const
00172                     { return lhs.num > rhs.num; }
00173         };
00174         typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
00175         PacketQueue qpack;  
00176         UniSetUDP::UDPMessage pack;     
00177         UniSetUDP::UDPPacket r_buf;
00178         UniSetTypes::uniset_mutex packMutex; 
00179         unsigned long pnum; 
00184         unsigned long maxDifferens;
00185 
00186         PacketQueue qtmp;   
00187         bool waitClean;     
00188         unsigned long rnum; 
00190         int maxProcessingCount; 
00192         bool lockUpdate; 
00193         UniSetTypes::uniset_mutex lockMutex;
00194         
00195         EventSlot slEvent;
00196         Trigger trTimeout;
00197         UniSetTypes::uniset_mutex tmMutex;
00198 
00199         virtual void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
00200         virtual void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
00201 };
00202 // -----------------------------------------------------------------------------
00203 #endif // UNetReceiver_H_
00204 // -----------------------------------------------------------------------------