UniSet  2.7.0
UNetReceiver.h
1 /*
2  * Copyright (c) 2015 Pavel Vainerman.
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as
6  * published by the Free Software Foundation, version 2.1.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11  * Lesser General Lesser Public License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  */
16 // -----------------------------------------------------------------------------
17 #ifndef UNetReceiver_H_
18 #define UNetReceiver_H_
19 // -----------------------------------------------------------------------------
20 #include <ostream>
21 #include <memory>
22 #include <string>
23 #include <queue>
24 #include <unordered_map>
25 #include <sigc++/sigc++.h>
26 #include <ev++.h>
27 #include "UniSetObject.h"
28 #include "Trigger.h"
29 #include "Mutex.h"
30 #include "SMInterface.h"
31 #include "SharedMemory.h"
32 #include "UDPPacket.h"
33 #include "CommonEventLoop.h"
34 #include "UDPCore.h"
35 // --------------------------------------------------------------------------
36 namespace uniset
37 {
38  // -----------------------------------------------------------------------------
39  /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40  * ===============
41  * Собственно реализация сделана так:
42  * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
43  * что были посланы, сделана очередь с приоритетом. В качестве приориета используется номер пакета
44  * (чем меньше тем старше). При этом обработка ведётся только тех пакетов, которые идут "подряд",
45  * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
46  * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается дальше..
47  * Всё это реализовано в функции UNetReceiver::real_update()
48  *
49  * КЭШ
50  * ===
51  * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
52  * Идея проста: сделан вектор размером с количество принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
53  * Порядковый номер данных в пакете является индексом в кэше.
54  * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
55  * ID который пришёл в пакете - элемент кэша обновляется.
56  * Если количество пришедших данных не совпадают с размером кэша.. кэш обновляется.
57  *
58  * КЭШ (ДОПОЛНЕНИЕ)
59  * ===
60  * Т.к. в общем случае, данные могут быть разбиты не несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
61  * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
62  * Кэш в map добавляется когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим используется для этого пакета.
63  * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и расчитан на статичность пакетов,
64  * т.е. на то что UNetSender не будет с течением времени менять структуру отправляемых пакетов.
65  *
66  * Обработка сбоя или переполнения счётчика пакетов(перехода через максимум)
67  * =========================================================================
68  * Для защиты от сбоя счётика сделана следующая логика:
69  * Если номер очередного пришедшего пакета отличается от последнего обработанного на maxDifferens, то считается,
70  * что произошёл сбой счётчика и происходит ожидание пока функция update, не обработает основную очередь полностью.
71  * При этом принимаемые пакеты складываются во временную очередь qtmp. Как только основная очередь пустеет,
72  * в неё копируется всё накопленное во временной очереди..и опять идёт штатная обработка.
73  * Если во время "ожидания" опять происходит "разрыв" в номерах пакетов, то временная очередь чиститься
74  * и данные которые в ней были теряются! Аналог ограниченного буфера (у любых карт), когда новые данные
75  * затирают старые, если их не успели вынуть и обработать.
76  * \todo Сделать защиту от бесконечного ожидания "очистки" основной очереди.
77  * =========================================================================
78  * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся.. не обрабатываем..
79  *
80  * Создание соединения (открытие сокета)
81  * ======================================
82  * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
83  * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
84  * открыть сокет.. и так бесконечно, пока не получиться. Это важно для систем, где в момент загрузки программы
85  * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
86  * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть).
87  * Если такая логика не требуется, то можно задать в конструкторе
88  * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
89  * выкинуто исключение при неудачной попытке создания соединения.
90  *
91  * Стратегия обновления данных в SM
92  * ==================================
93  * При помощи функции setUpdateStrategy() можно выбрать стратегию обновления данных в SM.
94  * Поддерживается два варианта:
95  * 'thread' - отдельный поток обновления
96  * 'evloop' - использование общего с приёмом event loop (libev)
97  */
98  // -----------------------------------------------------------------------------
99  class UNetReceiver:
100  protected EvWatcher,
101  public std::enable_shared_from_this<UNetReceiver>
102  {
103  public:
104  UNetReceiver( const std::string& host, int port, const std::shared_ptr<SMInterface>& smi, bool nocheckConnection = false );
105  virtual ~UNetReceiver();
106 
107  void start();
108  void stop();
109 
110  inline const std::string getName() const
111  {
112  return myname;
113  }
114 
115  // блокировать сохранение данных в SM
116  void setLockUpdate( bool st ) noexcept;
117  bool isLockUpdate() const noexcept;
118 
119  void resetTimeout() noexcept;
120 
121  bool isInitOK() const noexcept;
122  bool isRecvOK() const noexcept;
123  size_t getLostPacketsNum() const noexcept;
124 
125  void setReceiveTimeout( timeout_t msec ) noexcept;
126  void setReceivePause( timeout_t msec ) noexcept;
127  void setUpdatePause( timeout_t msec ) noexcept;
128  void setLostTimeout( timeout_t msec ) noexcept;
129  void setPrepareTime( timeout_t msec ) noexcept;
130  void setCheckConnectionPause( timeout_t msec ) noexcept;
131  void setMaxDifferens( unsigned long set ) noexcept;
132  void setEvrunTimeout(timeout_t msec ) noexcept;
133  void setInitPause( timeout_t msec ) noexcept;
134 
135  void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
136  void setLostPacketsID( uniset::ObjectId id ) noexcept;
137 
138  void setMaxProcessingCount( int set ) noexcept;
139 
140  void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
141 
142  inline std::string getAddress() const noexcept
143  {
144  return addr;
145  }
146  inline int getPort() const noexcept
147  {
148  return port;
149  }
150 
152  enum Event
153  {
156  };
157 
158  typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
159  void connectEvent( EventSlot sl ) noexcept;
160 
161  // --------------------------------------------------------------------
164  {
165  useUpdateUnknown,
168  };
169 
170  static UpdateStrategy strToUpdateStrategy( const std::string& s ) noexcept;
171  static std::string to_string( UpdateStrategy s) noexcept;
172 
174  void setUpdateStrategy( UpdateStrategy set );
175 
176  // специальная обёртка, захватывающая или нет mutex в зависимости от стратегии
177  // (т.к. при evloop mutex захватытвать не нужно)
179  {
180  public:
181  pack_guard( std::mutex& m, UpdateStrategy s );
182  ~pack_guard();
183 
184  protected:
185  std::mutex& m;
186  UpdateStrategy s;
187  };
188 
189  // --------------------------------------------------------------------
190 
191  inline std::shared_ptr<DebugStream> getLog()
192  {
193  return unetlog;
194  }
195 
196  virtual const std::string getShortInfo() const noexcept;
197 
198  protected:
199 
200  const std::shared_ptr<SMInterface> shm;
201  std::shared_ptr<DebugStream> unetlog;
202 
203  bool receive() noexcept;
204  void step() noexcept;
205  void update() noexcept;
206  void updateThread() noexcept;
207  void callback( ev::io& watcher, int revents ) noexcept;
208  void readEvent( ev::io& watcher ) noexcept;
209  void updateEvent( ev::periodic& watcher, int revents ) noexcept;
210  void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
211  void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
212  void initEvent( ev::timer& watcher, int revents ) noexcept;
213  virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
214  virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
215  virtual std::string wname() const noexcept override
216  {
217  return myname;
218  }
219 
220  void initIterators() noexcept;
221  bool createConnection( bool throwEx = false );
222  void checkConnection();
223 
224  public:
225 
226  // функция определения приоритетного сообщения для обработки
228  public std::binary_function<UniSetUDP::UDPMessage, UniSetUDP::UDPMessage, bool>
229  {
230  inline bool operator()(const UniSetUDP::UDPMessage& lhs,
231  const UniSetUDP::UDPMessage& rhs) const
232  {
233  return lhs.num > rhs.num;
234  }
235  };
236 
237  typedef std::priority_queue<UniSetUDP::UDPMessage, std::vector<UniSetUDP::UDPMessage>, PacketCompare> PacketQueue;
238 
239  private:
240  UNetReceiver();
241 
242  timeout_t recvpause = { 10 };
243  timeout_t updatepause = { 100 };
245  std::unique_ptr<UDPReceiveU> udp;
246  std::string addr;
247  int port = { 0 };
248  Poco::Net::SocketAddress saddr;
249  std::string myname;
250  ev::io evReceive;
251  ev::periodic evCheckConnection;
252  ev::periodic evStatistic;
253  ev::periodic evUpdate;
254  ev::timer evInitPause;
255 
256  UpdateStrategy upStrategy = { useUpdateEventLoop };
257 
258  // счётчики для подсчёта статистики
259  size_t recvCount = { 0 };
260  size_t upCount = { 0 };
261 
262  // текущая статистик
263  size_t statRecvPerSec = { 0 };
264  size_t statUpPerSec = { 0 };
266  std::unique_ptr< ThreadCreator<UNetReceiver> > upThread; // update thread
267 
268  // делаем loop общим.. одним на всех!
269  static CommonEventLoop loop;
270 
271  double checkConnectionTime = { 10.0 }; // sec
272  std::mutex checkConnMutex;
273 
274  PassiveTimer ptRecvTimeout;
275  PassiveTimer ptPrepare;
276  timeout_t recvTimeout = { 5000 }; // msec
277  timeout_t prepareTime = { 2000 };
278  timeout_t evrunTimeout = { 15000 };
279  timeout_t lostTimeout = { 200 };
280 
281  double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
282  std::atomic_bool initOK = { false };
283 
284  PassiveTimer ptLostTimeout;
285  size_t lostPackets = { 0 };
287  uniset::ObjectId sidRespond = { uniset::DefaultObjectId };
288  IOController::IOStateList::iterator itRespond;
289  bool respondInvert = { false };
290  uniset::ObjectId sidLostPackets;
291  IOController::IOStateList::iterator itLostPackets;
292 
293  std::atomic_bool activated = { false };
294 
295  PacketQueue qpack;
296  UniSetUDP::UDPMessage pack;
297  UniSetUDP::UDPPacket r_buf;
298  std::mutex packMutex;
299  size_t pnum = { 0 };
304  size_t maxDifferens = { 20 };
305 
306  PacketQueue qtmp;
307  bool waitClean = { false };
308  size_t rnum = { 0 };
310  size_t maxProcessingCount = { 100 };
312  std::atomic_bool lockUpdate = { false };
314  EventSlot slEvent;
315  Trigger trTimeout;
316  std::mutex tmMutex;
317 
318  struct CacheItem
319  {
320  long id = { uniset::DefaultObjectId };
321  IOController::IOStateList::iterator ioit;
322 
323  CacheItem():
325  };
326 
327  typedef std::vector<CacheItem> CacheVec;
328  struct CacheInfo
329  {
330  CacheInfo():
331  cache_init_ok(false) {}
332 
333  bool cache_init_ok = { false };
334  CacheVec cache;
335  };
336 
337  // ключом является UDPMessage::getDataID()
338  typedef std::unordered_map<long, CacheInfo> CacheMap;
339  CacheMap d_icache_map;
340  CacheMap a_icache_map;
342  bool d_cache_init_ok = { false };
343  bool a_cache_init_ok = { false };
344 
345  void initDCache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
346  void initACache( UniSetUDP::UDPMessage& pack, bool force = false ) noexcept;
347  };
348  // --------------------------------------------------------------------------
349 } // end of namespace uniset
350 // -----------------------------------------------------------------------------
351 #endif // UNetReceiver_H_
352 // -----------------------------------------------------------------------------
Пассивный таймер
Definition: PassiveTimer.h:92
The CommonEventLoop class Реализация механизма "один eventloop, много подписчиков" (libev)...
Definition: CommonEventLoop.h:54
Definition: CallbackTimer.h:29
Definition: UNetReceiver.h:166
Definition: CommonEventLoop.h:18
Definition: UNetReceiver.h:155
const ObjectId DefaultObjectId
Definition: UniSetTypes.h:69
Definition: UDPPacket.h:93
Definition: UDPPacket.h:83
Definition: Trigger.h:29
Definition: UNetReceiver.h:227
Event
Definition: UNetReceiver.h:152
void setUpdateStrategy(UpdateStrategy set)
функция должна вызываться до первого вызова start()
Definition: UNetReceiver.cc:962
Definition: UNetReceiver.h:178
Definition: UNetReceiver.h:154
Definition: UNetReceiver.h:99
UpdateStrategy
Definition: UNetReceiver.h:163
Definition: UNetReceiver.h:167
long ObjectId
Definition: UniSetTypes_i.idl:30