mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Avoid accessing Device.fChannels directly, use getters
This commit is contained in:
parent
a3bb5fb4b0
commit
dbdf17c661
|
@ -19,7 +19,7 @@ struct Sampler : fair::mq::Device
|
|||
{
|
||||
void InitTask() override
|
||||
{
|
||||
fNumDataChannels = fChannels.at("data").size();
|
||||
fNumDataChannels = GetNumSubChannels("data");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ struct Sender : fair::mq::Device
|
|||
|
||||
void Run() override
|
||||
{
|
||||
FairMQChannel& dataInChannel = fChannels.at("sync").at(0);
|
||||
FairMQChannel& dataInChannel = GetChannel("sync", 0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
Header h;
|
||||
|
|
|
@ -26,7 +26,7 @@ struct Receiver : Device
|
|||
|
||||
void Run() override
|
||||
{
|
||||
Channel& dataInChannel = fChannels.at("sr").at(0);
|
||||
Channel& dataInChannel = GetChannel("sr", 0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
auto msg(dataInChannel.NewMessage());
|
||||
|
|
|
@ -23,7 +23,7 @@ struct Sampler : fair::mq::Device
|
|||
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
LOG(info) << "Region event: " << info.event << ": "
|
||||
<< (info.managed ? "managed" : "unmanaged")
|
||||
<< ", id: " << info.id
|
||||
|
@ -87,7 +87,7 @@ struct Sampler : fair::mq::Device
|
|||
LOG(info) << "All acknowledgements received.";
|
||||
}
|
||||
}
|
||||
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
|
||||
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -22,7 +22,7 @@ struct Sink : Device
|
|||
{
|
||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
|
||||
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
|
||||
LOG(info) << "Region event: " << info.event << ": "
|
||||
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr << ", size: " << info.size
|
||||
|
@ -32,7 +32,7 @@ struct Sink : Device
|
|||
|
||||
void Run() override
|
||||
{
|
||||
Channel& dataInChannel = fChannels.at("data").at(0);
|
||||
Channel& dataInChannel = GetChannel("data", 0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
auto msg(dataInChannel.Transport()->CreateMessage());
|
||||
|
@ -51,7 +51,7 @@ struct Sink : Device
|
|||
|
||||
void ResetTask() override
|
||||
{
|
||||
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
|
||||
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -44,7 +44,7 @@ class BenchmarkSampler : public Device
|
|||
void Run() override
|
||||
{
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
|
||||
FairMQChannel& dataOutChannel = GetChannel(fOutChannelName, 0);
|
||||
|
||||
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
|
|
@ -43,7 +43,7 @@ class Merger : public Device
|
|||
|
||||
void Run() override
|
||||
{
|
||||
int numInputs = fChannels.at(fInChannelName).size();
|
||||
int numInputs = GetNumSubChannels(fInChannelName);
|
||||
|
||||
std::vector<FairMQChannel*> chans;
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ class Multiplier : public Device
|
|||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
||||
fOutChannelNames = fConfig->GetProperty<std::vector<std::string>>("out-channel");
|
||||
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
|
||||
fNumOutputs = GetNumSubChannels(fOutChannelNames.at(0));
|
||||
|
||||
if (fMultipart) {
|
||||
OnData(fInChannelName, &Multiplier::HandleMultipartData);
|
||||
|
@ -43,7 +43,7 @@ class Multiplier : public Device
|
|||
bool HandleSingleData(std::unique_ptr<FairMQMessage>& payload, int)
|
||||
{
|
||||
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
|
||||
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
|
||||
for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel
|
||||
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
|
||||
msgCopy->Copy(*payload);
|
||||
|
||||
|
@ -51,7 +51,7 @@ class Multiplier : public Device
|
|||
}
|
||||
}
|
||||
|
||||
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
|
||||
unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back());
|
||||
|
||||
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
|
||||
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
|
||||
|
@ -68,7 +68,7 @@ class Multiplier : public Device
|
|||
bool HandleMultipartData(FairMQParts& payload, int)
|
||||
{
|
||||
for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
|
||||
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
|
||||
for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel
|
||||
FairMQParts parts;
|
||||
|
||||
for (int k = 0; k < payload.Size(); ++k) {
|
||||
|
@ -81,7 +81,7 @@ class Multiplier : public Device
|
|||
}
|
||||
}
|
||||
|
||||
unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
|
||||
unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back());
|
||||
|
||||
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
|
||||
FairMQParts parts;
|
||||
|
|
|
@ -48,7 +48,7 @@ class Sink : public Device
|
|||
void Run() override
|
||||
{
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
||||
FairMQChannel& dataInChannel = GetChannel(fInChannelName, 0);
|
||||
|
||||
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
|
|
@ -30,7 +30,7 @@ class Splitter : public Device
|
|||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
||||
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
|
||||
fNumOutputs = fChannels.at(fOutChannelName).size();
|
||||
fNumOutputs = GetNumSubChannels(fOutChannelName);
|
||||
fDirection = 0;
|
||||
|
||||
if (fMultipart) {
|
||||
|
|
|
@ -37,8 +37,8 @@ class PollIn : public FairMQDevice
|
|||
{
|
||||
vector<FairMQChannel*> chans;
|
||||
|
||||
chans.push_back(&fChannels.at("data1").at(0));
|
||||
chans.push_back(&fChannels.at("data2").at(0));
|
||||
chans.push_back(&GetChannel("data1", 0));
|
||||
chans.push_back(&GetChannel("data2", 0));
|
||||
|
||||
FairMQPollerPtr poller = nullptr;
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user