UniSet  1.7.0
include/UNetReceiver.h
00001 #ifndef UNetReceiver_H_
00002 #define UNetReceiver_H_
00003 // -----------------------------------------------------------------------------
00004 #include <ostream>
00005 #include <string>
00006 #include <queue>
00007 #include <cc++/socket.h>
00008 #include <sigc++/sigc++.h>
00009 #include "UniSetObject_LT.h"
00010 #include "Trigger.h"
00011 #include "Mutex.h"
00012 #include "SMInterface.h"
00013 #include "SharedMemory.h"
00014 #include "ThreadCreator.h"
00015 #include "UDPPacket.h"
00016 // -----------------------------------------------------------------------------
00017 /*  Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
00018  * ===============
00019  * Собственно реализация сделана так:
00020  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
00021  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
00022  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
00023  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
00024  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
00025  * Всё это реализовано в функции UNetReceiver::real_update()
00026  *
00027  * КЭШ
00028  * ===
00029  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
00030  * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность.
00031  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
00032  * Порядковый номер данных в пакете является индексом в кэше.
00033  * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
00034  * ID который пришёл в пакете - элемент кэша обновляется.
00035  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
00036  *
00037  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
00038  * =========================================================================
00039  * Для защиты от сбоя счётика сделана следующая логика:
00040  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
00041  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
00042  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
00043  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
00044  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
00045  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
00046  * затирают старые, если их не успели вынуть и обработать.
00047  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
00048 */
00049 // -----------------------------------------------------------------------------
00050 class UNetReceiver
00051 {
00052     public:
00053         UNetReceiver( const std::string& host, const ost::tpport_t port, SMInterface* smi );
00054         ~UNetReceiver();
00055 
00056          void start();
00057          void stop();
00058 
00059          void receive();
00060          void update();
00061 
00062          inline std::string getName(){ return myname; }
00063 
00064          // блокировать сохранение данный в SM
00065          void setLockUpdate( bool st );
00066 
00067          void resetTimeout();
00068 
00069          inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); }
00070          inline unsigned long getLostPacketsNum(){ return lostPackets; }
00071 
00072          void setReceiveTimeout( timeout_t msec );
00073          void setReceivePause( timeout_t msec );
00074          void setUpdatePause( timeout_t msec );
00075          void setLostTimeout( timeout_t msec );
00076          void setPrepareTime( timeout_t msec );
00077          void setMaxDifferens( unsigned long set );
00078 
00079          void setRespondID( UniSetTypes::ObjectId id, bool invert=false );
00080          void setLostPacketsID( UniSetTypes::ObjectId id );
00081 
00082          void setMaxProcessingCount( int set );
00083 
00084          inline ost::IPV4Address getAddress(){ return addr; }
00085          inline ost::tpport_t getPort(){ return port; }
00086 
00088          enum Event
00089          {
00090              evOK,      
00091              evTimeout  
00092          };
00093 
00094          typedef sigc::slot<void,UNetReceiver*,Event> EventSlot;
00095          void connectEvent( EventSlot sl );
00096 
00097     protected:
00098 
00099         SMInterface* shm;
00100 
00101         bool recv();
00102         void step();
00103         void real_update();
00104 
00105         void initIterators();
00106 
00107     private:
00108         UNetReceiver();
00109 
00110         int recvpause;      
00111         int updatepause;    
00113         ost::UDPReceive* udp;
00114         ost::IPV4Address addr;
00115         ost::tpport_t port;
00116         std::string myname;
00117 
00118         UniSetTypes::uniset_mutex pollMutex;
00119         PassiveTimer ptRecvTimeout;
00120         PassiveTimer ptPrepare;
00121         timeout_t recvTimeout;
00122         timeout_t prepareTime;
00123         timeout_t lostTimeout;
00124         PassiveTimer ptLostTimeout;
00125         unsigned long lostPackets; 
00127         UniSetTypes::ObjectId sidRespond;
00128         IOController::DIOStateList::iterator ditRespond;
00129         bool respondInvert;
00130         UniSetTypes::ObjectId sidLostPackets;
00131         IOController::AIOStateList::iterator aitLostPackets;
00132 
00133         bool activated;
00134         
00135         ThreadCreator<UNetReceiver>* r_thr;     // receive thread
00136         ThreadCreator<UNetReceiver>* u_thr;     // update thread
00137 
00138         // функция определения приоритетного сообщения для обработки
00139         struct PacketCompare:
00140         public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
00141         {
00142             inline bool operator()(const UniSetUDP::UDPMessage& lhs,
00143                             const UniSetUDP::UDPMessage& rhs) const
00144                     { return lhs.num > rhs.num; }
00145         };
00146         typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue;
00147         PacketQueue qpack;  
00148         UniSetUDP::UDPMessage pack;     
00149         UniSetUDP::UDPPacket r_buf;
00150         UniSetTypes::uniset_mutex packMutex; 
00151         unsigned long pnum; 
00156         unsigned long maxDifferens;
00157 
00158         PacketQueue qtmp;   
00159         bool waitClean;     
00160         unsigned long rnum; 
00162         int maxProcessingCount; 
00164         bool lockUpdate; 
00165         UniSetTypes::uniset_mutex lockMutex;
00166         
00167         EventSlot slEvent;
00168         Trigger trTimeout;
00169         UniSetTypes::uniset_mutex tmMutex;
00170 
00171         struct ItemInfo
00172         {
00173             long id;
00174             IOController::AIOStateList::iterator ait;
00175             IOController::DIOStateList::iterator dit;
00176             UniversalIO::IOTypes iotype;
00177 
00178             ItemInfo():
00179                 id(UniSetTypes::DefaultObjectId),
00180                 iotype(UniversalIO::UnknownIOType){}
00181         };
00182 
00183         typedef std::vector<ItemInfo> ItemVec;
00184         ItemVec d_icache;   
00185         ItemVec a_icache;   
00187         bool d_cache_init_ok;
00188         bool a_cache_init_ok;
00189 
00190         void initDCache( UniSetUDP::UDPMessage& pack, bool force=false );
00191         void initACache( UniSetUDP::UDPMessage& pack, bool force=false );
00192 };
00193 // -----------------------------------------------------------------------------
00194 #endif // UNetReceiver_H_
00195 // -----------------------------------------------------------------------------