UniSet  2.12.1
UWebSocketGate.h
1 /*
2  * Copyright (c) 2017 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // --------------------------------------------------------------------------
20 // --------------------------------------------------------------------------
21 #ifndef UWebSocketGate_H_
22 #define UWebSocketGate_H_
23 // --------------------------------------------------------------------------
24 #include <queue>
25 #include <memory>
26 #include <mutex>
27 #include <condition_variable>
28 #include <chrono>
29 #include <ev++.h>
30 #include <sigc++/sigc++.h>
31 #include <Poco/JSON/Object.h>
32 #include <Poco/Net/WebSocket.h>
33 #include "UniSetTypes.h"
34 #include "LogAgregator.h"
35 #include "UniSetObject.h"
36 #include "DebugStream.h"
37 #include "SharedMemory.h"
38 #include "SMInterface.h"
39 #include "EventLoopServer.h"
40 #include "UTCPStream.h"
41 #include "UHttpRequestHandler.h"
42 #include "UHttpServer.h"
43 #include "UTCPCore.h"
44 // -------------------------------------------------------------------------
45 namespace uniset
46 {
47  //------------------------------------------------------------------------------------------
187  public UniSetObject,
188  public EventLoopServer
189 #ifndef DISABLE_REST_API
190  , public Poco::Net::HTTPRequestHandler
191 #endif
192  {
193  public:
// Constructor declaration (the definition is not part of this header).
//  id     - uniset::ObjectId of the object being created
//  cnode  - XML node; presumably this object's configuration section (TODO confirm)
//  shmID  - uniset::ObjectId; presumably the SharedMemory object to work with (TODO confirm)
//  ic     - optional SharedMemory pointer, defaults to nullptr
//  prefix - string prefix, defaults to "-ws"
// NOTE(review): the default prefix here is "-ws" while init_wsgate() declares "ws-";
// confirm which spelling is intended.
194  UWebSocketGate( uniset::ObjectId id, xmlNode* cnode
195  , uniset::ObjectId shmID
196  , const std::shared_ptr<SharedMemory>& ic = nullptr
197  , const std::string& prefix = "-ws" );
198 
// Virtual destructor (declared here, defined elsewhere).
199  virtual ~UWebSocketGate();
200 
// Static factory declaration: returns a shared_ptr to a UWebSocketGate.
//  argc, argv - command-line arguments (presumably parsed for settings - TODO confirm)
//  shmID      - uniset::ObjectId; presumably the SharedMemory object (TODO confirm)
//  ic         - optional SharedMemory pointer, defaults to nullptr
//  prefix     - string prefix, defaults to "ws-"
// NOTE(review): the constructor's default prefix is "-ws", this one is "ws-";
// confirm which spelling is intended.
202  static std::shared_ptr<UWebSocketGate> init_wsgate( int argc, const char* const* argv
203  , uniset::ObjectId shmID
204  , const std::shared_ptr<SharedMemory>& ic = nullptr
205  , const std::string& prefix = "ws-" );
206 
// Static helper; presumably prints command-line usage (implementation not shown here).
208  static void help_print();
209 
210  inline std::shared_ptr<DebugStream> log()
211  {
212  return mylog;
213  }
214 
215 #ifndef DISABLE_REST_API
// HTTP entry points; compiled only when DISABLE_REST_API is not defined.
// handleRequest() overrides a base-class virtual (it is marked `override`).
216  virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
// Takes the same request/response pair; presumably services a WebSocket
// session for that request (TODO confirm against the .cc file).
217  void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
218 #endif
219 
220  protected:
221 
// Forward declaration; the nested class is defined further down in this header.
222  class UWebSocket;
223 
// Overrides of base-class virtuals (all marked `override`) plus event callbacks.
224  virtual bool activateObject() override;
// `async` presumably selects a non-blocking start of the event loop (TODO confirm).
225  void run( bool async );
226  virtual void evfinish() override;
227  virtual void evprepare() override;
// The following take a libev watcher and an event mask, i.e. they have the
// shape of ev++ watcher callbacks (where they are registered is not visible here).
228  void onCheckBuffer( ev::timer& t, int revents );
229  void onActivate( ev::async& watcher, int revents ) ;
230  void onCommand( ev::async& watcher, int revents );
231 
232 #ifndef DISABLE_REST_API
// REST/HTTP helpers; compiled only when DISABLE_REST_API is not defined.
// Both page helpers write to the supplied output stream `out`.
233  void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
234  void httpWebSocketConnectPage(std::ostream& out, Poco::Net::HTTPServerRequest& req,
235  Poco::Net::HTTPServerResponse& resp, const std::string& params );
236 
// Creation/removal of UWebSocket objects. newWebSocket() takes raw pointers to
// the request/response plus the URI query parameters; delWebSocket() takes the
// shared pointer by non-const reference.
237  std::shared_ptr<UWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp );
238  void delWebSocket( std::shared_ptr<UWebSocket>& ws );
239 
// Takes an HTTP status and a message and returns a JSON object; presumably also
// sets the status on `resp` (TODO confirm).
240  Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
// Presumably adds access-control (CORS) headers to the response (TODO confirm).
241  void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp );
242 #endif
// libev signal watchers, one per signal name (TERM, QUIT, INT), and the
// callback with the matching ev::sig signature.
243  ev::sig sigTERM;
244  ev::sig sigQUIT;
245  ev::sig sigINT;
246  void onTerminate( ev::sig& evsig, int revents );
247 
248  ev::async wsactivate; // activation of WebSockets
// Shared async watcher; presumably the one handed to UWebSocket::set() as the
// command signal (TODO confirm).
249  std::shared_ptr<ev::async> wscmd;
250 
// Timer-shaped callback (ev::timer + event mask).
251  void checkMessages( ev::timer& t, int revents );
// Overrides of base-class virtuals (marked `override`).
252  virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
253  virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
254  ev::timer iocheck;
// Defaults to 0.05; presumably the period of `iocheck` in seconds (TODO confirm).
255  double check_sec = { 0.05 };
// Defaults to 100; presumably the cap on messages handled per check (TODO confirm).
256  int maxMessagesProcessing = { 100 };
257 
// Log stream returned by log().
258  std::shared_ptr<DebugStream> mylog;
// Shared-memory interface object.
259  std::shared_ptr<SMInterface> shm;
260 
261 #ifndef DISABLE_REST_API
// HTTP server handle and its settings (defaults: empty host, port 0, CORS "*").
262  std::shared_ptr<Poco::Net::HTTPServer> httpserv;
263  std::string httpHost = { "" };
264  int httpPort = { 0 };
265  std::string httpCORS_allow = { "*" };
266 
// WebSocket tuning values. The defaults equal the in-class defaults of
// UWebSocket::ping_sec, send_sec, maxsend and maxcmd; presumably they are
// pushed into each new socket through its setters (TODO confirm).
267  double wsHeartbeatTime_sec = { 3.0 };
268  double wsSendTime_sec = { 0.5 };
269  size_t wsMaxSend = { 5000 };
270  size_t wsMaxCmd = { 200 };
271 
// Static helpers returning JSON objects built from a sensor message plus an
// error string, or from an error string alone.
272  static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::string& err );
273  static Poco::JSON::Object::Ptr error_to_json( const std::string& err );
274 
// One WebSocket connection. Derives publicly from Poco::Net::WebSocket and
// carries libev watchers (send timer, ping timer, receive io) plus queues of
// commands and outgoing data. Only declarations are visible in this header.
282  class UWebSocket:
283  public Poco::Net::WebSocket
284  {
285  public:
// Built from raw pointers to the HTTP request and response.
286  UWebSocket( Poco::Net::HTTPServerRequest* req,
287  Poco::Net::HTTPServerResponse* resp);
288 
289  virtual ~UWebSocket();
290 
291  std::string getInfo() const noexcept;
292 
293  bool isActive();
// Takes the event loop and a shared async watcher; presumably starts the
// watchers in `loop` and stores `a` as `cmdsignal` (TODO confirm).
294  void set( ev::dynamic_loop& loop, std::shared_ptr<ev::async> a );
295 
// Watcher-shaped callbacks (watcher reference + event mask).
296  void send( ev::timer& t, int revents );
297  void ping( ev::timer& t, int revents );
298  void read( ev::io& io, int revents );
299 
// Per-sensor record; used as the value type of `smap` and the element type of `qcmd`.
// NOTE(review): the embedded listing numbers jump from 302 to 304 here, so one
// member line appears to be missing from this listing.
300  struct sinfo
301  {
302  std::string err; // error while working with the sensor (e.g. when subscribing to it)
304  std::string cmd = "";
305  long value = { 0 }; // set value
306  };
307 
308 
// Per-sensor operations addressed by object id.
309  void ask( uniset::ObjectId id );
310  void del( uniset::ObjectId id );
311  void get( uniset::ObjectId id );
312  void set( uniset::ObjectId id, long value );
313  void sensorInfo( const uniset::SensorMessage* sm );
// Presumably executes the queued commands through `ui` (TODO confirm).
314  void doCommand( const std::shared_ptr<SMInterface>& ui );
315  static Poco::JSON::Object::Ptr to_short_json( sinfo* si );
316 
317  void term();
318 
319  void waitCompletion();
320 
321  // configuration
322  void setHearbeatTime( const double& sec );
323  void setSendPeriod( const double& sec );
324  void setMaxSendCount( size_t val );
325  void setMaxCmdCount( size_t val );
326 
// Public log stream pointer (assigned by the owner; not set in this header).
327  std::shared_ptr<DebugStream> mylog;
328 
329  protected:
330 
331  void write();
332  void sendResponse( sinfo& si );
333  void sendShortResponse( sinfo& si );
334  void onCommand( const std::string& cmd );
335  void sendError( const std::string& message );
336 
337  ev::timer iosend;
338  double send_sec = { 0.5 };
339  size_t maxsend = { 5000 };
340  size_t maxcmd = { 200 };
341  const int Kbuf = { 10 }; // message buffer factor (maxsend is multiplied by Kbuf)
342 
343  ev::timer ioping;
344  double ping_sec = { 3.0 };
345  static const std::string ping_str;
346 
347  ev::io iorecv;
// 32 KiB receive buffer.
348  char rbuf[32 * 1024];
349  timeout_t recvTimeout = { 200 }; // msec
350  std::shared_ptr<ev::async> cmdsignal;
351 
// Mutex/condition-variable pair; presumably what waitCompletion() blocks on (TODO confirm).
352  std::mutex finishmut;
353  std::condition_variable finish;
354 
355  std::atomic_bool cancelled = { false };
356 
357  std::unordered_map<uniset::ObjectId, sinfo> smap;
358  std::queue<sinfo> qcmd; // command queue
359 
// Raw, non-smart pointers to the request/response; no in-class initializer.
360  Poco::Net::HTTPServerRequest* req;
361  Poco::Net::HTTPServerResponse* resp;
362 
363  // queue of JSON objects to send
364  std::queue<Poco::JSON::Object::Ptr> jbuf;
365 
366  // queue of data to be sent
// NOTE(review): holds raw Buffer pointers - ownership/cleanup is not visible here.
367  std::queue<uniset::UTCPCore::Buffer*> wbuf;
368  size_t maxsize; // computed from max_send (see the constructor)
369  };
370 
372  {
373  public:
374 
375  UWebSocketGuard( std::shared_ptr<UWebSocket>& s, UWebSocketGate* g ):
376  ws(s), wsgate(g) {}
377 
378  ~UWebSocketGuard()
379  {
380  wsgate->delWebSocket(ws);
381  }
382 
383 
384  private:
// Guarded socket (own shared_ptr copy) and the gate it is returned to
// (raw, non-owning pointer).
385  std::shared_ptr<UWebSocket> ws;
386  UWebSocketGate* wsgate;
387  };
388 
// The guard calls the protected delWebSocket(), hence the friendship.
389  friend class UWebSocketGuard;
390 
// Websocket objects held by the gate.
391  std::list<std::shared_ptr<UWebSocket>> wsocks;
// Presumably protects `wsocks` (TODO confirm in the .cc file).
392  uniset::uniset_rwmutex wsocksMutex;
393  size_t maxwsocks = { 50 }; // maximum number of websockets
394 
395 
397  public Poco::Net::HTTPRequestHandlerFactory
398  {
399  public:
402 
// Override of the Poco::Net::HTTPRequestHandlerFactory hook; returns a raw
// handler pointer for the given request.
403  virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
404 
405  private:
// Non-owning pointer to the gate this factory works for.
406  UWebSocketGate* wsgate;
407  };
408 #endif
409 
410  private:
411 
412  };
413  // ----------------------------------------------------------------------------------
414 } // end of namespace uniset
415 //------------------------------------------------------------------------------------------
416 #endif
Definition: CommonEventLoop.h:14
timeout_t recvTimeout
Definition: UWebSocketGate.h:349
Definition: UniSetObject.h:73
std::shared_ptr< UInterface > ui
Definition: UniSetObject.h:136
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UWebSocketGate.h:186
Definition: MessageType.h:126
virtual bool activateObject() override
Активизация объекта (переопределяется для необходимых действий после активизации)
Definition: UWebSocketGate.cc:566
Definition: UWebSocketGate.h:300
Definition: Mutex.h:31
Definition: UWebSocketGate.h:371
The EventLoopServer class Реализация общей части всех процессов использующих libev....
Definition: EventLoopServer.h:17
static void help_print()
Definition: UWebSocketGate.cc:281
Definition: UniSetTypes_i.idl:64
Definition: UWebSocketGate.h:282
long ObjectId
Definition: UniSetTypes_i.idl:30
static std::shared_ptr< UWebSocketGate > init_wsgate(int argc, const char *const *argv, uniset::ObjectId shmID, const std::shared_ptr< SharedMemory > &ic=nullptr, const std::string &prefix="ws-")
Definition: UWebSocketGate.cc:265