UniSet  2.7.0
LogSession.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -------------------------------------------------------------------------
17 #ifndef LogSession_H_
18 #define LogSession_H_
19 // -------------------------------------------------------------------------
20 #include <string>
21 #include <memory>
22 #include <queue>
23 #include <ev++.h>
24 #include "Poco/Net/StreamSocket.h"
25 #include "Mutex.h"
26 #include "DebugStream.h"
27 #include "UTCPCore.h"
28 #include "UTCPStream.h"
29 #include "LogAgregator.h"
30 #ifndef DISABLE_REST_API
31 #include <Poco/JSON/Object.h>
32 #endif
33 // -------------------------------------------------------------------------
34 namespace uniset
35 {
36 
38  class LogSession
39  {
40  public:
41 
42  LogSession( const Poco::Net::StreamSocket& s, std::shared_ptr<DebugStream>& log, timeout_t cmdTimeout = 2000, timeout_t checkConnectionTime = 10000 );
43  ~LogSession();
44 
45  typedef sigc::slot<void, LogSession*> FinalSlot;
46  void connectFinalSession( FinalSlot sl ) noexcept;
47 
48  // сигнал о приходе команды: std::string func( LogSession*, command, logname );
49  // \return какую-то информацию, которая будет послана client-у. Если return.empty(), то ничего послано не будет.
50  typedef sigc::signal<std::string, LogSession*, LogServerTypes::Command, const std::string& > LogSessionCommand_Signal;
51  LogSessionCommand_Signal signal_logsession_command();
52 
53  // прервать работу
54  void cancel() noexcept;
55 
56  std::string getClientAddress() const noexcept;
57 
58  void setSessionLogLevel( Debug::type t ) noexcept;
59  void addSessionLogLevel( Debug::type t ) noexcept;
60  void delSessionLogLevel( Debug::type t ) noexcept;
61 
63  void setMaxBufSize( size_t num );
64  size_t getMaxBufSize() const noexcept;
65 
66  // запуск обработки входящих запросов
67  void run( const ev::loop_ref& loop ) noexcept;
68  void terminate();
69 
70  bool isAcive() const noexcept;
71 
72  std::string name() const noexcept;
73 
74  std::string getShortInfo() noexcept;
75 
76 #ifndef DISABLE_REST_API
77  Poco::JSON::Object::Ptr httpGetShortInfo();
78 #endif
79 
80  protected:
81  // LogSession( ost::TCPSocket& server );
82 
83  void event( ev::async& watcher, int revents ) noexcept;
84  void callback( ev::io& watcher, int revents ) noexcept;
85  void readEvent( ev::io& watcher ) noexcept;
86  void writeEvent( ev::io& watcher );
87  size_t readData( unsigned char* buf, int len );
88  void cmdProcessing( const std::string& cmdLogName, const LogServerTypes::lsMessage& msg );
89  void onCmdTimeout( ev::timer& watcher, int revents ) noexcept;
90  void onCheckConnectionTimer( ev::timer& watcher, int revents ) noexcept;
91  void final() noexcept;
92 
93  void logOnEvent( const std::string& s ) noexcept;
94 
95  timeout_t cmdTimeout = { 2000 };
96  double checkConnectionTime = { 10. }; // время на проверку живости соединения..(сек)
97 
98  // Т.к. сообщений может быть ОЧЕНЬ МНОГО.. сеть медленная
99  // очередь будет не успевать рассасываться,
100  // то потенциально может "скушаться" вся память.
101  // Поэтому приходиться ограничить доступное количество записей.
102  // Рассчитываем, что средний размер одного сообщения 150 символов (байт)
103  // тогда выделяем буфер на 200 сообщений (~ 30кB)
104  // На самом деле сообщения могут быть совершенно разные..
105  size_t maxRecordsNum = { 30000 }; // максимальное количество сообщение в очереди
106 
107  private:
108  std::queue<UTCPCore::Buffer*> logbuf;
109  std::mutex logbuf_mutex;
110  bool lostMsg = { false };
111 
112  // статистика по использованию буфера
113  size_t maxCount = { 0 }; // максимальное количество побывавшее в очереди
114  size_t minSizeMsg = { 0 }; // минимальная встретившаяся длинна сообщения
115  size_t maxSizeMsg = { 0 }; // максимальная встретившаяся длинна сообщения
116  size_t numLostMsg = { 0 }; // количество потерянных сообщений
117 
118  std::string peername = { "" };
119  std::string caddr = { "" };
120  std::shared_ptr<DebugStream> log;
121  std::shared_ptr<LogAgregator> alog;
122  sigc::connection conn;
123 
124  std::shared_ptr<UTCPStream> sock;
125 
126  ev::io io;
127  ev::timer cmdTimer;
128  ev::async asyncEvent;
129  ev::timer checkConnectionTimer;
130 
131  FinalSlot slFin;
132  std::atomic_bool cancelled = { false };
133 
134  LogSessionCommand_Signal m_command_sig;
135 
136  DebugStream mylog;
137  };
138  // -------------------------------------------------------------------------
139 } // end of uniset namespace
140 // -------------------------------------------------------------------------
141 #endif // LogSession_H_
142 // -------------------------------------------------------------------------
Definition: DebugStream.h:91
Definition: CallbackTimer.h:29
void setMaxBufSize(size_t num)
Установить размер буфера для сообщений (количество записей. Не в байтах!!)
Definition: LogSession.cc:678
Definition: LogServerTypes.h:53
Definition: LogSession.h:38