UniSet  2.8.0
UNetSender.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetSender_H_
18 #define UNetSender_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <string>
22 #include <vector>
23 #include <limits>
24 #include <unordered_map>
25 #include "UniSetObject.h"
26 #include "Trigger.h"
27 #include "Mutex.h"
28 #include "SMInterface.h"
29 #include "SharedMemory.h"
30 #include "ThreadCreator.h"
31 #include "UDPCore.h"
32 #include "UDPPacket.h"
33 // --------------------------------------------------------------------------
34 namespace uniset
35 {
36  // -----------------------------------------------------------------------------
37  /*
38  * Распределение датчиков по пакетам
39  * =========================================================================
40  * Все пересылаемые данные разбиваются на группы по частоте посылки("sendfactor").
41  * Частота посылки кратна sendpause, задаётся для каждого датчика, при помощи свойства prefix_sendfactor.
42  * Внутри каждой группы пакеты набираются по мере "заполнения".
43  *
44  * Добавление датчика в пакет и создание нового пакета при переполнении происходит в функции initItem().
45  * Причем так как дискретные и аналоговые датчики обрабатываются отдельно (но пересылаются в одном пакете),
46  * то датчики, которые первые переполнятся приводят к тому, что создаётся новый пакет и они добавляются в него,
47  * в свою очередь остальные продолжают "добивать" предыдущий пакет.
48  * В initItem() каждому UItem в dlist кроме pack_ind присваивается еще и номер пакета pack_num, который гарантировано соответствует
49  * существующему пакету, поэтому в дальнейшем при использовании pack_num в качестве ключа в mypacks мы не проверяем пакет на существование.
50  *
51  * ОПТИМИЗАЦИЯ N1: Для оптимизации обработки посылаемых пакетов (на стороне UNetSender) сделана следующая логика:
52  * Номер очередного посылаемого пакета меняется (увеличивается) только, если изменились данные с момента
53  последней посылки. Для этого по данным каждый раз производится расчёт UNetUDP::makeCRC() и сравнивается с последним.
54  На стороне UNetReceiver пакеты с повторными номерами (т.е. уже обработанные) - откидываются.
55  *
56  *
57  * Создание соединения
58  * ======================================
59  * Попытка создать соединение производиться сразу в конструкторе, если это не получается,
60  * то в потоке "посылки", с заданным периодом (checkConnectionTime) идёт попытка создать соединение..
61  * и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
62  * (в момент создания объекта UNetSender) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
63  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
64  * Если такая логика не требуется, то можно задать в конструкторе флаг nocheckconnection=true,
65  * тогда при создании объекта UNetSender, в конструкторе будет
66  * выкинуто исключение при неудачной попытке создания соединения.
67  * \warning setCheckConnectionPause(msec) должно быть кратно sendpause!
68  */
69  class UNetSender
70  {
71  public:
72  UNetSender( const std::string& host, const int port, const std::shared_ptr<SMInterface>& smi
73  , bool nocheckConnection = false
74  , const std::string& s_field = ""
75  , const std::string& s_fvalue = ""
76  , const std::string& prop_prefix = "unet"
77  , const std::string& prefix = "unet"
78  , size_t maxDCount = UniSetUDP::MaxDCount
79  , size_t maxACount = UniSetUDP::MaxACount );
80 
81  virtual ~UNetSender();
82 
83  typedef size_t sendfactor_t;
84 
85  static const long not_specified_value = { std::numeric_limits<long>::max() };
86 
87  struct UItem
88  {
89  UItem():
90  iotype(UniversalIO::UnknownIOType),
92  pack_num(0),
93  pack_ind(0),
94  pack_sendfactor(0) {}
95 
96  UniversalIO::IOType iotype;
98  IOController::IOStateList::iterator ioit;
99  size_t pack_num;
100  size_t pack_ind;
101  sendfactor_t pack_sendfactor = { 0 };
102  long undefined_value = { not_specified_value };
103  friend std::ostream& operator<<( std::ostream& os, UItem& p );
104  };
105 
106  typedef std::unordered_map<uniset::ObjectId, UItem> UItemMap;
107 
108  size_t getDataPackCount() const;
109 
110  void start();
111  void stop();
112 
113  void send() noexcept;
114 
115  struct PackMessage
116  {
117  PackMessage( UniSetUDP::UDPMessage&& m ) noexcept: msg(std::move(m)) {}
118  PackMessage( const UniSetUDP::UDPMessage& m ) = delete;
119 
120  PackMessage() noexcept {}
121 
124  };
125 
126  void real_send( PackMessage& mypack ) noexcept;
127 
129  void updateFromSM();
130 
132  void updateSensor( uniset::ObjectId id, long value );
133 
135  void updateItem( UItem& it, long value );
136 
137  inline void setSendPause( int msec )
138  {
139  sendpause = msec;
140  }
141  inline void setPackSendPause( int msec )
142  {
143  packsendpause = msec;
144  }
145  inline void setPackSendPauseFactor( int factor )
146  {
147  packsendpauseFactor = factor;
148  }
149 
150  void setCheckConnectionPause( int msec );
151 
153  void askSensors( UniversalIO::UIOCommand cmd );
154 
156  void initIterators();
157 
158  inline std::shared_ptr<DebugStream> getLog()
159  {
160  return unetlog;
161  }
162 
163  virtual const std::string getShortInfo() const;
164 
165  inline std::string getAddress() const
166  {
167  return addr;
168  }
169  inline int getPort() const
170  {
171  return port;
172  }
173 
174  inline size_t getADataSize() const
175  {
176  return maxAData;
177  }
178  inline size_t getDDataSize() const
179  {
180  return maxDData;
181  }
182 
183  protected:
184 
185  std::string s_field = { "" };
186  std::string s_fvalue = { "" };
187  std::string prop_prefix = { "" };
188 
189  const std::shared_ptr<SMInterface> shm;
190  std::shared_ptr<DebugStream> unetlog;
191 
192  bool initItem( UniXML::iterator& it );
193  bool readItem( const std::shared_ptr<UniXML>& xml, UniXML::iterator& it, xmlNode* sec );
194 
195  void readConfiguration();
196 
197  bool createConnection( bool throwEx );
198 
199  private:
200  UNetSender();
201 
202  std::unique_ptr<UDPSocketU> udp;
203  std::string addr;
204  int port = { 0 };
205  std::string s_host = { "" };
206  Poco::Net::SocketAddress saddr;
207 
208  std::string myname = { "" };
209  timeout_t sendpause = { 150 };
210  timeout_t packsendpause = { 5 };
211  int packsendpauseFactor = { 1 };
212  timeout_t writeTimeout = { 1000 }; // msec
213  std::atomic_bool activated = { false };
214  PassiveTimer ptCheckConnection;
215 
216  typedef std::unordered_map<sendfactor_t, std::vector<PackMessage>> Packs;
217 
218  // mypacks заполняется в начале и дальше с ним происходит только чтение
219  // поэтому mutex-ом его не защищаем
220  Packs mypacks;
221  std::unordered_map<sendfactor_t, size_t> packs_anum;
222  std::unordered_map<sendfactor_t, size_t> packs_dnum;
223  UItemMap items;
224  size_t packetnum = { 1 };
225  uint16_t lastcrc = { 0 };
226  UniSetUDP::UDPPacket s_msg;
227 
228  size_t maxAData = { UniSetUDP::MaxACount };
229  size_t maxDData = { UniSetUDP::MaxDCount };
230 
231  std::unique_ptr< ThreadCreator<UNetSender> > s_thr; // send thread
232 
233  size_t ncycle = { 0 };
235  };
236  // --------------------------------------------------------------------------
237 } // end of namespace uniset
238 // -----------------------------------------------------------------------------
239 #endif // UNetSender_H_
240 // -----------------------------------------------------------------------------
Definition: CommonEventLoop.h:14
Definition: UNetSender.h:87
Definition: UNetSender.h:69
void askSensors(UniversalIO::UIOCommand cmd)
Definition: UNetSender.cc:558
void updateSensor(uniset::ObjectId id, long value)
Definition: UNetSender.cc:184
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UDPPacket.h:106
void updateFromSM()
Definition: UNetSender.cc:157
Definition: UNetSender.h:115
Definition: Mutex.h:31
void updateItem(UItem &it, long value)
Definition: UNetSender.cc:195
void initIterators()
Definition: UNetSender.cc:552
long ObjectId
Definition: UniSetTypes_i.idl:30