mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
parent
0ae53fd7d9
commit
3d4cd02812
|
@ -15,7 +15,6 @@
|
||||||
#include <asio/system_executor.hpp>
|
#include <asio/system_executor.hpp>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
#include <fairlogger/Logger.h>
|
|
||||||
#include <fairmq/sdk/Error.h>
|
#include <fairmq/sdk/Error.h>
|
||||||
#include <fairmq/sdk/Traits.h>
|
#include <fairmq/sdk/Traits.h>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
@ -24,6 +23,11 @@
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
#ifndef FAIR_LOG
|
||||||
|
#define FAIR_LOG LOG
|
||||||
|
#endif /* ifndef FAIR_LOG */
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
namespace mq {
|
namespace mq {
|
||||||
namespace sdk {
|
namespace sdk {
|
||||||
|
@ -70,9 +74,9 @@ struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
|
||||||
try {
|
try {
|
||||||
handler(ec, args...);
|
handler(ec, args...);
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
GetAlloc2());
|
GetAlloc2());
|
||||||
|
|
|
@ -23,6 +23,9 @@
|
||||||
#include <fairmq/tools/Unique.h>
|
#include <fairmq/tools/Unique.h>
|
||||||
|
|
||||||
#include <fairlogger/Logger.h>
|
#include <fairlogger/Logger.h>
|
||||||
|
#ifndef FAIR_LOG
|
||||||
|
#define FAIR_LOG LOG
|
||||||
|
#endif /* ifndef FAIR_LOG */
|
||||||
|
|
||||||
#include <asio/associated_executor.hpp>
|
#include <asio/associated_executor.hpp>
|
||||||
#include <asio/async_result.hpp>
|
#include <asio/async_result.hpp>
|
||||||
|
@ -187,10 +190,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||||
Cmds inCmds;
|
Cmds inCmds;
|
||||||
inCmds.Deserialize(msg);
|
inCmds.Deserialize(msg);
|
||||||
// LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
|
// FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
|
||||||
|
|
||||||
for (const auto& cmd : inCmds) {
|
for (const auto& cmd : inCmds) {
|
||||||
// LOG(debug) << " > " << cmd->GetType();
|
// FAIR_LOG(debug) << " > " << cmd->GetType();
|
||||||
switch (cmd->GetType()) {
|
switch (cmd->GetType()) {
|
||||||
case Type::state_change: {
|
case Type::state_change: {
|
||||||
auto _cmd = static_cast<StateChange&>(*cmd);
|
auto _cmd = static_cast<StateChange&>(*cmd);
|
||||||
|
@ -201,18 +204,18 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
} break;
|
} break;
|
||||||
case Type::state_change_subscription:
|
case Type::state_change_subscription:
|
||||||
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
|
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
|
||||||
LOG(error) << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId();
|
FAIR_LOG(error) << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Type::state_change_unsubscription:
|
case Type::state_change_unsubscription:
|
||||||
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
|
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
|
||||||
LOG(error) << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId();
|
FAIR_LOG(error) << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Type::transition_status: {
|
case Type::transition_status: {
|
||||||
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
||||||
if (_cmd.GetResult() != Result::Ok) {
|
if (_cmd.GetResult() != Result::Ok) {
|
||||||
LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
|
FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(_cmd.GetTaskId())).state != fChangeStateTarget) {
|
if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(_cmd.GetTaskId())).state != fChangeStateTarget) {
|
||||||
fChangeStateOpTimer.cancel();
|
fChangeStateOpTimer.cancel();
|
||||||
|
@ -230,15 +233,15 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
|
FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
|
||||||
LOG(warn) << "Origin: " << senderId;
|
FAIR_LOG(warn) << "Origin: " << senderId;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
// LOG(debug) << "Subscribing to state change";
|
// FAIR_LOG(debug) << "Subscribing to state change";
|
||||||
Cmds cmds(make<SubscribeToStateChange>());
|
Cmds cmds(make<SubscribeToStateChange>());
|
||||||
fDDSSession.SendCommand(cmds.Serialize());
|
fDDSSession.SendCommand(cmds.Serialize());
|
||||||
}
|
}
|
||||||
|
@ -391,9 +394,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
try {
|
try {
|
||||||
h(MakeErrorCode(ErrorCode::OperationInProgress), s);
|
h(MakeErrorCode(ErrorCode::OperationInProgress), s);
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
LOG(error) << "Uncaught exception in completion handler: " << e.what();
|
FAIR_LOG(error) << "Uncaught exception in completion handler: " << e.what();
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
LOG(error) << "Unknown uncaught exception in completion handler.";
|
FAIR_LOG(error) << "Unknown uncaught exception in completion handler.";
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
alloc2);
|
alloc2);
|
||||||
|
@ -687,7 +690,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
// FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
||||||
}
|
}
|
||||||
GetPropertiesOp() = delete;
|
GetPropertiesOp() = delete;
|
||||||
GetPropertiesOp(const GetPropertiesOp&) = delete;
|
GetPropertiesOp(const GetPropertiesOp&) = delete;
|
||||||
|
@ -741,9 +744,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
|
op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
|
||||||
} catch (std::out_of_range& e) {
|
} catch (std::out_of_range& e) {
|
||||||
LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
|
FAIR_LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
|
||||||
<< ") not found (probably completed or timed out), "
|
<< ") not found (probably completed or timed out), "
|
||||||
<< "discarding reply of device " << cmd.GetDeviceId();
|
<< "discarding reply of device " << cmd.GetDeviceId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -856,7 +859,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
// FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
||||||
}
|
}
|
||||||
SetPropertiesOp() = delete;
|
SetPropertiesOp() = delete;
|
||||||
SetPropertiesOp(const SetPropertiesOp&) = delete;
|
SetPropertiesOp(const SetPropertiesOp&) = delete;
|
||||||
|
@ -908,9 +911,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
lk.unlock();
|
lk.unlock();
|
||||||
op.Update(cmd.GetDeviceId(), cmd.GetResult());
|
op.Update(cmd.GetDeviceId(), cmd.GetResult());
|
||||||
} catch (std::out_of_range& e) {
|
} catch (std::out_of_range& e) {
|
||||||
LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
|
FAIR_LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
|
||||||
<< ") not found (probably completed or timed out), "
|
<< ") not found (probably completed or timed out), "
|
||||||
<< "discarding reply of device " << cmd.GetDeviceId();
|
<< "discarding reply of device " << cmd.GetDeviceId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1033,10 +1036,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
if (task.state == fChangeStateTarget) {
|
if (task.state == fChangeStateTarget) {
|
||||||
++fTransitionedCount;
|
++fTransitionedCount;
|
||||||
}
|
}
|
||||||
// LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||||
TryChangeStateCompletion();
|
TryChangeStateCompletion();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
LOG(error) << "Exception in UpdateStateEntry: " << e.what();
|
FAIR_LOG(error) << "Exception in UpdateStateEntry: " << e.what();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user