diff --git a/CMakeLists.txt b/CMakeLists.txt index 5771a058..d8a3a988 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,14 +39,16 @@ fairmq_build_option(BUILD_NANOMSG_TRANSPORT "Build nanomsg transport." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") +fairmq_build_option(BUILD_SDK_COMMANDS "Build the FairMQ SDK commands." + DEFAULT OFF) fairmq_build_option(BUILD_DDS_PLUGIN "Build DDS plugin." - DEFAULT OFF REQUIRES "BUILD_FAIRMQ") + DEFAULT OFF REQUIRES "BUILD_FAIRMQ;BUILD_SDK_COMMANDS") fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin." DEFAULT OFF REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples." DEFAULT ON REQUIRES "BUILD_FAIRMQ") fairmq_build_option(BUILD_SDK "Build the FairMQ controller SDK." - DEFAULT OFF REQUIRES "BUILD_DDS_PLUGIN") + DEFAULT OFF REQUIRES "BUILD_DDS_PLUGIN;BUILD_SDK_COMMANDS") fairmq_build_option(BUILD_DOCS "Build FairMQ documentation." DEFAULT OFF) fairmq_build_option(FAST_BUILD "Fast production build. Not recommended for development." @@ -89,6 +91,10 @@ else() set(required_dds_version 2.4) endif() +if(BUILD_SDK_COMMANDS) + find_package2(PRIVATE Flatbuffers REQUIRED) +endif() + if(BUILD_DDS_PLUGIN OR BUILD_SDK) find_package2(PRIVATE DDS REQUIRED VERSION ${required_dds_version} @@ -230,6 +236,9 @@ endif() if(BUILD_SDK) list(APPEND PROJECT_PACKAGE_COMPONENTS sdk) endif() +if(BUILD_SDK_COMMANDS) + list(APPEND PROJECT_PACKAGE_COMPONENTS sdk_commands) +endif() ################################################################################ @@ -343,6 +352,9 @@ if(PROJECT_PACKAGE_DEPENDENCIES) elseif(${dep} STREQUAL fmt) get_target_property(fmt_include fmt::fmt INTERFACE_INCLUDE_DIRECTORIES) get_filename_component(prefix ${fmt_include}/.. ABSOLUTE) + elseif(${dep} STREQUAL Flatbuffers) + get_target_property(flatbuffers_include flatbuffers::flatbuffers INTERFACE_INCLUDE_DIRECTORIES) + get_filename_component(prefix ${flatbuffers_include}/.. ABSOLUTE) else() get_filename_component(prefix ${${dep}_INCLUDE_DIR}/.. ABSOLUTE) endif() @@ -410,6 +422,12 @@ else() set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (required C++14) (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})") endif() message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}") +if(BUILD_SDK_COMMANDS) + set(sdk_commands_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_SDK_COMMANDS=OFF${CR})") +else() + set(sdk_commands_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_SDK_COMMANDS=ON${CR})") +endif() +message(STATUS " ${BWhite}sdk_commands${CR} ${sdk_commands_summary}") message(STATUS " ") if(RUN_STATIC_ANALYSIS) list(LENGTH PROJECT_STATIC_ANALYSERS size) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 7e4d7066..c300e0c4 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -460,6 +460,10 @@ if(BUILD_FAIRMQ) endforeach() endif() +if(BUILD_SDK_COMMANDS) + add_subdirectory(sdk/commands) +endif() + if(BUILD_SDK) add_subdirectory(sdk) endif() diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 8cb7c212..096dc200 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -33,7 +33,7 @@ class FairMQParts FairMQParts& operator=(const FairMQParts&) = delete; /// Constructor from argument pack of std::unique_ptr rvalues template - FairMQParts(Ts&&... messages) : fParts() {AddPart(std::forward(messages)...);} + FairMQParts(Ts&&... messages) : fParts() { AddPart(std::forward(messages)...); } /// Default destructor ~FairMQParts() {}; @@ -63,10 +63,10 @@ class FairMQParts /// Add content of another object by move void AddPart(FairMQParts&& other) { - container parts = std::move(other.fParts); - for (auto& part : parts) { - fParts.push_back(std::move(part)); - } + container parts = std::move(other.fParts); + for (auto& part : parts) { + fParts.push_back(std::move(part)); + } } /// Get reference to part in the container at index (without bounds check) diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index 878d3208..e47d2856 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -17,7 +17,6 @@ #include #include -#include #include "FairMQPoller.h" #include "FairMQChannel.h" diff --git a/fairmq/plugins/DDS/CMakeLists.txt b/fairmq/plugins/DDS/CMakeLists.txt index 1fb41022..2b48d259 100644 --- a/fairmq/plugins/DDS/CMakeLists.txt +++ b/fairmq/plugins/DDS/CMakeLists.txt @@ -8,7 +8,7 @@ set(plugin FairMQPlugin_dds) add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/DDS.cxx ${CMAKE_CURRENT_SOURCE_DIR}/DDS.h) -target_link_libraries(${plugin} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib Boost::boost) +target_link_libraries(${plugin} PUBLIC FairMQ StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib Boost::boost PRIVATE Commands) target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden) set_target_properties(${plugin} PROPERTIES @@ -19,7 +19,7 @@ set_target_properties(${plugin} PROPERTIES set(exe fairmq-dds-command-ui) add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx) -target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib) +target_link_libraries(${exe} FairMQ Commands StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib) target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) install(TARGETS ${plugin} ${exe} diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 74f3e605..78db5741 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -8,6 +8,8 @@ #include "DDS.h" +#include + #include #include @@ -62,13 +64,13 @@ DDS::DDS(const string& name, fHeartbeatThread = thread(&DDS::HeartbeatSender, this); - std::string deviceId(GetProperty("id")); + string deviceId(GetProperty("id")); if (deviceId.empty()) { - SetProperty("id", dds::env_prop()); + SetProperty("id", dds::env_prop()); } - std::string sessionId(GetProperty("session")); + string sessionId(GetProperty("session")); if (sessionId == "default") { - SetProperty("session", dds::env_prop()); + SetProperty("session", dds::env_prop()); } auto control = GetProperty("control"); @@ -101,7 +103,7 @@ DDS::DDS(const string& name, break; case DeviceState::ResettingDevice: { { - std::lock_guard lk(fUpdateMutex); + lock_guard lk(fUpdateMutex); fUpdatesAllowed = false; } @@ -122,9 +124,12 @@ DDS::DDS(const string& name, string id = GetProperty("id"); fLastState = fCurrentState; fCurrentState = newState; + using namespace sdk::cmd; for (auto subscriberId : fStateChangeSubscribers) { LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; - fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop()) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); + + Cmds cmds(make(id, dds::env_prop(), fLastState, fCurrentState)); + fDDS.Send(cmds.Serialize(), to_string(subscriberId)); } }); @@ -250,7 +255,7 @@ auto DDS::FillChannelContainers() -> void fIofN.insert(make_pair(chanName, IofN(i, n))); } { - std::lock_guard lk(fUpdateMutex); + lock_guard lk(fUpdateMutex); fUpdatesAllowed = true; } fUpdateCondition.notify_one(); @@ -276,7 +281,7 @@ auto DDS::SubscribeForConnectingChannels() -> void boost::asio::post(fWorkerQueue, [=]() { try { { - std::unique_lock lk(fUpdateMutex); + unique_lock lk(fUpdateMutex); fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); } string val = value; @@ -341,6 +346,7 @@ auto DDS::PublishBoundChannels() -> void auto DDS::HeartbeatSender() -> void { + using namespace sdk::cmd; string id = GetProperty("id"); while (!fDeviceTerminationRequested) { @@ -348,7 +354,7 @@ auto DDS::HeartbeatSender() -> void lock_guard lock{fHeartbeatSubscriberMutex}; for (const auto subscriberId : fHeartbeatSubscribers) { - fDDS.Send("heartbeat: " + id , to_string(subscriberId)); + fDDS.Send(Cmds(make(id)).Serialize(), to_string(subscriberId)); } } @@ -358,86 +364,100 @@ auto DDS::HeartbeatSender() -> void auto DDS::SubscribeForCustomCommands() -> void { + using namespace sdk::cmd; LOG(debug) << "Subscribing for DDS custom commands."; string id = GetProperty("id"); - fDDS.SubscribeCustomCmd([id, this](const string& cmd, const string& cond, uint64_t senderId) { - LOG(info) << "Received command: '" << cmd << "' from " << senderId; + fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) { + // LOG(info) << "Received command: '" << cmdStr << "' from " << senderId; - if (cmd == "check-state") { - fDDS.Send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); - } else if (fTransitions.find(cmd) != fTransitions.end()) { - if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); - } else { - fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); - } - if (cmd == "END" && ToStr(GetCurrentDeviceState()) == "EXITING") { - unique_lock lock(fStopMutex); - fStopCondition.notify_one(); - } - { - lock_guard lock{fStateChangeSubscriberMutex}; - fLastExternalController = senderId; - } - } else if (cmd == "dump-config") { - stringstream ss; - for (const auto pKey: GetPropertyKeys()) { - ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl; - } - fDDS.Send(ss.str(), to_string(senderId)); - } else if (cmd == "subscribe-to-heartbeats") { - { - // auto size = fHeartbeatSubscribers.size(); - lock_guard lock{fHeartbeatSubscriberMutex}; - fHeartbeatSubscribers.insert(senderId); - } - fDDS.Send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); - } else if (cmd == "unsubscribe-from-heartbeats") { - { - lock_guard lock{fHeartbeatSubscriberMutex}; - fHeartbeatSubscribers.erase(senderId); - } - fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); - } else if (cmd == "state-change-exiting-received") { - { - lock_guard lock{fStateChangeSubscriberMutex}; - if (fLastExternalController == senderId) { - fExitingAckedByLastExternalController = true; + Cmds inCmds; + inCmds.Deserialize(cmdStr); + + for (const auto& cmd : inCmds) { + switch (cmd->GetType()) { + case Type::check_state: { + fDDS.Send(Cmds(make(id, GetCurrentDeviceState())).Serialize(), to_string(senderId)); } - } - fExitingAcked.notify_one(); - } else if (cmd == "subscribe-to-state-changes") { - { - // auto size = fStateChangeSubscribers.size(); - lock_guard lock{fStateChangeSubscriberMutex}; - fStateChangeSubscribers.insert(senderId); - if (!fControllerThread.joinable()) { - fControllerThread = thread(&DDS::WaitForExitingAck, this); + break; + case Type::change_state: { + Transition transition = static_cast(*cmd).GetTransition(); + if (ChangeDeviceState(transition)) { + Cmds outCmds(make(id, Result::Ok, transition)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } else { + Cmds outCmds(make(id, Result::Failure, transition)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } } + break; + case Type::dump_config: { + stringstream ss; + for (const auto pKey: GetPropertyKeys()) { + ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl; + } + Cmds outCmds(make(id, ss.str())); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + break; + case Type::subscribe_to_heartbeats: { + { + lock_guard lock{fHeartbeatSubscriberMutex}; + fHeartbeatSubscribers.insert(senderId); + } + Cmds outCmds(make(id, Result::Ok)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + break; + case Type::unsubscribe_from_heartbeats: { + { + lock_guard lock{fHeartbeatSubscriberMutex}; + fHeartbeatSubscribers.erase(senderId); + } + Cmds outCmds(make(id, Result::Ok)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + break; + case Type::state_change_exiting_received: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + if (fLastExternalController == senderId) { + fExitingAckedByLastExternalController = true; + } + } + fExitingAcked.notify_one(); + } + break; + case Type::subscribe_to_state_change: { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.insert(senderId); + if (!fControllerThread.joinable()) { + fControllerThread = thread(&DDS::WaitForExitingAck, this); + } + + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; + + Cmds outCmds(make(id, Result::Ok), make(id, dds::env_prop(), fLastState, fCurrentState)); + + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + break; + case Type::unsubscribe_from_state_change: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.erase(senderId); + } + Cmds outCmds(make(id, Result::Ok)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + break; + default: + LOG(warn) << "Unexpected/unknown command received: " << cmdStr; + LOG(warn) << "Origin: " << senderId; + LOG(warn) << "Destination: " << cond; + break; } - fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId)); - { - lock_guard lock{fStateChangeSubscriberMutex}; - LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; - // fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); - fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop()) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); - } - } else if (cmd == "unsubscribe-from-state-changes") { - { - lock_guard lock{fStateChangeSubscriberMutex}; - fStateChangeSubscribers.erase(senderId); - } - fDDS.Send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); - } else if (cmd == "SHUTDOWN") { - TransitionDeviceStateTo(DeviceState::Exiting); - } else if (cmd == "STARTUP") { - TransitionDeviceStateTo(DeviceState::Running); - } else { - LOG(warn) << "Unknown command: " << cmd; - LOG(warn) << "Origin: " << senderId; - LOG(warn) << "Destination: " << cond; } }); } diff --git a/fairmq/sdk/CMakeLists.txt b/fairmq/sdk/CMakeLists.txt index 883366c2..5489d5b7 100644 --- a/fairmq/sdk/CMakeLists.txt +++ b/fairmq/sdk/CMakeLists.txt @@ -61,6 +61,7 @@ target_link_libraries(${target} Threads::Threads Tools StateMachine + Commands PRIVATE DDS::dds_intercom_lib diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index d2847942..4e3bc9c4 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -9,25 +9,27 @@ #ifndef FAIR_MQ_SDK_TOPOLOGY_H #define FAIR_MQ_SDK_TOPOLOGY_H -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include +#include #include #include #include #include #include #include +#include +#include + +#include + +#include +#include +#include +#include + +#include +#include #include #include #include @@ -161,44 +163,58 @@ class BasicTopology : public AsioBase throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")"); } + using namespace fair::mq::sdk::cmd; fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { - // LOG(debug) << "Received from " << senderId << ": " << msg; - std::vector parts; - boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); - - for (unsigned int i = 0; i < parts.size(); ++i) { - boost::trim(parts.at(i)); - } - - if (parts[0] == "state-change") { - DDSTask::Id taskId(std::stoull(parts[2])); - fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); - if (parts[3] == "IDLE->EXITING") { - fDDSSession.SendCommand("state-change-exiting-received", senderId); - } - UpdateStateEntry(taskId, parts[3]); - } else if (parts[0] == "state-changes-subscription") { - LOG(debug) << "Received from " << senderId << ": " << msg; - if (parts[2] != "OK") { - LOG(error) << "state-changes-subscription failed with return code: " << parts[2]; - } - } else if (parts[0] == "state-changes-unsubscription") { - LOG(debug) << "Received from " << senderId << ": " << msg; - if (parts[2] != "OK") { - LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; - } - } else if (parts[1] == "could not queue") { - std::lock_guard lk(fMtx); - if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) { - fChangeStateOpTimer.cancel(); - fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData); + Cmds inCmds; + inCmds.Deserialize(msg); + // LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: "; + for (const auto& cmd : inCmds) { + // LOG(debug) << " > " << cmd->GetType(); + switch (cmd->GetType()) { + case Type::state_change: { + DDSTask::Id taskId(static_cast(*cmd).GetTaskId()); + fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); + if (static_cast(*cmd).GetCurrentState() == DeviceState::Exiting) { + Cmds outCmds; + outCmds.Add(); + fDDSSession.SendCommand(outCmds.Serialize(), senderId); + } + UpdateStateEntry(taskId, static_cast(*cmd).GetCurrentState()); + } + break; + case Type::state_change_subscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + LOG(error) << "State change subscription failed for " << static_cast(*cmd).GetDeviceId(); + } + break; + case Type::state_change_unsubscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + LOG(error) << "State change unsubscription failed for " << static_cast(*cmd).GetDeviceId(); + } + break; + case Type::transition_status: { + if (static_cast(*cmd).GetResult() != Result::Ok) { + LOG(error) << "Transition failed for " << static_cast(*cmd).GetDeviceId(); + std::lock_guard lk(fMtx); + if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) { + fChangeStateOpTimer.cancel(); + fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData); + } + } + } + break; + default: + LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType(); + LOG(warn) << "Origin: " << senderId; + break; } } }); fDDSSession.StartDDSService(); - LOG(debug) << "subscribe-to-state-changes"; - fDDSSession.SendCommand("subscribe-to-state-changes"); + LOG(debug) << "Subscribing to state change"; + Cmds cmds(make()); + fDDSSession.SendCommand(cmds.Serialize()); } /// not copyable @@ -318,7 +334,8 @@ class BasicTopology : public AsioBase std::move(handler)); fChangeStateTarget = expectedState.at(transition); ResetTransitionedCount(fChangeStateTarget); - fDDSSession.SendCommand(GetTransitionName(transition)); + cmd::Cmds cmds(cmd::make(transition)); + fDDSSession.SendCommand(cmds.Serialize()); if (timeout > std::chrono::milliseconds(0)) { fChangeStateOpTimer.expires_after(timeout); fChangeStateOpTimer.async_wait([&](std::error_code ec) { @@ -330,10 +347,8 @@ class BasicTopology : public AsioBase } } else { // TODO refactor to hide boiler plate - auto ex2(asio::get_associated_executor( - handler, AsioBase::GetExecutor())); - auto alloc2(asio::get_associated_allocator( - handler, AsioBase::GetAllocator())); + auto ex2(asio::get_associated_executor(handler, AsioBase::GetExecutor())); + auto alloc2(asio::get_associated_allocator(handler, AsioBase::GetAllocator())); auto state(GetCurrentStateUnsafe()); ex2.post( @@ -341,8 +356,7 @@ class BasicTopology : public AsioBase try { h(MakeErrorCode(ErrorCode::OperationInProgress), s); } catch (const std::exception& e) { - LOG(error) - << "Uncaught exception in completion handler: " << e.what(); + LOG(error) << "Uncaught exception in completion handler: " << e.what(); } catch (...) { LOG(error) << "Unknown uncaught exception in completion handler."; } @@ -420,19 +434,17 @@ class BasicTopology : public AsioBase } } - auto UpdateStateEntry(DDSTask::Id taskId, const std::string& state) -> void + auto UpdateStateEntry(const DDSTask::Id taskId, const DeviceState state) -> void { - std::size_t pos = state.find("->"); - std::string endState = state.substr(pos + 2); try { std::lock_guard lk(fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.initialized = true; - task.state = fair::mq::GetState(endState); + task.state = state; if (task.state == fChangeStateTarget) { ++fTransitionedCount; } - // LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << endState; + // LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; TryChangeStateCompletion(); } catch (const std::exception& e) { LOG(error) << "Exception in UpdateStateEntry: " << e.what(); diff --git a/fairmq/sdk/commands/CMakeLists.txt b/fairmq/sdk/commands/CMakeLists.txt new file mode 100644 index 00000000..6f44170c --- /dev/null +++ b/fairmq/sdk/commands/CMakeLists.txt @@ -0,0 +1,44 @@ +################################################################################ +# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # +# # +# This software is distributed under the terms of the # +# GNU Lesser General Public Licence (LGPL) version 3, # +# copied verbatim in the file "LICENSE" # +################################################################################ + +set(target Commands) + +add_custom_command( + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/CommandsFormat.h + COMMAND $ -c -o ${CMAKE_CURRENT_BINARY_DIR} CommandsFormat.fbs + COMMAND ${CMAKE_COMMAND} -E rename ${CMAKE_CURRENT_BINARY_DIR}/CommandsFormat_generated.h ${CMAKE_CURRENT_BINARY_DIR}/CommandsFormat.h + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} +) + +add_library(${target} Commands.cxx Commands.h ${CMAKE_CURRENT_BINARY_DIR}/CommandsFormat.h) +add_library(FairMQ::${target} ALIAS ${target}) + +target_link_libraries(${target} + PUBLIC + StateMachine + Tools + + PRIVATE + flatbuffers::flatbuffers +) + +target_include_directories(${target} + PUBLIC + $ + $ +) + +install( + TARGETS ${target} + EXPORT ${PROJECT_EXPORT_SET} + DESTINATION ${PROJECT_INSTALL_LIBDIR} +) + +install(FILES Commands.h + DESTINATION ${PROJECT_INSTALL_INCDIR}/sdk/commands +) diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx new file mode 100644 index 00000000..8094ce7c --- /dev/null +++ b/fairmq/sdk/commands/Commands.cxx @@ -0,0 +1,403 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Commands.h" + +#include + +#include + +using namespace std; + +namespace fair { +namespace mq { +namespace sdk { +namespace cmd { + +array fbResultToResult = +{ + { + Result::Ok, + Result::Failure + } +}; + +array resultToFBResult = +{ + { + FBResult::FBResult_Ok, + FBResult::FBResult_Failure + } +}; + +array resultNames = +{ + { + "Ok", + "Failure" + } +}; + +array typeNames = +{ + { + "CheckState", + "ChangeState", + "DumpConfig", + "SubscribeToHeartbeats", + "UnsubscribeFromHeartbeats", + "SubscribeToStateChange", + "UnsubscribeFromStateChange", + "StateChangeExitingReceived", + + "CurrentState", + "TransitionStatus", + "Config", + "HeartbeatSubscription", + "HeartbeatUnsubscription", + "Heartbeat", + "StateChangeSubscription", + "StateChangeUnsubscription", + "StateChange" + } +}; + +array fbStateToMQState = +{ + { + fair::mq::State::Ok, + fair::mq::State::Error, + fair::mq::State::Idle, + fair::mq::State::InitializingDevice, + fair::mq::State::Initialized, + fair::mq::State::Binding, + fair::mq::State::Bound, + fair::mq::State::Connecting, + fair::mq::State::DeviceReady, + fair::mq::State::InitializingTask, + fair::mq::State::Ready, + fair::mq::State::Running, + fair::mq::State::ResettingTask, + fair::mq::State::ResettingDevice, + fair::mq::State::Exiting + } +}; + +array mqStateToFBState = +{ + { + sdk::cmd::FBState_Ok, + sdk::cmd::FBState_Error, + sdk::cmd::FBState_Idle, + sdk::cmd::FBState_InitializingDevice, + sdk::cmd::FBState_Initialized, + sdk::cmd::FBState_Binding, + sdk::cmd::FBState_Bound, + sdk::cmd::FBState_Connecting, + sdk::cmd::FBState_DeviceReady, + sdk::cmd::FBState_InitializingTask, + sdk::cmd::FBState_Ready, + sdk::cmd::FBState_Running, + sdk::cmd::FBState_ResettingTask, + sdk::cmd::FBState_ResettingDevice, + sdk::cmd::FBState_Exiting + } +}; + +array fbTransitionToMQTransition = +{ + { + fair::mq::Transition::Auto, + fair::mq::Transition::InitDevice, + fair::mq::Transition::CompleteInit, + fair::mq::Transition::Bind, + fair::mq::Transition::Connect, + fair::mq::Transition::InitTask, + fair::mq::Transition::Run, + fair::mq::Transition::Stop, + fair::mq::Transition::ResetTask, + fair::mq::Transition::ResetDevice, + fair::mq::Transition::End, + fair::mq::Transition::ErrorFound + } +}; + +array mqTransitionToFBTransition = +{ + { + sdk::cmd::FBTransition_Auto, + sdk::cmd::FBTransition_InitDevice, + sdk::cmd::FBTransition_CompleteInit, + sdk::cmd::FBTransition_Bind, + sdk::cmd::FBTransition_Connect, + sdk::cmd::FBTransition_InitTask, + sdk::cmd::FBTransition_Run, + sdk::cmd::FBTransition_Stop, + sdk::cmd::FBTransition_ResetTask, + sdk::cmd::FBTransition_ResetDevice, + sdk::cmd::FBTransition_End, + sdk::cmd::FBTransition_ErrorFound + } +}; + +array typeToFBCmd = +{ + { + FBCmd::FBCmd_check_state, + FBCmd::FBCmd_change_state, + FBCmd::FBCmd_dump_config, + FBCmd::FBCmd_subscribe_to_heartbeats, + FBCmd::FBCmd_unsubscribe_from_heartbeats, + FBCmd::FBCmd_subscribe_to_state_change, + FBCmd::FBCmd_unsubscribe_from_state_change, + FBCmd::FBCmd_state_change_exiting_received, + FBCmd::FBCmd_current_state, + FBCmd::FBCmd_transition_status, + FBCmd::FBCmd_config, + FBCmd::FBCmd_heartbeat_subscription, + FBCmd::FBCmd_heartbeat_unsubscription, + FBCmd::FBCmd_heartbeat, + FBCmd::FBCmd_state_change_subscription, + FBCmd::FBCmd_state_change_unsubscription, + FBCmd::FBCmd_state_change + } +}; + +array fbCmdToType = +{ + { + Type::check_state, + Type::change_state, + Type::dump_config, + Type::subscribe_to_heartbeats, + Type::unsubscribe_from_heartbeats, + Type::subscribe_to_state_change, + Type::unsubscribe_from_state_change, + Type::state_change_exiting_received, + Type::current_state, + Type::transition_status, + Type::config, + Type::heartbeat_subscription, + Type::heartbeat_unsubscription, + Type::heartbeat, + Type::state_change_subscription, + Type::state_change_unsubscription, + Type::state_change + } +}; + +fair::mq::State GetMQState(const FBState state) { return fbStateToMQState.at(state); } +FBState GetFBState(const fair::mq::State state) { return mqStateToFBState.at(static_cast(state)); } +fair::mq::Transition GetMQTransition(const FBTransition transition) { return fbTransitionToMQTransition.at(transition); } +FBTransition GetFBTransition(const fair::mq::Transition transition) { return mqTransitionToFBTransition.at(static_cast(transition)); } + +Result GetResult(const FBResult result) { return fbResultToResult.at(result); } +FBResult GetFBResult(const Result result) { return resultToFBResult.at(static_cast(result)); } +string GetResultName(const Result result) { return resultNames.at(static_cast(result)); } +string GetTypeName(const Type type) { return typeNames.at(static_cast(type)); } + +FBCmd GetFBCmd(const Type type) { return typeToFBCmd.at(static_cast(type)); } + +string Cmds::Serialize() const +{ + flatbuffers::FlatBufferBuilder fbb; + vector> commandOffsets; + + for (auto& cmd : fCmds) { + flatbuffers::Offset cmdOffset; + unique_ptr cmdBuilder; // delay the creation of the builder, because child strings need to be constructed first (which are conditional) + + switch (cmd->GetType()) { + case Type::check_state: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::change_state: { + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_transition(GetFBTransition(static_cast(*cmd).GetTransition())); + } + break; + case Type::dump_config: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::subscribe_to_heartbeats: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::unsubscribe_from_heartbeats: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::subscribe_to_state_change: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::unsubscribe_from_state_change: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::state_change_exiting_received: { + cmdBuilder = tools::make_unique(fbb); + } + break; + case Type::current_state: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_current_state(GetFBState(static_cast(*cmd).GetCurrentState())); + } + break; + case Type::transition_status: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + cmdBuilder->add_transition(GetFBTransition(static_cast(*cmd).GetTransition())); + } + break; + case Type::config: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto config = fbb.CreateString(static_cast(*cmd).GetConfig()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_config_string(config); + } + break; + case Type::heartbeat_subscription: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + } + break; + case Type::heartbeat_unsubscription: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + } + break; + case Type::heartbeat: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + } + break; + case Type::state_change_subscription: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + } + break; + case Type::state_change_unsubscription: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + } + break; + case Type::state_change: { + auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_task_id(static_cast(*cmd).GetTaskId()); + cmdBuilder->add_last_state(GetFBState(static_cast(*cmd).GetLastState())); + cmdBuilder->add_current_state(GetFBState(static_cast(*cmd).GetCurrentState())); + } + break; + default: + throw CommandFormatError("unrecognized command type given to fair::mq::cmd::Cmds::Serialize()"); + break; + } + + cmdBuilder->add_command_id(typeToFBCmd.at(static_cast(cmd->GetType()))); + + cmdOffset = cmdBuilder->Finish(); + commandOffsets.push_back(cmdOffset); + } + + auto commands = fbb.CreateVector(commandOffsets); + auto cmds = CreateFBCommands(fbb, commands); + fbb.Finish(cmds); + + return string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); +} + +void Cmds::Deserialize(const string& str) +{ + fCmds.clear(); + + auto cmds = cmd::GetFBCommands(const_cast(str.c_str()))->commands(); + + for (unsigned int i = 0; i < cmds->size(); ++i) { + const fair::mq::sdk::cmd::FBCommand& cmdPtr = *(cmds->Get(i)); + switch (cmdPtr.command_id()) { + case FBCmd_check_state: + fCmds.emplace_back(make()); + break; + case FBCmd_change_state: + fCmds.emplace_back(make(GetMQTransition(cmdPtr.transition()))); + break; + case FBCmd_dump_config: + fCmds.emplace_back(make()); + break; + case FBCmd_subscribe_to_heartbeats: + fCmds.emplace_back(make()); + break; + case FBCmd_unsubscribe_from_heartbeats: + fCmds.emplace_back(make()); + break; + case FBCmd_subscribe_to_state_change: + fCmds.emplace_back(make()); + break; + case FBCmd_unsubscribe_from_state_change: + fCmds.emplace_back(make()); + break; + case FBCmd_state_change_exiting_received: + fCmds.emplace_back(make()); + break; + case FBCmd_current_state: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetMQState(cmdPtr.current_state()))); + break; + case FBCmd_transition_status: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()), GetMQTransition(cmdPtr.transition()))); + break; + case FBCmd_config: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.config_string()->str())); + break; + case FBCmd_heartbeat_subscription: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + break; + case FBCmd_heartbeat_unsubscription: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + break; + case FBCmd_heartbeat: + fCmds.emplace_back(make(cmdPtr.device_id()->str())); + break; + case FBCmd_state_change_subscription: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + break; + case FBCmd_state_change_unsubscription: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()))); + break; + case FBCmd_state_change: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state()))); + break; + default: + throw CommandFormatError("unrecognized command type given to fair::mq::cmd::Cmds::Deserialize()"); + break; + } + } +} + +} // namespace cmd +} // namespace sdk +} // namespace mq +} // namespace fair diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h new file mode 100644 index 00000000..7ed27f2d --- /dev/null +++ b/fairmq/sdk/commands/Commands.h @@ -0,0 +1,356 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_COMMANDFACTORY +#define FAIR_MQ_SDK_COMMANDFACTORY + +#include +#include + +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace sdk +{ +namespace cmd +{ + +enum class Result : int { + Ok, + Failure +}; + +enum class Type : int +{ + check_state, // args: { } + change_state, // args: { transition } + dump_config, // args: { } + subscribe_to_heartbeats, // args: { } + unsubscribe_from_heartbeats, // args: { } + subscribe_to_state_change, // args: { } + unsubscribe_from_state_change, // args: { } + state_change_exiting_received, // args: { } + + current_state, // args: { device_id, current_state } + transition_status, // args: { device_id, Result, transition } + config, // args: { device_id, config_string } + heartbeat_subscription, // args: { device_id, Result } + heartbeat_unsubscription, // args: { device_id, Result } + heartbeat, // args: { device_id } + state_change_subscription, // args: { device_id, Result } + state_change_unsubscription, // args: { device_id, Result } + state_change // args: { device_id, task_id, last_state, current_state } +}; + +struct Cmd +{ + explicit Cmd(const Type type) : fType(type) {} + + Type GetType() const { return fType; } + + private: + Type fType; +}; + +struct CheckState : Cmd +{ + explicit CheckState() : Cmd(Type::check_state) {} +}; + +struct ChangeState : Cmd +{ + explicit ChangeState(Transition transition) + : Cmd(Type::change_state) + , fTransition(transition) + {} + + Transition GetTransition() const { return fTransition; } + void SetTransition(Transition transition) { fTransition = transition; } + + private: + Transition fTransition; +}; + +struct DumpConfig : Cmd +{ + explicit DumpConfig() : Cmd(Type::dump_config) {} +}; + +struct SubscribeToHeartbeats : Cmd +{ + explicit SubscribeToHeartbeats() : Cmd(Type::subscribe_to_heartbeats) {} +}; + +struct UnsubscribeFromHeartbeats : Cmd +{ + explicit UnsubscribeFromHeartbeats() : Cmd(Type::unsubscribe_from_heartbeats) {} +}; + +struct SubscribeToStateChange : Cmd +{ + explicit SubscribeToStateChange() : Cmd(Type::subscribe_to_state_change) {} +}; + +struct UnsubscribeFromStateChange : Cmd +{ + explicit UnsubscribeFromStateChange() : Cmd(Type::unsubscribe_from_state_change) {} +}; + +struct StateChangeExitingReceived : Cmd +{ + explicit StateChangeExitingReceived() : Cmd(Type::state_change_exiting_received) {} +}; + +struct CurrentState : Cmd +{ + explicit CurrentState(const std::string& id, State currentState) + : Cmd(Type::current_state) + , fDeviceId(id) + , fCurrentState(currentState) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + fair::mq::State GetCurrentState() const { return fCurrentState; } + void SetCurrentState(fair::mq::State state) { fCurrentState = state; } + + private: + std::string fDeviceId; + fair::mq::State fCurrentState; +}; + +struct TransitionStatus : Cmd +{ + explicit TransitionStatus(const std::string& id, Result result, Transition transition) + : Cmd(Type::transition_status) + , fDeviceId(id) + , fResult(result) + , fTransition(transition) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + Result GetResult() const { return fResult; } + void SetResult(Result result) { fResult = result; } + Transition GetTransition() const { return fTransition; } + void SetTransition(Transition transition) { fTransition = transition; } + + private: + std::string fDeviceId; + Result fResult; + Transition fTransition; +}; + +struct Config : Cmd +{ + explicit Config(const std::string& id, const std::string& config) + : Cmd(Type::config) + , fDeviceId(id) + , fConfig(config) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + std::string GetConfig() const { return fConfig; } + void SetConfig(const std::string& config) { fConfig = config; } + + private: + std::string fDeviceId; + std::string fConfig; +}; + +struct HeartbeatSubscription : Cmd +{ + explicit HeartbeatSubscription(const std::string& id, Result result) + : Cmd(Type::heartbeat_subscription) + , fDeviceId(id) + , fResult(result) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + Result GetResult() const { return fResult; } + void SetResult(Result result) { fResult = result; } + + private: + std::string fDeviceId; + Result fResult; +}; + +struct HeartbeatUnsubscription : Cmd +{ + explicit HeartbeatUnsubscription(const std::string& id, Result result) + : Cmd(Type::heartbeat_unsubscription) + , fDeviceId(id) + , fResult(result) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + Result GetResult() const { return fResult; } + void SetResult(Result result) { fResult = result; } + + private: + std::string fDeviceId; + Result fResult; +}; + +struct Heartbeat : Cmd +{ + explicit Heartbeat(const std::string& id) + : Cmd(Type::heartbeat) + , fDeviceId(id) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + + private: + std::string fDeviceId; +}; + +struct StateChangeSubscription : Cmd +{ + explicit StateChangeSubscription(const std::string& id, Result result) + : Cmd(Type::state_change_subscription) + , fDeviceId(id) + , fResult(result) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + Result GetResult() const { return fResult; } + void SetResult(Result result) { fResult = result; } + + private: + std::string fDeviceId; + Result fResult; +}; + +struct StateChangeUnsubscription : Cmd +{ + explicit StateChangeUnsubscription(const std::string& id, Result result) + : Cmd(Type::state_change_unsubscription) + , fDeviceId(id) + , fResult(result) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + Result GetResult() const { return fResult; } + void SetResult(Result result) { fResult = result; } + + private: + std::string fDeviceId; + Result fResult; +}; + +struct StateChange : Cmd +{ + explicit StateChange(const std::string& deviceId, uint64_t taskId, State lastState, State currentState) + : Cmd(Type::state_change) + , fDeviceId(deviceId) + , fTaskId(taskId) + , fLastState(lastState) + , fCurrentState(currentState) + {} + + std::string GetDeviceId() const { return fDeviceId; } + void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + uint64_t GetTaskId() const { return fTaskId; } + void SetTaskId(uint64_t taskId) { fTaskId = taskId; } + fair::mq::State GetLastState() const { return fLastState; } + void SetLastState(fair::mq::State state) { fLastState = state; } + fair::mq::State GetCurrentState() const { return fCurrentState; } + void SetCurrentState(fair::mq::State state) { fCurrentState = state; } + + private: + std::string fDeviceId; + uint64_t fTaskId; + fair::mq::State fLastState; + fair::mq::State fCurrentState; +}; + +template +std::unique_ptr make(Args&&... args) +{ + return fair::mq::tools::make_unique(std::forward(args)...); +} + +struct Cmds +{ + using container = std::vector>; + struct CommandFormatError : std::runtime_error { using std::runtime_error::runtime_error; }; + + explicit Cmds() {} + + template + explicit Cmds(std::unique_ptr&& first, Rest&&... rest) + { + Unpack(std::forward&&>(first), std::forward(rest)...); + } + + + void Add(std::unique_ptr&& cmd) { fCmds.emplace_back(std::move(cmd)); } + + template + void Add(Args&&... args) + { + static_assert(std::is_base_of::value, "Only types derived from fair::mq::cmd::Cmd are allowed"); + Add(make(std::forward(args)...)); + } + + Cmd& At(size_t i) { return *(fCmds.at(i)); } + + size_t Size() { return fCmds.size(); } + void Reset() { fCmds.clear(); } + + std::string Serialize() const; + void Deserialize(const std::string&); + + private: + container fCmds; + + void Unpack() {} + + template + void Unpack(std::unique_ptr&& first, Rest&&... rest) + { + fCmds.emplace_back(std::move(first)); + Unpack(std::forward(rest)...); + } + + public: + using iterator = container::iterator; + using const_iterator = container::const_iterator; + + auto begin() -> decltype(fCmds.begin()) { return fCmds.begin(); } + auto end() -> decltype(fCmds.end()) { return fCmds.end(); } + auto cbegin() -> decltype(fCmds.cbegin()) { return fCmds.cbegin(); } + auto cend() -> decltype(fCmds.cend()) { return fCmds.cend(); } +}; + +std::string GetResultName(const Result result); +std::string GetTypeName(const Type type); + +inline std::ostream& operator<<(std::ostream& os, const Result& result) { return os << GetResultName(result); } +inline std::ostream& operator<<(std::ostream& os, const Type& type) { return os << GetTypeName(type); } + +} /* namespace cmd */ +} /* namespace sdk */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_SDK_COMMANDFACTORY */ diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs new file mode 100644 index 00000000..5aadcfa5 --- /dev/null +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -0,0 +1,79 @@ +namespace fair.mq.sdk.cmd; + +enum FBResult:byte { + Ok, + Failure +} + +enum FBState:byte { + Ok, + Error, + Idle, + InitializingDevice, + Initialized, + Binding, + Bound, + Connecting, + DeviceReady, + InitializingTask, + Ready, + Running, + ResettingTask, + ResettingDevice, + Exiting +} + +enum FBTransition:byte { + Auto, + InitDevice, + CompleteInit, + Bind, + Connect, + InitTask, + Run, + Stop, + ResetTask, + ResetDevice, + End, + ErrorFound +} + +enum FBCmd:byte { + check_state, // args: { } + change_state, // args: { transition } + dump_config, // args: { } + subscribe_to_heartbeats, // args: { } + unsubscribe_from_heartbeats, // args: { } + subscribe_to_state_change, // args: { } + unsubscribe_from_state_change, // args: { } + state_change_exiting_received, // args: { } + + current_state, // args: { device_id, current_state } + transition_status, // args: { device_id, Result, transition } + config, // args: { device_id, config_string } + heartbeat_subscription, // args: { device_id, Result } + heartbeat_unsubscription, // args: { device_id, Result } + heartbeat, // args: { device_id } + state_change_subscription, // args: { device_id, Result } + state_change_unsubscription, // args: { device_id, Result } + state_change // args: { device_id, task_id, last_state, current_state } +} + +table FBCommand { + command_id:FBCmd; + device_id:string; + task_id:uint64; + state:FBState; + transition:FBTransition; + result:FBResult; + config_string:string; + last_state:FBState; + current_state:FBState; + debug:string; +} + +table FBCommands { + commands:[FBCommand]; +} + +root_type FBCommands; diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 2fff5f0b..1677850c 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -17,7 +17,6 @@ #include #include -#include #include