mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Example tests: check exit codes
This commit is contained in:
committed by
Dennis Klein
parent
afd5700cca
commit
61d2797971
@@ -57,45 +57,48 @@ struct Sampler : fair::mq::Device
|
||||
));
|
||||
}
|
||||
|
||||
bool ConditionalRun() override
|
||||
void Run() override
|
||||
{
|
||||
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
while (!NewStatePending()) {
|
||||
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
|
||||
// static_cast<char*>(fRegion->GetData())[3] = 97;
|
||||
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
++fNumUnackedMsgs;
|
||||
if (Send(msg, "data", 0) > 0) {
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
++fNumUnackedMsgs;
|
||||
if (Send(msg, "data", 0) > 0) {
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
// wait for all acks to arrive
|
||||
while (!NewStatePending()) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
if (fNumUnackedMsgs == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(25));
|
||||
}
|
||||
|
||||
if (fNumUnackedMsgs != 0) {
|
||||
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
|
||||
} else {
|
||||
LOG(info) << "All acknowledgements received.";
|
||||
}
|
||||
}
|
||||
|
||||
void ResetTask() override
|
||||
{
|
||||
// give some time for acks to be received
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
fRegion.reset();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
if (fNumUnackedMsgs != 0) {
|
||||
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
|
||||
} else {
|
||||
LOG(info) << "All acknowledgements received.";
|
||||
}
|
||||
}
|
||||
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user