|
UniSet 1.7.0
|
00001 #ifndef UNetReceiver_H_ 00002 #define UNetReceiver_H_ 00003 // ----------------------------------------------------------------------------- 00004 #include <ostream> 00005 #include <string> 00006 #include <queue> 00007 #include <cc++/socket.h> 00008 #include <sigc++/sigc++.h> 00009 #include "UniSetObject_LT.h" 00010 #include "Trigger.h" 00011 #include "Mutex.h" 00012 #include "SMInterface.h" 00013 #include "SharedMemory.h" 00014 #include "ThreadCreator.h" 00015 #include "UDPPacket.h" 00016 // ----------------------------------------------------------------------------- 00017 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. 00018 * =============== 00019 * Собственно реализация сделана так: 00020 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности 00021 * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета 00022 * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", 00023 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) 00024 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. 00025 * Всё это реализовано в функции UNetReceiver::real_update() 00026 * 00027 * КЭШ 00028 * === 00029 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. 00030 * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность. 00031 * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо). 00032 * Порядковый номер данных в пакете является индексом в кэше. 00033 * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, 00034 * ID который пришёл в пакете - элемент кэша обновляется. 00035 * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется. 00036 * 00037 * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) 00038 * ========================================================================= 00039 * Для защиты от сбоя счётика сделана следующая логика: 00040 * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, 00041 * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. 00042 * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, 00043 * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. 00044 * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься 00045 * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные 00046 * затирают старые, если их не успели вынуть и обработать. 00047 * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. 00048 */ 00049 // ----------------------------------------------------------------------------- 00050 class UNetReceiver 00051 { 00052 public: 00053 UNetReceiver( const std::string& host, const ost::tpport_t port, SMInterface* smi ); 00054 ~UNetReceiver(); 00055 00056 void start(); 00057 void stop(); 00058 00059 void receive(); 00060 void update(); 00061 00062 inline std::string getName(){ return myname; } 00063 00064 // блокировать сохранение данный в SM 00065 void setLockUpdate( bool st ); 00066 00067 void resetTimeout(); 00068 00069 inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); } 00070 inline unsigned long getLostPacketsNum(){ return lostPackets; } 00071 00072 void setReceiveTimeout( timeout_t msec ); 00073 void setReceivePause( timeout_t msec ); 00074 void setUpdatePause( timeout_t msec ); 00075 void setLostTimeout( timeout_t msec ); 00076 void setPrepareTime( timeout_t msec ); 00077 void setMaxDifferens( unsigned long set ); 00078 00079 void setRespondID( UniSetTypes::ObjectId id, bool invert=false ); 00080 void setLostPacketsID( UniSetTypes::ObjectId id ); 00081 00082 void setMaxProcessingCount( int set ); 00083 00084 inline ost::IPV4Address getAddress(){ return addr; } 00085 inline ost::tpport_t getPort(){ return port; } 00086 00088 enum Event 00089 { 00090 evOK, 00091 evTimeout 00092 }; 00093 00094 typedef sigc::slot<void,UNetReceiver*,Event> EventSlot; 00095 void connectEvent( EventSlot sl ); 00096 00100 void setIgnore( UniSetTypes::ObjectId id=UniSetTypes::DefaultObjectId, bool set=true ); 00101 00102 protected: 00103 UNetReceiver(); 00104 00105 SMInterface* shm; 00106 00107 bool recv(); 00108 void step(); 00109 virtual void real_update(); 00110 00111 std::string myname; 00112 00113 struct ItemInfo 00114 { 00115 long id; 00116 IOController::AIOStateList::iterator ait; 00117 IOController::DIOStateList::iterator dit; 00118 UniversalIO::IOTypes iotype; 00119 bool ignore; 00121 ItemInfo(): 00122 id(UniSetTypes::DefaultObjectId), 00123 iotype(UniversalIO::UnknownIOType), 00124 ignore(false){} 00125 }; 00126 virtual void updateDItem( ItemInfo& ii, const long& id, bool val ); 00127 virtual void updateAItem( ItemInfo& ii, const UniSetUDP::UDPAData& d ); 00128 00129 void initIterators(); 00130 00131 typedef std::vector<ItemInfo> ItemVec; 00132 ItemVec d_icache; 00133 ItemVec a_icache; 00135 bool d_cache_init_ok; 00136 bool a_cache_init_ok; 00137 private: 00138 00139 int recvpause; 00140 int updatepause; 00142 ost::UDPReceive* udp; 00143 ost::IPV4Address addr; 00144 ost::tpport_t port; 00145 00146 UniSetTypes::uniset_mutex pollMutex; 00147 PassiveTimer ptRecvTimeout; 00148 PassiveTimer ptPrepare; 00149 timeout_t recvTimeout; 00150 timeout_t prepareTime; 00151 timeout_t lostTimeout; 00152 PassiveTimer ptLostTimeout; 00153 unsigned long lostPackets; 00155 UniSetTypes::ObjectId sidRespond; 00156 IOController::DIOStateList::iterator ditRespond; 00157 bool respondInvert; 00158 UniSetTypes::ObjectId sidLostPackets; 00159 IOController::AIOStateList::iterator aitLostPackets; 00160 00161 bool activated; 00162 00163 ThreadCreator<UNetReceiver>* r_thr; // receive thread 00164 ThreadCreator<UNetReceiver>* u_thr; // update thread 00165 00166 // функция определения приоритетного сообщения для обработки 00167 struct PacketCompare: 00168 public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool> 00169 { 00170 inline bool operator()(const UniSetUDP::UDPMessage& lhs, 00171 const UniSetUDP::UDPMessage& rhs) const 00172 { return lhs.num > rhs.num; } 00173 }; 00174 typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue; 00175 PacketQueue qpack; 00176 UniSetUDP::UDPMessage pack; 00177 UniSetUDP::UDPPacket r_buf; 00178 UniSetTypes::uniset_mutex packMutex; 00179 unsigned long pnum; 00184 unsigned long maxDifferens; 00185 00186 PacketQueue qtmp; 00187 bool waitClean; 00188 unsigned long rnum; 00190 int maxProcessingCount; 00192 bool lockUpdate; 00193 UniSetTypes::uniset_mutex lockMutex; 00194 00195 EventSlot slEvent; 00196 Trigger trTimeout; 00197 UniSetTypes::uniset_mutex tmMutex; 00198 00199 virtual void initDCache( UniSetUDP::UDPMessage& pack, bool force=false ); 00200 virtual void initACache( UniSetUDP::UDPMessage& pack, bool force=false ); 00201 }; 00202 // ----------------------------------------------------------------------------- 00203 #endif // UNetReceiver_H_ 00204 // -----------------------------------------------------------------------------
1.7.4