FairMQ  1.2.0
C++ Message Passing Framework
FairMQStateMachine.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQSTATEMACHINE_H_
16 #define FAIRMQSTATEMACHINE_H_
17 
18 #define FAIRMQ_INTERFACE_VERSION 3
19 
20 #include <string>
21 #include <atomic>
22 #include <mutex>
23 #include <condition_variable>
24 #include <thread>
25 #include <chrono>
26 #include <functional>
27 #include <unordered_map>
28 
29 // Increase maximum number of boost::msm states (default is 10)
30 // This #define has to be before any msm header includes
31 #define FUSION_MAX_VECTOR_SIZE 20
32 
33 #include <boost/mpl/for_each.hpp>
34 #include <boost/msm/back/state_machine.hpp>
35 #include <boost/msm/back/tools.hpp>
36 #include <boost/msm/back/metafunctions.hpp>
37 #include <boost/msm/front/state_machine_def.hpp>
38 #include <boost/msm/front/functor_row.hpp>
39 
40 #include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
41 
42 #include "FairMQLogger.h"
43 
44 namespace msmf = boost::msm::front;
45 
46 namespace fair
47 {
48 namespace mq
49 {
50 namespace fsm
51 {
52 
53 // defining events for the boost MSM state machine
// Event types for the boost MSM state machine.
// Each one reports its own identifier through name().
struct INIT_DEVICE
{
    std::string name() const { return std::string("INIT_DEVICE"); }
};

struct internal_DEVICE_READY
{
    std::string name() const { return std::string("internal_DEVICE_READY"); }
};

struct INIT_TASK
{
    std::string name() const { return std::string("INIT_TASK"); }
};

struct internal_READY
{
    std::string name() const { return std::string("internal_READY"); }
};

struct RUN
{
    std::string name() const { return std::string("RUN"); }
};

struct PAUSE
{
    std::string name() const { return std::string("PAUSE"); }
};

struct STOP
{
    std::string name() const { return std::string("STOP"); }
};

struct RESET_TASK
{
    std::string name() const { return std::string("RESET_TASK"); }
};

struct RESET_DEVICE
{
    std::string name() const { return std::string("RESET_DEVICE"); }
};

struct internal_IDLE
{
    std::string name() const { return std::string("internal_IDLE"); }
};

struct END
{
    std::string name() const { return std::string("END"); }
};

struct ERROR_FOUND
{
    std::string name() const { return std::string("ERROR_FOUND"); }
};
66 
67 // deactivate the warning for non-virtual destructor thrown in the boost library
68 #if defined(__clang__)
69 _Pragma("clang diagnostic push")
70 _Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"")
71 #elif defined(__GNUC__) || defined(__GNUG__)
72 _Pragma("GCC diagnostic push")
73 _Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"")
74 #endif
75 
76 // defining the boost MSM state machine
77 struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
78 {
79  public:
80  FairMQFSM()
81  : fState()
82  , fChangeStateMutex()
83  , fWork()
84  , fWorkAvailableCondition()
85  , fWorkDoneCondition()
86  , fWorkMutex()
87  , fWorkerTerminated(false)
88  , fWorkActive(false)
89  , fWorkAvailable(false)
90  , fStateChangeSignal()
91  , fStateChangeSignalsMap()
92  , fTerminationRequested(false)
93  , fWorkerThread()
94  {}
95 
96  virtual ~FairMQFSM()
97  {}
98 
99  template<typename Event, typename FSM>
100  void on_entry(Event const&, FSM& fsm)
101  {
102  LOG(state) << "Starting FairMQ state machine";
103  fState = IDLE;
104  fsm.CallStateChangeCallbacks(IDLE);
105 
106  // start a worker thread to execute user states in.
107  fsm.fWorkerThread = std::thread(&FairMQFSM::Worker, &fsm);
108  }
109 
110  template<typename Event, typename FSM>
111  void on_exit(Event const&, FSM& /*fsm*/)
112  {
113  LOG(state) << "Exiting FairMQ state machine";
114  }
115 
116  // list of FSM states
117  struct OK_FSM : public msmf::state<> {};
118  struct ERROR_FSM : public msmf::terminate_state<> {};
119 
120  struct IDLE_FSM : public msmf::state<> {};
121  struct INITIALIZING_DEVICE_FSM : public msmf::state<> {};
122  struct DEVICE_READY_FSM : public msmf::state<> {};
123  struct INITIALIZING_TASK_FSM : public msmf::state<> {};
124  struct READY_FSM : public msmf::state<> {};
125  struct RUNNING_FSM : public msmf::state<> {};
126  struct PAUSED_FSM : public msmf::state<> {};
127  struct RESETTING_TASK_FSM : public msmf::state<> {};
128  struct RESETTING_DEVICE_FSM : public msmf::state<> {};
129  struct EXITING_FSM : public msmf::state<> {};
130 
131  // initial states
132  using initial_state = boost::mpl::vector<IDLE_FSM, OK_FSM>;
133 
134  // actions
135  struct IdleFct
136  {
137  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
138  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
139  {
140  LOG(state) << "Entering IDLE state";
141  fsm.fState = IDLE;
142  }
143  };
144 
146  {
147  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
148  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
149  {
150  fsm.fState = INITIALIZING_DEVICE;
151 
152  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
153  while (fsm.fWorkActive)
154  {
155  fsm.fWorkDoneCondition.wait(lock);
156  }
157  fsm.fWorkAvailable = true;
158  LOG(state) << "Entering INITIALIZING DEVICE state";
159  fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm);
160  fsm.fWorkAvailableCondition.notify_one();
161  }
162  };
163 
165  {
166  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
167  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
168  {
169  LOG(state) << "Entering DEVICE READY state";
170  fsm.fState = DEVICE_READY;
171  }
172  };
173 
174  struct InitTaskFct
175  {
176  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
177  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
178  {
179  fsm.fState = INITIALIZING_TASK;
180 
181  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
182  while (fsm.fWorkActive)
183  {
184  fsm.fWorkDoneCondition.wait(lock);
185  }
186  fsm.fWorkAvailable = true;
187  LOG(state) << "Entering INITIALIZING TASK state";
188  fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm);
189  fsm.fWorkAvailableCondition.notify_one();
190  }
191  };
192 
193  struct ReadyFct
194  {
195  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
196  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
197  {
198  LOG(state) << "Entering READY state";
199  fsm.fState = READY;
200  }
201  };
202 
203  struct RunFct
204  {
205  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
206  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
207  {
208  fsm.fState = RUNNING;
209 
210  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
211  while (fsm.fWorkActive)
212  {
213  fsm.fWorkDoneCondition.wait(lock);
214  }
215  fsm.fWorkAvailable = true;
216  LOG(state) << "Entering RUNNING state";
217  fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
218  fsm.fWorkAvailableCondition.notify_one();
219  }
220  };
221 
222  struct PauseFct
223  {
224  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
225  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
226  {
227  fsm.fState = PAUSED;
228 
229  fsm.Unblock();
230  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
231  while (fsm.fWorkActive)
232  {
233  fsm.fWorkDoneCondition.wait(lock);
234  }
235  fsm.fWorkAvailable = true;
236  LOG(state) << "Entering PAUSED state";
237  fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm);
238  fsm.fWorkAvailableCondition.notify_one();
239  }
240  };
241 
242  struct ResumeFct
243  {
244  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
245  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
246  {
247  fsm.fState = RUNNING;
248 
249  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
250  while (fsm.fWorkActive)
251  {
252  fsm.fWorkDoneCondition.wait(lock);
253  }
254  fsm.fWorkAvailable = true;
255  LOG(state) << "Entering RUNNING state";
256  fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
257  fsm.fWorkAvailableCondition.notify_one();
258  }
259  };
260 
261  struct StopFct
262  {
263  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
264  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
265  {
266  fsm.fState = READY;
267 
268  fsm.Unblock();
269  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
270  while (fsm.fWorkActive)
271  {
272  fsm.fWorkDoneCondition.wait(lock);
273  }
274  LOG(state) << "Entering READY state";
275  }
276  };
277 
279  {
280  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
281  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
282  {
283  fsm.fState = READY;
284  fsm.Unblock();
285  LOG(state) << "RUNNING state finished without an external event, entering READY state";
286  }
287  };
288 
290  {
291  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
292  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
293  {
294  fsm.fState = RESETTING_TASK;
295 
296  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
297  while (fsm.fWorkActive)
298  {
299  fsm.fWorkDoneCondition.wait(lock);
300  }
301  fsm.fWorkAvailable = true;
302  LOG(state) << "Entering RESETTING TASK state";
303  fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm);
304  fsm.fWorkAvailableCondition.notify_one();
305  }
306  };
307 
309  {
310  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
311  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
312  {
313  fsm.fState = RESETTING_DEVICE;
314 
315  std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
316  while (fsm.fWorkActive)
317  {
318  fsm.fWorkDoneCondition.wait(lock);
319  }
320  fsm.fWorkAvailable = true;
321  LOG(state) << "Entering RESETTING DEVICE state";
322  fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm);
323  fsm.fWorkAvailableCondition.notify_one();
324  }
325  };
326 
327  struct ExitingFct
328  {
329  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
330  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
331  {
332  LOG(state) << "Entering EXITING state";
333  fsm.fState = EXITING;
334  fsm.fTerminationRequested = true;
335  fsm.CallStateChangeCallbacks(EXITING);
336 
337  // terminate worker thread
338  {
339  std::lock_guard<std::mutex> lock(fsm.fWorkMutex);
340  fsm.fWorkerTerminated = true;
341  fsm.fWorkAvailableCondition.notify_one();
342  }
343 
344  // join the worker thread (executing user states)
345  if (fsm.fWorkerThread.joinable())
346  {
347  fsm.fWorkerThread.join();
348  }
349 
350  fsm.Exit();
351  }
352  };
353 
355  {
356  template<typename EVT, typename FSM, typename SourceState, typename TargetState>
357  void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
358  {
359  LOG(state) << "Entering ERROR state";
360  fsm.fState = Error;
361  fsm.CallStateChangeCallbacks(Error);
362  }
363  };
364 
365  // Transition table for FairMQFSM
366  struct transition_table : boost::mpl::vector<
367  // Start Event Next Action Guard
368  msmf::Row<IDLE_FSM, INIT_DEVICE, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>,
369  msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>,
370  msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
371  msmf::Row<DEVICE_READY_FSM, INIT_TASK, INITIALIZING_TASK_FSM, InitTaskFct, msmf::none>,
372  msmf::Row<DEVICE_READY_FSM, RESET_DEVICE, RESETTING_DEVICE_FSM, ResetDeviceFct, msmf::none>,
373  msmf::Row<INITIALIZING_TASK_FSM, internal_READY, READY_FSM, ReadyFct, msmf::none>,
374  msmf::Row<READY_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
375  msmf::Row<READY_FSM, RESET_TASK, RESETTING_TASK_FSM, ResetTaskFct, msmf::none>,
376  msmf::Row<RUNNING_FSM, PAUSE, PAUSED_FSM, PauseFct, msmf::none>,
377  msmf::Row<RUNNING_FSM, STOP, READY_FSM, StopFct, msmf::none>,
378  msmf::Row<RUNNING_FSM, internal_READY, READY_FSM, InternalStopFct, msmf::none>,
379  msmf::Row<PAUSED_FSM, RUN, RUNNING_FSM, ResumeFct, msmf::none>,
380  msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
381  msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>,
382  msmf::Row<OK_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>>
383  {};
384 
385  // replaces the default no-transition response.
386  template<typename FSM, typename Event>
387  void no_transition(Event const& e, FSM&, int state)
388  {
389  using recursive_stt = typename boost::msm::back::recursive_get_transition_table<FSM>::type;
390  using all_states = typename boost::msm::back::generate_state_set<recursive_stt>::type;
391 
392  std::string stateName;
393 
394  boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
395 
396  stateName = stateName.substr(24);
397  std::size_t pos = stateName.find("_FSME");
398  stateName.erase(pos);
399 
400  if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
401  {
402  stateName = stateName.substr(1);
403  }
404 
405  if (stateName != "OK")
406  {
407  LOG(state) << "No transition from state " << stateName << " on event " << e.name();
408  }
409 
410  // LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
411  }
412 
413  // backward compatibility to FairMQStateMachine
// State identifiers kept for backward compatibility with FairMQStateMachine.
// The enumerators are numbered consecutively starting at 0.
enum State
{
    OK,
    Error,
    IDLE,
    INITIALIZING_DEVICE,
    DEVICE_READY,
    INITIALIZING_TASK,
    READY,
    RUNNING,
    PAUSED,
    RESETTING_TASK,
    RESETTING_DEVICE,
    EXITING
};

// Returns the textual name of a State value, or a fixed placeholder message
// for any integer that does not correspond to an enumerator.
static std::string GetStateName(const int state)
{
    // indexed by State value; the order must mirror the enum above
    static const char* const names[] = {
        "OK",
        "Error",
        "IDLE",
        "INITIALIZING_DEVICE",
        "DEVICE_READY",
        "INITIALIZING_TASK",
        "READY",
        "RUNNING",
        "PAUSED",
        "RESETTING_TASK",
        "RESETTING_DEVICE",
        "EXITING"
    };
    const int count = static_cast<int>(sizeof(names) / sizeof(names[0]));

    if (state < 0 || state >= count)
    {
        return "requested name for non-existent state...";
    }
    return names[state];
}
462 
463  std::string GetCurrentStateName() const
464  {
465  return GetStateName(fState);
466  }
467  int GetCurrentState() const
468  {
469  return fState;
470  }
471  bool CheckCurrentState(int state) const
472  {
473  return state == fState;
474  }
475  bool CheckCurrentState(std::string state) const
476  {
477  return state == GetCurrentStateName();
478  }
479 
// actions to be overridden by derived classes
481  virtual void InitWrapper() {}
482  virtual void InitTaskWrapper() {}
483  virtual void RunWrapper() {}
484  virtual void PauseWrapper() {}
485  virtual void ResetWrapper() {}
486  virtual void ResetTaskWrapper() {}
487  virtual void Exit() {}
488  virtual void Unblock() {}
489 
490  bool Terminated()
491  {
492  return fTerminationRequested;
493  }
494 
495  protected:
496  std::atomic<State> fState;
497  std::mutex fChangeStateMutex;
498 
499  // function to execute user states in a worker thread
500  std::function<void(void)> fWork;
501  std::condition_variable fWorkAvailableCondition;
502  std::condition_variable fWorkDoneCondition;
503  std::mutex fWorkMutex;
504  bool fWorkerTerminated;
505  bool fWorkActive;
506  bool fWorkAvailable;
507 
508  boost::signals2::signal<void(const State)> fStateChangeSignal;
509  std::unordered_map<std::string, boost::signals2::connection> fStateChangeSignalsMap;
510  std::atomic<bool> fTerminationRequested;
511 
512  void CallStateChangeCallbacks(const State state) const
513  {
514  if (!fStateChangeSignal.empty())
515  {
516  fStateChangeSignal(state);
517  }
518  }
519 
520  private:
521  void Worker()
522  {
523  while (true)
524  {
525  {
526  std::unique_lock<std::mutex> lock(fWorkMutex);
527  // Wait for work to be done.
528  while (!fWorkAvailable && !fWorkerTerminated)
529  {
530  fWorkAvailableCondition.wait(lock);
531  }
532 
533  if (fWorkerTerminated)
534  {
535  break;
536  }
537 
538  fWorkActive = true;
539  }
540 
541  fWork();
542 
543  {
544  std::lock_guard<std::mutex> lock(fWorkMutex);
545  fWorkActive = false;
546  fWorkAvailable = false;
547  fWorkDoneCondition.notify_one();
548  }
549  CallStateChangeCallbacks(fState);
550  }
551  }
552 
553  // run state handlers in a separate thread
554  std::thread fWorkerThread;
555 };
556 
557 // reactivate the warning for non-virtual destructor
558 #if defined(__clang__)
559 _Pragma("clang diagnostic pop")
560 #elif defined(__GNUC__) || defined(__GNUG__)
561 _Pragma("GCC diagnostic pop")
562 #endif
563 
564 } // namespace fsm
565 } // namespace mq
566 } // namespace fair
567 
568 class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm::FairMQFSM>
569 {
570  public:
571  enum Event
572  {
573  INIT_DEVICE,
574  internal_DEVICE_READY,
575  INIT_TASK,
576  internal_READY,
577  RUN,
578  PAUSE,
579  STOP,
580  RESET_TASK,
581  RESET_DEVICE,
582  internal_IDLE,
583  END,
584  ERROR_FOUND
585  };
586 
588  virtual ~FairMQStateMachine();
589 
590  int GetInterfaceVersion() const;
591 
592  bool ChangeState(int event);
593  bool ChangeState(const std::string& event);
594 
595  void WaitForEndOfState(int event);
596  void WaitForEndOfState(const std::string& event);
597 
598  bool WaitForEndOfStateForMs(int event, int durationInMs);
599  bool WaitForEndOfStateForMs(const std::string& event, int durationInMs);
600 
601  void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
602  void UnsubscribeFromStateChange(const std::string& key);
603 
604  private:
605  int GetEventNumber(const std::string& event);
606 };
607 
608 #endif /* FAIRMQSTATEMACHINE_H_ */
Definition: FairMQStateMachine.h:145
Definition: FairMQStateMachine.h:61
Definition: FairMQStateMachine.h:126
Definition: FairMQStateMachine.h:58
Definition: FairMQStateMachine.h:308
Definition: FairMQStateMachine.h:63
Definition: FairMQStateMachine.h:120
Definition: FairMQStateMachine.h:56
Definition: FairMQStateMachine.h:261
Definition: FairMQStateMachine.h:54
Definition: EventManager.h:33
Definition: FairMQStateMachine.h:129
Definition: FairMQStateMachine.h:59
Definition: FairMQStateMachine.h:65
Definition: FairMQStateMachine.h:123
Definition: FairMQStateMachine.h:327
Definition: FairMQStateMachine.h:242
Definition: FairMQStateMachine.h:121
Definition: FairMQStateMachine.h:164
Definition: FairMQStateMachine.h:135
Definition: FairMQStateMachine.h:354
Definition: FairMQStateMachine.h:193
Definition: FairMQStateMachine.h:278
Definition: FairMQStateMachine.h:62
Definition: FairMQStateMachine.h:127
Definition: FairMQStateMachine.h:366
Definition: FairMQStateMachine.h:122
Definition: FairMQStateMachine.h:174
Definition: FairMQStateMachine.h:568
Definition: FairMQStateMachine.h:118
Definition: FairMQStateMachine.h:55
Definition: FairMQStateMachine.h:57
Definition: FairMQStateMachine.h:289
Definition: FairMQStateMachine.h:222
Definition: FairMQStateMachine.h:64
Definition: FairMQStateMachine.h:124
Definition: DeviceRunner.h:23
Definition: FairMQStateMachine.h:203
Definition: FairMQStateMachine.h:125
Definition: FairMQStateMachine.h:60
Definition: FairMQStateMachine.h:128
Definition: FairMQStateMachine.h:117
Definition: FairMQStateMachine.h:77