UniSet  2.7.0
LogDB.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // --------------------------------------------------------------------------
20 // --------------------------------------------------------------------------
21 #ifndef LogDB_H_
22 #define LogDB_H_
23 // --------------------------------------------------------------------------
24 #include <queue>
25 #include <memory>
26 #include <mutex>
27 #include <condition_variable>
28 #include <chrono>
29 #include <ev++.h>
30 #include <sigc++/sigc++.h>
31 #include <Poco/JSON/Object.h>
32 #include <Poco/Net/WebSocket.h>
33 #include "UniSetTypes.h"
34 #include "LogAgregator.h"
35 #include "DebugStream.h"
36 #include "SQLiteInterface.h"
37 #include "EventLoopServer.h"
38 #include "UTCPStream.h"
39 #include "LogReader.h"
40 #include "UHttpRequestHandler.h"
41 #include "UHttpServer.h"
42 #include "UTCPCore.h"
43 // -------------------------------------------------------------------------
44 namespace uniset
45 {
46  //------------------------------------------------------------------------------------------
173  class LogDB:
174  public EventLoopServer
175 #ifndef DISABLE_REST_API
176  , public Poco::Net::HTTPRequestHandler
177 #endif
178  {
179  public:
180  LogDB( const std::string& name, int argc, const char* const* argv, const std::string& prefix );
181  virtual ~LogDB();
182 
184  static std::shared_ptr<LogDB> init_logdb( int argc, const char* const* argv, const std::string& prefix = "logdb-" );
185 
187  static void help_print();
188 
189  inline std::shared_ptr<DebugStream> log()
190  {
191  return dblog;
192  }
193 
194  void run( bool async );
195 #ifndef DISABLE_REST_API
196  virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
197  void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
198 #endif
199 
200  protected:
201 
202  class Log;
203  class LogWebSocket;
204 
205  virtual void evfinish() override;
206  virtual void evprepare() override;
207  void onCheckBuffer( ev::timer& t, int revents );
208  void onActivate( ev::async& watcher, int revents ) ;
209  void addLog( Log* log, const std::string& txt );
210 
211  size_t getCountOfRecords( const std::string& logname = "" );
212  size_t getFirstOfOldRecord( size_t maxnum );
213 
214 #ifndef DISABLE_REST_API
215  Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
216  Poco::JSON::Object::Ptr httpGetRequest( const std::string& cmd, const Poco::URI::QueryParameters& p );
217  Poco::JSON::Object::Ptr httpGetList( const Poco::URI::QueryParameters& p );
218  Poco::JSON::Object::Ptr httpGetLogs( const Poco::URI::QueryParameters& p );
219  Poco::JSON::Object::Ptr httpGetCount( const Poco::URI::QueryParameters& p );
220  void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
221  void httpWebSocketConnectPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp, const std::string& logname );
222 
223  // формирование условия where для строки XX[m|h|d|M]
224  // XX m - минут, h-часов, d-дней, M - месяцев
225  static std::string qLast( const std::string& p );
226 
227  // преобразование в дату 'YYYY-MM-DD' из строки 'YYYYMMDD' или 'YYYY/MM/DD'
228  static std::string qDate(const std::string& p , const char sep = '-');
229 
230  // экранирование кавычек (удваивание для sqlite)
231  static std::string qEscapeString( const std::string& s );
232 
233  std::shared_ptr<LogWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const std::string& logname );
234  void delWebSocket( std::shared_ptr<LogWebSocket>& ws );
235 #endif
236  std::string myname;
237  std::unique_ptr<SQLiteInterface> db;
238 
239  std::string tmsFormat = { "localtime" };
241  bool activate = { false };
242 
243  typedef std::queue<std::string> QueryBuffer;
244  QueryBuffer qbuf;
245  size_t qbufSize = { 1000 }; // размер буфера сообщений.
246 
247  ev::timer flushBufferTimer;
248  double tmFlushBuffer_sec = { 1.0 };
249  void flushBuffer();
250  void rotateDB();
251 
252  size_t maxdbRecords = { 200 * 1000 };
253  size_t numOverflow = { 0 }; // вычисляется из параметра "overflow factor"(float)
254 
255  ev::sig sigTERM;
256  ev::sig sigQUIT;
257  ev::sig sigINT;
258  void onTerminate( ev::sig& evsig , int revents );
259 
260  ev::async wsactivate; // активация LogWebSocket-ов
261 
262  class Log
263  {
264  public:
265  std::string name;
266  std::string ip;
267  int port = { 0 };
268  std::string cmd;
269  std::string peername;
270  std::string description;
271 
272  std::shared_ptr<DebugStream> dblog;
273 
274  bool isConnected() const;
275 
276  void set( ev::dynamic_loop& loop );
277  void check( ev::timer& t, int revents );
278  void event( ev::io& watcher, int revents );
279  void read( ev::io& watcher );
280  void write( ev::io& io );
281  void close();
282 
283  typedef sigc::signal<void, Log*, const std::string&> ReadSignal;
284  ReadSignal signal_on_read();
285 
286 
287  void setCheckConnectionTime( double sec );
288  void setReadBufSize( size_t sz );
289 
290  protected:
291  void ioprepare();
292  bool connect() noexcept;
293 
294  private:
295  ReadSignal sigRead;
296  ev::io io;
297  ev::timer iocheck;
298 
299  double checkConnection_sec = { 5.0 };
300 
301  std::shared_ptr<UTCPStream> tcp;
302  std::vector<char> buf; // буфер для чтения сообщений
303 
304  static const size_t reservsize = { 1000 };
305  std::string text;
306 
307  // буфер для посылаемых данных (write buffer)
308  std::queue<UTCPCore::Buffer*> wbuf;
309  };
310 
311  std::vector< std::shared_ptr<Log> > logservers;
312  std::shared_ptr<DebugStream> dblog;
313 
314 #ifndef DISABLE_REST_API
315  std::shared_ptr<Poco::Net::HTTPServer> httpserv;
316  std::string httpHost = { "" };
317  int httpPort = { 0 };
318  std::string httpCORS_allow = { "*" };
319  std::string httpReplyAddr = { "" };
320 
321  double wsHeartbeatTime_sec = { 3.0 };
322  double wsSendTime_sec = { 0.5 };
323  size_t wsMaxSend = { 200 };
324 
325  std::string fgColor = { "#c4c4c4" };
326  std::string bgColor = { "#111111" };
327  std::string bgColorTitle = { "green" };
328  std::string fgColorTitle = { "#ececec" };
329 
338  public Poco::Net::WebSocket
339  {
340  public:
341  LogWebSocket(Poco::Net::HTTPServerRequest* req,
342  Poco::Net::HTTPServerResponse* resp,
343  std::shared_ptr<Log>& log );
344 
345  virtual ~LogWebSocket();
346 
347  // конечно некрасиво что это в public
348  std::shared_ptr<DebugStream> dblog;
349 
350  bool isActive();
351  void set( ev::dynamic_loop& loop );
352 
353  void send( ev::timer& t, int revents );
354  void ping( ev::timer& t, int revents );
355 
356  void add( Log* log, const std::string& txt );
357 
358  void term();
359 
360  void waitCompletion();
361 
362  // настройка
363  void setHearbeatTime( const double& sec );
364  void setSendPeriod( const double& sec );
365  void setMaxSendCount( size_t val );
366 
367  protected:
368 
369  void write();
370 
371  ev::timer iosend;
372  double send_sec = { 0.5 };
373  size_t maxsend = { 200 };
374 
375  ev::timer ioping;
376  double ping_sec = { 3.0 };
377 
378  std::mutex finishmut;
379  std::condition_variable finish;
380 
381  std::atomic_bool cancelled = { false };
382 
383  sigc::connection con; // подписка на появление логов..
384 
385  Poco::Net::HTTPServerRequest* req;
386  Poco::Net::HTTPServerResponse* resp;
387 
388  // очередь данных на посылку..
389  std::queue<UTCPCore::Buffer*> wbuf;
390  size_t maxsize; // рассчитывается сходя из max_send (см. конструктор)
391  };
392 
394  {
395  public:
396 
397  LogWebSocketGuard( std::shared_ptr<LogWebSocket>& s, LogDB* l ):
398  ws(s), logdb(l) {}
399 
401  {
402  logdb->delWebSocket(ws);
403  }
404 
405 
406  private:
407  std::shared_ptr<LogWebSocket> ws;
408  LogDB* logdb;
409  };
410 
411  friend class LogWebSocketGuard;
412 
413  std::list<std::shared_ptr<LogWebSocket>> wsocks;
414  uniset::uniset_rwmutex wsocksMutex;
415  size_t maxwsocks = { 50 }; // максимальное количество websocket-ов
416 
417 
419  public Poco::Net::HTTPRequestHandlerFactory
420  {
421  public:
422  LogDBRequestHandlerFactory( LogDB* l ): logdb(l) {}
423  virtual ~LogDBRequestHandlerFactory() {}
424 
425  virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
426 
427  private:
428  LogDB* logdb;
429  };
430 #endif
431 
432  private:
433  };
434  // ----------------------------------------------------------------------------------
435 } // end of namespace uniset
436 //------------------------------------------------------------------------------------------
437 #endif
std::string tmsFormat
Definition: LogDB.h:239
Definition: LogDB.h:262
Definition: CallbackTimer.h:29
Poco::JSON::Object::Ptr httpGetList(const Poco::URI::QueryParameters &p)
Definition: LogDB.cc:953
static void help_print()
Definition: LogDB.cc:444
void ioprepare()
Definition: LogDB.cc:618
Definition: LogDB.h:393
static std::shared_ptr< LogDB > init_logdb(int argc, const char *const *argv, const std::string &prefix="logdb-")
Definition: LogDB.cc:431
Definition: LogDB.h:173
Definition: Mutex.h:31
The EventLoopServer class Реализация общей части всех процессов использующих libev. Содержит свой (динамический) eventloop;.
Definition: EventLoopServer.h:17
Definition: LogDB.h:337