9 #ifndef FAIRMQDEVICE_H_ 10 #define FAIRMQDEVICE_H_ 12 #include <StateMachine.h> 13 #include <FairMQTransportFactory.h> 14 #include <fairmq/Transports.h> 15 #include <fairmq/StateQueue.h> 17 #include <FairMQChannel.h> 18 #include <FairMQMessage.h> 19 #include <FairMQParts.h> 20 #include <FairMQUnmanagedRegion.h> 21 #include <FairMQLogger.h> 22 #include <fairmq/ProgOptions.h> 30 #include <unordered_map> 38 #include <fairmq/tools/Version.h> 40 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
42 using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
43 using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
62 internal_DEVICE_READY,
113 virtual void LogSocketRates();
115 template<
typename Serializer,
typename DataType,
typename... Args>
116 void Serialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const 118 Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
121 template<
typename Deserializer,
typename DataType,
typename... Args>
122 void Deserialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const 124 Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
133 int Send(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
135 return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
144 int Receive(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
146 return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
155 int64_t
Send(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
157 return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
166 int64_t
Receive(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
168 return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
174 return fTransportFactory.get();
178 template<
typename... Args>
179 FairMQMessagePtr NewMessage(Args&&... args)
181 return Transport()->CreateMessage(std::forward<Args>(args)...);
185 template<
typename... Args>
186 FairMQMessagePtr NewMessageFor(
const std::string& channel,
int index, Args&&... args)
188 return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
193 FairMQMessagePtr NewStaticMessage(
const T& data)
195 return Transport()->NewStaticMessage(data);
200 FairMQMessagePtr NewStaticMessageFor(
const std::string& channel,
int index,
const T& data)
202 return GetChannel(channel, index).NewStaticMessage(data);
207 FairMQMessagePtr NewSimpleMessage(
const T& data)
209 return Transport()->NewSimpleMessage(data);
214 FairMQMessagePtr NewSimpleMessageFor(
const std::string& channel,
int index,
const T& data)
216 return GetChannel(channel, index).NewSimpleMessage(data);
220 FairMQUnmanagedRegionPtr NewUnmanagedRegion(
const size_t size,
221 FairMQRegionCallback callback =
nullptr,
222 const std::string& path =
"",
225 return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
229 FairMQUnmanagedRegionPtr NewUnmanagedRegion(
const size_t size,
230 const int64_t userFlags,
231 FairMQRegionCallback callback =
nullptr,
232 const std::string& path =
"",
235 return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
239 FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(
const std::string& channel,
242 FairMQRegionCallback callback =
nullptr,
243 const std::string& path =
"",
246 return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags);
250 FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(
const std::string& channel,
253 const int64_t userFlags,
254 FairMQRegionCallback callback =
nullptr,
255 const std::string& path =
"",
258 return GetChannel(channel, index).NewUnmanagedRegion(size, userFlags, callback, path, flags);
261 template<
typename ...Ts>
262 FairMQPollerPtr NewPoller(
const Ts&... inputs)
264 std::vector<std::string> chans{inputs...};
267 if (chans.size() > 1)
269 fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType();
271 for (
unsigned int i = 1; i < chans.size(); ++i)
273 if (type != GetChannel(chans.at(i), 0).Transport()->GetType())
275 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
276 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
281 return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
284 FairMQPollerPtr NewPoller(
const std::vector<FairMQChannel*>& channels)
287 if (channels.size() > 1)
289 fair::mq::Transport type = channels.at(0)->Transport()->GetType();
291 for (
unsigned int i = 1; i < channels.size(); ++i)
293 if (type != channels.at(i)->Transport()->GetType())
295 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
296 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
301 return channels.at(0)->Transport()->CreatePoller(channels);
306 std::shared_ptr<FairMQTransportFactory> AddTransport(
const fair::mq::Transport transport);
318 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(FairMQMessagePtr& msg,
int index))
320 fDataCallbacks =
true;
321 fMsgInputs.insert(std::make_pair(channelName, [
this, memberFunction](FairMQMessagePtr& msg,
int index)
323 return (static_cast<T*>(
this)->*memberFunction)(msg, index);
326 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
328 fInputChannelKeys.push_back(channelName);
332 void OnData(
const std::string& channelName, InputMsgCallback callback)
334 fDataCallbacks =
true;
335 fMsgInputs.insert(make_pair(channelName, callback));
337 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
339 fInputChannelKeys.push_back(channelName);
345 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(
FairMQParts& parts,
int index))
347 fDataCallbacks =
true;
348 fMultipartInputs.insert(std::make_pair(channelName, [
this, memberFunction](
FairMQParts& parts,
int index)
350 return (static_cast<T*>(
this)->*memberFunction)(parts, index);
353 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
355 fInputChannelKeys.push_back(channelName);
359 void OnData(
const std::string& channelName, InputMultipartCallback callback)
361 fDataCallbacks =
true;
362 fMultipartInputs.insert(make_pair(channelName, callback));
364 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
366 fInputChannelKeys.push_back(channelName);
370 FairMQChannel& GetChannel(
const std::string& channelName,
const int index = 0)
372 return fChannels.at(channelName).at(index);
373 }
catch (
const std::out_of_range& oor) {
374 LOG(error) <<
"requested channel has not been configured? check channel names/configuration.";
375 LOG(error) <<
"channel: " << channelName <<
", index: " << index;
376 LOG(error) <<
"out of range: " << oor.what();
380 virtual void RegisterChannelEndpoints() {}
382 bool RegisterChannelEndpoint(
const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
384 bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
386 LOG(warn) <<
"Registering channel: name already registered: \"" << channelName <<
"\"";
391 void PrintRegisteredChannels()
393 if (fChannelRegistry.size() < 1) {
394 std::cout <<
"no channels registered." << std::endl;
396 for (
const auto& c : fChannelRegistry) {
397 std::cout << c.first <<
":" << c.second.first <<
":" << c.second.second << std::endl;
402 void SetId(
const std::string&
id) { fId = id; }
403 std::string GetId() {
return fId; }
407 void SetNumIoThreads(
int numIoThreads) { fConfig->SetProperty(
"io-threads", numIoThreads);}
408 int GetNumIoThreads()
const {
return fConfig->GetProperty<
int>(
"io-threads", DefaultIOThreads); }
410 void SetNetworkInterface(
const std::string& networkInterface) { fConfig->SetProperty(
"network-interface", networkInterface); }
411 std::string GetNetworkInterface()
const {
return fConfig->GetProperty<std::string>(
"network-interface", DefaultNetworkInterface); }
413 void SetDefaultTransport(
const std::string& name) { fConfig->SetProperty(
"transport", name); }
414 std::string GetDefaultTransport()
const {
return fConfig->GetProperty<std::string>(
"transport", DefaultTransportName); }
416 void SetInitTimeoutInS(
int initTimeoutInS) { fConfig->SetProperty(
"init-timeout", initTimeoutInS); }
417 int GetInitTimeoutInS()
const {
return fConfig->GetProperty<
int>(
"init-timeout", DefaultInitTimeout); }
421 void SetTransport(
const std::string& transport) { fConfig->SetProperty(
"transport", transport); }
423 std::string
GetTransportName()
const {
return fConfig->GetProperty<std::string>(
"transport", DefaultTransportName); }
425 void SetRawCmdLineArgs(
const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
426 std::vector<std::string> GetRawCmdLineArgs()
const {
return fRawCmdLineArgs; }
428 void RunStateMachine()
430 fStateMachine.ProcessWork();
436 template<
typename Rep,
typename Period>
437 bool WaitFor(std::chrono::duration<Rep, Period>
const& duration)
439 return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
444 std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>>
fTransports;
447 std::unordered_map<std::string, std::vector<FairMQChannel>>
fChannels;
451 void AddChannel(
const std::string& name,
FairMQChannel&& channel)
462 virtual void Bind() {}
464 virtual void Connect() {}
481 virtual void Pause() __attribute__((deprecated(
"PAUSE state is removed. This method is never called. To pause Run, go to READY with STOP transition and back to RUNNING with RUN to resume."))) {}
484 virtual void ResetTask() {}
487 virtual void Reset() {}
490 bool ChangeState(
const fair::mq::Transition transition) {
return fStateMachine.ChangeState(transition); }
491 bool ChangeState(
const std::string& transition) {
return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
493 bool ChangeState(
const int transition) __attribute__((deprecated(
"Use ChangeState(const fair::mq::Transition transition).")));
495 void WaitForEndOfState(
const fair::mq::Transition transition) __attribute__((deprecated(
"Use WaitForState(fair::mq::State expectedState).")));
496 void WaitForEndOfState(
const std::string& transition) __attribute__((deprecated(
"Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); }
498 fair::mq::State WaitForNextState() {
return fStateQueue.WaitForNext(); }
499 void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
500 void WaitForState(
const std::string& state) { WaitForState(fair::mq::GetState(state)); }
502 void TransitionTo(
const fair::mq::State state);
504 void SubscribeToStateChange(
const std::string& key, std::function<
void(
const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
505 void UnsubscribeFromStateChange(
const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
507 void SubscribeToNewTransition(
const std::string& key, std::function<
void(
const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
508 void UnsubscribeFromNewTransition(
const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
510 bool CheckCurrentState(
const int )
const __attribute__((deprecated(
"Use NewStatePending()."))) {
return !fStateMachine.NewStatePending(); }
511 bool CheckCurrentState(
const std::string& )
const __attribute__((deprecated(
"Use NewStatePending()."))) {
return !fStateMachine.NewStatePending(); }
514 bool NewStatePending()
const {
return fStateMachine.NewStatePending(); }
516 fair::mq::State GetCurrentState()
const {
return fStateMachine.GetCurrentState(); }
517 std::string GetCurrentStateName()
const {
return fStateMachine.GetCurrentStateName(); }
519 static std::string GetStateName(
const fair::mq::State state) {
return fair::mq::GetStateName(state); }
520 static std::string GetTransitionName(
const fair::mq::Transition transition) {
return fair::mq::GetTransitionName(transition); }
522 static constexpr
const char* DefaultId =
"";
523 static constexpr
int DefaultIOThreads = 1;
524 static constexpr
const char* DefaultTransportName =
"zeromq";
525 static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ;
526 static constexpr
const char* DefaultNetworkInterface =
"default";
527 static constexpr
int DefaultInitTimeout = 120;
528 static constexpr uint64_t DefaultMaxRunTime = 0;
529 static constexpr
float DefaultRate = 0.;
530 static constexpr
const char* DefaultSession =
"default";
533 fair::mq::Transport fDefaultTransportType;
541 void ConnectWrapper();
543 void InitTaskWrapper();
547 void ResetTaskWrapper();
552 void UnblockTransports();
558 void AttachChannels(std::vector<FairMQChannel*>& chans);
561 void HandleSingleChannelInput();
562 void HandleMultipleChannelInput();
563 void HandleMultipleTransportInput();
564 void PollForTransport(
const FairMQTransportFactory* factory,
const std::vector<std::string>& channelKeys);
566 bool HandleMsgInput(
const std::string& chName,
const InputMsgCallback& callback,
int i);
567 bool HandleMultipartInput(
const std::string& chName,
const InputMultipartCallback& callback,
int i);
569 std::vector<FairMQChannel*> fUninitializedBindingChannels;
570 std::vector<FairMQChannel*> fUninitializedConnectingChannels;
573 std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
574 std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
575 std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
576 std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
577 std::vector<std::string> fInputChannelKeys;
578 std::mutex fMultitransportMutex;
579 std::atomic<bool> fMultitransportProceed;
583 uint64_t fMaxRunRuntimeInS;
584 int fInitializationTimeoutInS;
585 std::vector<std::string> fRawCmdLineArgs;
589 std::mutex fTransitionMtx;
std::string GetTransportName() const
Gets the default transport name.
Definition: FairMQDevice.h:423
virtual bool ConditionalRun()
Called during RUNNING state repeatedly until it returns false or device state changes.
Definition: FairMQDevice.h:476
std::unordered_map< fair::mq::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
Container for transports.
Definition: FairMQDevice.h:444
Definition: StateQueue.h:25
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQDevice.h:467
Definition: FairMQTransportFactory.h:30
void SetTransport(const std::string &transport)
Definition: FairMQDevice.h:421
int Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:133
bool WaitFor(std::chrono::duration< Rep, Period > const &duration)
Definition: FairMQDevice.h:437
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:134
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:449
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQDevice.h:470
Definition: FairMQChannel.h:30
int64_t Send(FairMQParts &parts, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:155
Definition: ProgOptions.h:36
virtual void Init()
Additional user initialization (can be overloaded in child classes). Prefer to use InitTask()...
Definition: FairMQDevice.h:460
std::string fId
Device ID.
Definition: FairMQDevice.h:457
int Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:144
Definition: FairMQDevice.h:49
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:447
Definition: StateMachine.h:26
std::unique_ptr< fair::mq::ProgOptions > fInternalConfig
Internal program options configuration.
Definition: FairMQDevice.h:448
void AddChannel(const std::string &name, const FairMQChannel &channel)
Takes the provided channel and creates properties based on it.
Definition: ProgOptions.cxx:351
int64_t Receive(FairMQParts &parts, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:166
fair::mq::ProgOptions * GetConfig() const
Get pointer to the config.
Definition: FairMQDevice.h:311
virtual void PreRun()
Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:473
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:443
Definition: FairMQDevice.h:53
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
virtual void PostRun()
Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.h:479
Definition: FairMQMessage.h:20
auto Transport() const -> FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:172