From ebcbe2dde6224b89ebc1df085401847b4014b826 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 13 Oct 2021 22:05:36 +0200 Subject: [PATCH] feat: Add interactive controller button to print connected peers --- fairmq/Device.h | 8 ++++++++ fairmq/Plugin.h | 2 ++ fairmq/PluginServices.h | 3 +++ fairmq/plugins/control/Control.cxx | 19 +++++++++++++++++-- fairmq/plugins/control/Control.h | 1 + 5 files changed, 31 insertions(+), 2 deletions(-) diff --git a/fairmq/Device.h b/fairmq/Device.h index b71f24bb..bcfa3275 100644 --- a/fairmq/Device.h +++ b/fairmq/Device.h @@ -327,6 +327,14 @@ class Device throw; } + /// @brief Get numbers of connected peers for the given channel + /// @param name channel name + /// @param index sub-channel + unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0) + { + return fChannels.at(channelName).at(index).GetNumberOfConnectedPeers(); + } + virtual void RegisterChannelEndpoints() {} bool RegisterChannelEndpoint(const std::string& channelName, diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index d969a32b..2d06eef3 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -91,6 +91,8 @@ class Plugin auto SubscribeToDeviceStateChange(std::function callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); } auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); } + auto GetNumberOfConnectedPeers(const std::string& channelName, int index = 0) -> unsigned long { return fPluginServices->GetNumberOfConnectedPeers(channelName, index); } + // device config API // see for docs auto PropertyExists(const std::string& key) -> int { return fPluginServices->PropertyExists(key); } diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index b82c9ed3..b1ee9ed5 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -138,6 +138,9 @@ class PluginServices /// @param subscriber id auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice.UnsubscribeFromStateChange(subscriber); } + /// DO NOT USE, ONLY FOR TESTING, WILL BE REMOVED (and info made available via property api) + auto GetNumberOfConnectedPeers(const std::string& channelName, int index = 0) -> unsigned long { return fDevice.GetNumberOfConnectedPeers(channelName, index); } + // Config API /// @brief Checks a property with the given key exist in the configuration diff --git a/fairmq/plugins/control/Control.cxx b/fairmq/plugins/control/Control.cxx index 3e6e76ff..cd5204cf 100644 --- a/fairmq/plugins/control/Control.cxx +++ b/fairmq/plugins/control/Control.cxx @@ -206,6 +206,10 @@ try { cout << "\n --> [m] decrease log verbosity\n\n" << flush; CycleLogVerbosityDown(); break; + case 'p': + cout << "\n --> [p] print number of connected peers for all channels\n\n" << flush; + PrintNumberOfConnectedPeers(); + break; case 'h': cout << "\n --> [h] help\n\n" << flush; if (color) { @@ -248,7 +252,7 @@ auto Control::PrintInteractiveHelpColor() -> void { stringstream ss; ss << "Following control commands are available:\n\n" - << " [\033[01;32mh\033[0m] help, [\033[01;32mc\033[0m] check current device state,\n" + << " [\033[01;32mh\033[0m] help, [\033[01;32mc\033[0m] check current device state, [\033[01;32mp\033[0m] print number of connected peers for channels,\n" << " [\033[01;32mi\033[0m] init device, [\033[01;32mb\033[0m] bind, [\033[01;32mx\033[0m] connect, [\033[01;32mj\033[0m] init task," << " [\033[01;32mr\033[0m] run, [\033[01;32ms\033[0m] stop,\n" << " [\033[01;32mt\033[0m] reset task, [\033[01;32md\033[0m] reset device, [\033[01;32mq\033[0m] end,\n" @@ -260,7 +264,7 @@ auto Control::PrintInteractiveHelp() -> void { stringstream ss; ss << "Following control commands are available:\n\n" - << " [h] help, [c] check current device state,\n" + << " [h] help, [c] check current device state, [p] print number of connected peers for channels\n" << " [i] init device, [b] bind, [x] connect, [j] init task,\n" << " [r] run, [s] stop,\n" << " [t] reset task, [d] reset device, [q] end,\n" @@ -344,6 +348,17 @@ void Control::PrintStateMachine() cout << ss.str() << flush; } +auto Control::PrintNumberOfConnectedPeers() -> void +{ + unordered_map channelInfo(GetChannelInfo()); + + for (const auto& c : channelInfo) { + for (int i = 0; i < c.second; ++i) { + LOG(info) << c.first << "[" << i << "]: " << GetNumberOfConnectedPeers(c.first, i) << " peers connected"; + } + } +} + auto Control::StaticMode() -> void try { RunStartupSequence(); diff --git a/fairmq/plugins/control/Control.h b/fairmq/plugins/control/Control.h index b8fd40f3..61439d9f 100644 --- a/fairmq/plugins/control/Control.h +++ b/fairmq/plugins/control/Control.h @@ -41,6 +41,7 @@ class Control : public Plugin static auto PrintInteractiveHelp() -> void; static auto PrintStateMachineColor() -> void; static auto PrintStateMachine() -> void; + auto PrintNumberOfConnectedPeers() -> void; auto StaticMode() -> void; auto SignalHandler() -> void; auto RunShutdownSequence() -> void;