UniSet  2.12.1
UNetReceiver.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetReceiver_H_
18 #define UNetReceiver_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <memory>
22 #include <string>
23 #include <queue>
24 #include <unordered_map>
25 #include <sigc++/sigc++.h>
26 #include <ev++.h>
27 #include "UniSetObject.h"
28 #include "Trigger.h"
29 #include "Mutex.h"
30 #include "SMInterface.h"
31 #include "SharedMemory.h"
32 #include "UDPPacket.h"
33 #include "CommonEventLoop.h"
34 #include "UNetTransport.h"
35 // --------------------------------------------------------------------------
36 namespace uniset
37 {
38  // -----------------------------------------------------------------------------
39  /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40  * ===============
41  * Собственно реализация сделана так:
42  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
43  * что были посланы, сделана очередь с приоритетом. В качестве приоритета используется номер пакета
44  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
45  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
46  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
47  * Всё это реализовано в функции UNetReceiver::real_update()
48  *
49  * КЭШ
50  * ===
51  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
52  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
53  * Порядковый номер данных в пакете является индексом в кэше.
54  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
55  * ID который пришёл в пакете - элемент кэша обновляется.
56  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
57  *
58  * КЭШ (ДОПОЛНЕНИЕ)
59  * ===
60  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
61  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
62  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
63  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
64  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
65  *
66  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
67  * =========================================================================
68  * Для защиты от сбоя счётчика сделана следующая логика:
69  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
70  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
71  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
72  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
73  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
74  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
75  * затирают старые, если их не успели вынуть и обработать.
76  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
77  * =========================================================================
78  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
79  *
80  * Создание соединения (открытие сокета)
81  * ======================================
82  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
83  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
84  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
85  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
86  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
87  * Если такая логика не требуется, то можно задать в конструкторе
88  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
89  * выкинуто исключение при неудачной попытке создания соединения.
90  *
91  * Стратегия обновления данных в SM
92  * ==================================
93  * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
94  * Поддерживается два варианта:
95  * 'thread' - отдельный поток обновления
96  * 'evloop' - использование общего с приёмом event loop (libev)
97  */
98  // -----------------------------------------------------------------------------
99  class UNetReceiver:
100  protected EvWatcher,
101  public std::enable_shared_from_this<UNetReceiver>
102  {
103  public:
104  UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
105  , bool nocheckConnection = false
106  , const std::string& prefix = "unet" );
107  virtual ~UNetReceiver();
108 
109  void start();
110  void stop();
111 
112  inline const std::string getName() const
113  {
114  return myname;
115  }
116 
117  // блокировать сохранение данных в SM
118  void setLockUpdate( bool st ) noexcept;
119  bool isLockUpdate() const noexcept;
120 
121  void resetTimeout() noexcept;
122 
123  bool isInitOK() const noexcept;
124  bool isRecvOK() const noexcept;
125  size_t getLostPacketsNum() const noexcept;
126 
127  void setReceiveTimeout( timeout_t msec ) noexcept;
128  void setReceivePause( timeout_t msec ) noexcept;
129  void setUpdatePause( timeout_t msec ) noexcept;
130  void setLostTimeout( timeout_t msec ) noexcept;
131  void setPrepareTime( timeout_t msec ) noexcept;
132  void setCheckConnectionPause( timeout_t msec ) noexcept;
133  void setMaxDifferens( unsigned long set ) noexcept;
134  void setEvrunTimeout(timeout_t msec ) noexcept;
135  void setInitPause( timeout_t msec ) noexcept;
136 
137  void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
138  void setLostPacketsID( uniset::ObjectId id ) noexcept;
139 
140  void setMaxProcessingCount( int set ) noexcept;
141 
142  void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
143 
144  inline std::string getTransportID() const noexcept
145  {
146  return transport->ID();
147  }
148 
150  enum Event
151  {
154  };
155 
156  typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
157  void connectEvent( EventSlot sl ) noexcept;
158 
159  // --------------------------------------------------------------------
162  {
163  useUpdateUnknown,
166  };
167 
168  static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
169  static std::string to_string( UpdateStrategy s) noexcept;
170 
172  void setUpdateStrategy( UpdateStrategy set );
173 
174  // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
175  // (т.к. при evloop mutex захватывать не нужно)
177  {
178  public:
179  pack_guard( std::mutex& m, UpdateStrategy s );
180  ~pack_guard();
181 
182  protected:
183  std::mutex& m;
184  UpdateStrategy s;
185  };
186 
187  // --------------------------------------------------------------------
188 
189  inline std::shared_ptr<DebugStream> getLog()
190  {
191  return unetlog;
192  }
193 
194  virtual const std::string getShortInfo() const noexcept;
195 
196  protected:
197 
198  const std::shared_ptr<SMInterface> shm;
199  std::shared_ptr<DebugStream> unetlog;
200 
201  bool receive() noexcept;
202  void step() noexcept;
203  void update() noexcept;
204  void updateThread() noexcept;
205  void callback( ev::io& watcher, int revents ) noexcept;
206  void readEvent( ev::io& watcher ) noexcept;
207  void updateEvent( ev::periodic& watcher, int revents ) noexcept;
208  void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
209  void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
210  void initEvent( ev::timer& watcher, int revents ) noexcept;
211  virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
212  virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
213  virtual std::string wname() const noexcept override
214  {
215  return myname;
216  }
217 
218  void initIterators() noexcept;
219  bool createConnection( bool throwEx = false );
220  void checkConnection();
221 
222  public:
223 
224  // функция определения приоритетного сообщения для обработки
226  public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
227  {
228  inline bool operator()(const UniSetUDP::UDPMessage& lhs,
229  const UniSetUDP::UDPMessage& rhs) const
230  {
231  return lhs.num > rhs.num;
232  }
233  };
234 
235  typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
236 
237  private:
238  UNetReceiver();
239 
240  timeout_t recvpause = { 10 };
241  timeout_t updatepause = { 100 };
243  std::unique_ptr<UNetReceiveTransport> transport;
244  std::string addr;
245  std::string myname;
246  ev::io evReceive;
247  ev::periodic evCheckConnection;
248  ev::periodic evStatistic;
249  ev::periodic evUpdate;
250  ev::timer evInitPause;
251 
252  UpdateStrategy upStrategy = { useUpdateEventLoop };
253 
254  // счётчики для подсчёта статистики
255  size_t recvCount = { 0 };
256  size_t upCount = { 0 };
257 
258  // текущая статистик
259  size_t statRecvPerSec = { 0 };
260  size_t statUpPerSec = { 0 };
262  std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
263 
264  // делаем loop общим.. одним на всех!
265  static CommonEventLoop loop;
266 
267  double checkConnectionTime = { 10.0 }; // sec
268  std::mutex checkConnMutex;
269 
270  PassiveTimer ptRecvTimeout;
271  PassiveTimer ptPrepare;
272  timeout_t recvTimeout = { 5000 }; // msec
273  timeout_t prepareTime = { 2000 };
274  timeout_t evrunTimeout = { 15000 };
275  timeout_t lostTimeout = { 200 };
276 
277  double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
278  std::atomic_bool initOK = { false };
279 
280  PassiveTimer ptLostTimeout;
281  size_t lostPackets = { 0 };
283  uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
284  IOController::IOStateList::iterator itRespond;
285  bool respondInvert = { false };
286  uniset::ObjectId sidLostPackets;
287  IOController::IOStateList::iterator itLostPackets;
288 
289  std::atomic_bool activated = { false };
290 
291  PacketQueue qpack;
292  UniSetUDP::UDPMessage pack;
293  UniSetUDP::UDPPacket r_buf;
294  std::mutex packMutex;
295  size_t pnum = { 0 };
300  size_t maxDifferens = { 20 };
301 
302  PacketQueue qtmp;
303  bool waitClean = { false };
304  size_t rnum = { 0 };
306  size_t maxProcessingCount = { 100 };
308  std::atomic_bool lockUpdate = { false };
310  EventSlot slEvent;
311  Trigger trTimeout;
312  std::mutex tmMutex;
313 
314  struct CacheItem
315  {
316  long id = { uniset::DefaultObjectId };
317  IOController::IOStateList::iterator ioit;
318 
319  CacheItem():
320  id(uniset::DefaultObjectId) {}
321  };
322 
323  typedef std::vector<CacheItem> CacheVec;
324  struct CacheInfo
325  {
326  CacheInfo():
327  cache_init_ok(false) {}
328 
329  bool cache_init_ok = { false };
330  CacheVec cache;
331  };
332 
333  // ключом является UDPMessage::getDataID()
334  typedef std::unordered_map<long, CacheInfo> CacheMap;
335  CacheMap d_icache_map;
336  CacheMap a_icache_map;
338  bool d_cache_init_ok = { false };
339  bool a_cache_init_ok = { false };
340 
341  void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
342  void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
343  };
344  // --------------------------------------------------------------------------
345 } // end of namespace uniset
346 // -----------------------------------------------------------------------------
347 #endif // UNetReceiver_H_
348 // -----------------------------------------------------------------------------
Definition: DebugStream.h:61
Definition: SMInterface.h:31
Definition: CommonEventLoop.h:14
Definition: UNetReceiver.h:164
Definition: CommonEventLoop.h:18
Definition: UNetReceiver.h:153
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UDPPacket.h:106
Definition: UNetReceiver.h:225
Event
Definition: UNetReceiver.h:150
void setUpdateStrategy(UpdateStrategy set)
функция должна вызываться до первого вызова start()
Definition: UNetReceiver.cc:953
Definition: UNetReceiver.h:176
Definition: UNetReceiver.h:152
Definition: UNetReceiver.h:99
UpdateStrategy
Definition: UNetReceiver.h:161
Definition: UNetReceiver.h:165
long ObjectId
Definition: UniSetTypes_i.idl:30