|
UniSet
1.7.0
|
00001 #ifndef UNetReceiver_H_ 00002 #define UNetReceiver_H_ 00003 // ----------------------------------------------------------------------------- 00004 #include <ostream> 00005 #include <string> 00006 #include <queue> 00007 #include <cc++/socket.h> 00008 #include <sigc++/sigc++.h> 00009 #include "UniSetObject_LT.h" 00010 #include "Trigger.h" 00011 #include "Mutex.h" 00012 #include "SMInterface.h" 00013 #include "SharedMemory.h" 00014 #include "ThreadCreator.h" 00015 #include "UDPPacket.h" 00016 // ----------------------------------------------------------------------------- 00017 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP. 00018 * =============== 00019 * Собственно реализация сделана так: 00020 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности 00021 * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета 00022 * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд", 00023 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout) 00024 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше.. 00025 * Всё это реализовано в функции UNetReceiver::real_update() 00026 * 00027 * КЭШ 00028 * === 00029 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов. 00030 * Кэш расчитан на то, что принимаемые пакеты всегда имеют одну и ту же длину и последовательность. 00031 * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо). 00032 * Порядковый номер данных в пакете является индексом в кэше. 00033 * Для защиты от изменения поседовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем, 00034 * ID который пришёл в пакете - элемент кэша обновляется. 00035 * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется. 00036 * 00037 * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум) 00038 * ========================================================================= 00039 * Для защиты от сбоя счётика сделана следующая логика: 00040 * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается, 00041 * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью. 00042 * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет, 00043 * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка. 00044 * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься 00045 * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные 00046 * затирают старые, если их не успели вынуть и обработать. 00047 * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди. 00048 */ 00049 // ----------------------------------------------------------------------------- 00050 class UNetReceiver 00051 { 00052 public: 00053 UNetReceiver( const std::string& host, const ost::tpport_t port, SMInterface* smi ); 00054 ~UNetReceiver(); 00055 00056 void start(); 00057 void stop(); 00058 00059 void receive(); 00060 void update(); 00061 00062 inline std::string getName(){ return myname; } 00063 00064 // блокировать сохранение данный в SM 00065 void setLockUpdate( bool st ); 00066 00067 void resetTimeout(); 00068 00069 inline bool isRecvOK(){ return !ptRecvTimeout.checkTime(); } 00070 inline unsigned long getLostPacketsNum(){ return lostPackets; } 00071 00072 void setReceiveTimeout( timeout_t msec ); 00073 void setReceivePause( timeout_t msec ); 00074 void setUpdatePause( timeout_t msec ); 00075 void setLostTimeout( timeout_t msec ); 00076 void setPrepareTime( timeout_t msec ); 00077 void setMaxDifferens( unsigned long set ); 00078 00079 void setRespondID( UniSetTypes::ObjectId id, bool invert=false ); 00080 void setLostPacketsID( UniSetTypes::ObjectId id ); 00081 00082 void setMaxProcessingCount( int set ); 00083 00084 inline ost::IPV4Address getAddress(){ return addr; } 00085 inline ost::tpport_t getPort(){ return port; } 00086 00088 enum Event 00089 { 00090 evOK, 00091 evTimeout 00092 }; 00093 00094 typedef sigc::slot<void,UNetReceiver*,Event> EventSlot; 00095 void connectEvent( EventSlot sl ); 00096 00097 protected: 00098 00099 SMInterface* shm; 00100 00101 bool recv(); 00102 void step(); 00103 void real_update(); 00104 00105 void initIterators(); 00106 00107 private: 00108 UNetReceiver(); 00109 00110 int recvpause; 00111 int updatepause; 00113 ost::UDPReceive* udp; 00114 ost::IPV4Address addr; 00115 ost::tpport_t port; 00116 std::string myname; 00117 00118 UniSetTypes::uniset_mutex pollMutex; 00119 PassiveTimer ptRecvTimeout; 00120 PassiveTimer ptPrepare; 00121 timeout_t recvTimeout; 00122 timeout_t prepareTime; 00123 timeout_t lostTimeout; 00124 PassiveTimer ptLostTimeout; 00125 unsigned long lostPackets; 00127 UniSetTypes::ObjectId sidRespond; 00128 IOController::DIOStateList::iterator ditRespond; 00129 bool respondInvert; 00130 UniSetTypes::ObjectId sidLostPackets; 00131 IOController::AIOStateList::iterator aitLostPackets; 00132 00133 bool activated; 00134 00135 ThreadCreator<UNetReceiver>* r_thr; // receive thread 00136 ThreadCreator<UNetReceiver>* u_thr; // update thread 00137 00138 // функция определения приоритетного сообщения для обработки 00139 struct PacketCompare: 00140 public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool> 00141 { 00142 inline bool operator()(const UniSetUDP::UDPMessage& lhs, 00143 const UniSetUDP::UDPMessage& rhs) const 00144 { return lhs.num > rhs.num; } 00145 }; 00146 typedef std::priority_queue<UniSetUDP::UDPMessage,std::vector<UniSetUDP::UDPMessage>,PacketCompare> PacketQueue; 00147 PacketQueue qpack; 00148 UniSetUDP::UDPMessage pack; 00149 UniSetUDP::UDPPacket r_buf; 00150 UniSetTypes::uniset_mutex packMutex; 00151 unsigned long pnum; 00156 unsigned long maxDifferens; 00157 00158 PacketQueue qtmp; 00159 bool waitClean; 00160 unsigned long rnum; 00162 int maxProcessingCount; 00164 bool lockUpdate; 00165 UniSetTypes::uniset_mutex lockMutex; 00166 00167 EventSlot slEvent; 00168 Trigger trTimeout; 00169 UniSetTypes::uniset_mutex tmMutex; 00170 00171 struct ItemInfo 00172 { 00173 long id; 00174 IOController::AIOStateList::iterator ait; 00175 IOController::DIOStateList::iterator dit; 00176 UniversalIO::IOTypes iotype; 00177 00178 ItemInfo(): 00179 id(UniSetTypes::DefaultObjectId), 00180 iotype(UniversalIO::UnknownIOType){} 00181 }; 00182 00183 typedef std::vector<ItemInfo> ItemVec; 00184 ItemVec d_icache; 00185 ItemVec a_icache; 00187 bool d_cache_init_ok; 00188 bool a_cache_init_ok; 00189 00190 void initDCache( UniSetUDP::UDPMessage& pack, bool force=false ); 00191 void initACache( UniSetUDP::UDPMessage& pack, bool force=false ); 00192 }; 00193 // ----------------------------------------------------------------------------- 00194 #endif // UNetReceiver_H_ 00195 // -----------------------------------------------------------------------------
1.7.6.1