FairMQ  1.4.14
C++ Message Queuing Library and Framework
PMIxCommands.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef PMIXCOMMANDS_H
10 #define PMIXCOMMANDS_H
11 
12 #include "PMIx.hpp"
13 
14 #include <FairMQLogger.h>
15 #include <fairmq/tools/Semaphore.h>
16 #include <fairmq/tools/CppSTL.h>
17 #include <string>
18 
19 namespace pmix
20 {
21 
22 std::array<std::string, 47> typeNames =
23 {
24  {
25  "PMIX_UNDEF",
26  "PMIX_BOOL",
27  "PMIX_BYTE",
28  "PMIX_STRING",
29  "PMIX_SIZE",
30  "PMIX_PID",
31  "PMIX_INT",
32  "PMIX_INT8",
33  "PMIX_INT16",
34  "PMIX_INT32",
35  "PMIX_INT64",
36  "PMIX_UINT",
37  "PMIX_UINT8",
38  "PMIX_UINT16",
39  "PMIX_UINT32",
40  "PMIX_UINT64",
41  "PMIX_FLOAT",
42  "PMIX_DOUBLE",
43  "PMIX_TIMEVAL",
44  "PMIX_TIME",
45  "PMIX_STATUS",
46  "PMIX_VALUE",
47  "PMIX_PROC",
48  "PMIX_APP",
49  "PMIX_INFO",
50  "PMIX_PDATA",
51  "PMIX_BUFFER",
52  "PMIX_BYTE_OBJECT",
53  "PMIX_KVAL",
54  "PMIX_MODEX",
55  "PMIX_PERSIST",
56  "PMIX_POINTER",
57  "PMIX_SCOPE",
58  "PMIX_DATA_RANGE",
59  "PMIX_COMMAND",
60  "PMIX_INFO_DIRECTIVES",
61  "PMIX_DATA_TYPE",
62  "PMIX_PROC_STATE",
63  "PMIX_PROC_INFO",
64  "PMIX_DATA_ARRAY",
65  "PMIX_PROC_RANK",
66  "PMIX_QUERY",
67  "PMIX_COMPRESSED_STRING",
68  "PMIX_ALLOC_DIRECTIVE",
69  "PMIX_INFO_ARRAY",
70  "PMIX_IOF_CHANNEL",
71  "PMIX_ENVAR"
72  }
73 };
74 
75 enum class Command : int
76 {
77  general = PMIX_EXTERNAL_ERR_BASE,
78  error = PMIX_EXTERNAL_ERR_BASE - 1
79 };
80 
81 
82 class Commands
83 {
84  public:
85  Commands(const proc& process)
86  : fProcess(process)
87  , fSubscribed(false)
88  {
89  }
90 
91  ~Commands()
92  {
93  Unsubscribe();
94  }
95 
96  void Subscribe(std::function<void(const std::string& msg, const proc& sender)> callback)
97  {
98  using namespace std::placeholders;
99 
100  LOG(debug) << "PMIxCommands: Subscribing...";
101 
102  fCallback = callback;
103  std::array<pmix::status, 1> codes;
104  codes[0] = static_cast<int>(pmix::Command::general);
105 
106  PMIX_INFO_LOAD(&(fInfos[0]), PMIX_EVENT_RETURN_OBJECT, this, PMIX_POINTER);
107 
108  PMIx_Register_event_handler(codes.data(), codes.size(),
109  fInfos.data(), fInfos.size(),
110  &Commands::Handler,
111  &Commands::EventHandlerRegistration,
112  this);
113  fBlocker.Wait();
114  LOG(debug) << "PMIxCommands: Subscribing complete!";
115  }
116 
117  void Unsubscribe()
118  {
119  if (fSubscribed) {
120  LOG(debug) << "PMIxCommands: Unsubscribing...";
121  PMIx_Deregister_event_handler(fHandlerRef, &Commands::EventHandlerDeregistration, this);
122  fBlocker.Wait();
123  LOG(debug) << "PMIxCommands: Unsubscribing complete!";
124  } else {
125  LOG(debug) << "Unsubscribe() is called while no subscription is active";
126  }
127  }
128 
129  struct Holder
130  {
131  Holder() : fData(nullptr) {}
132  ~Holder() { PMIX_DATA_ARRAY_FREE(fData); }
133 
134  std::vector<pmix::info> fInfos;
135  pmix_data_array_t* fData;
136  };
137 
138  void Send(const std::string& msg)
139  {
140  std::vector<pmix::info>* infos = new std::vector<pmix::info>();
141  infos->emplace_back("fairmq.cmd", msg);
142  PMIx_Notify_event(static_cast<int>(pmix::Command::general),
143  &fProcess,
144  PMIX_RANGE_NAMESPACE,
145  infos->data(), infos->size(),
146  &Commands::OpCompleteCallback<std::vector<pmix::info>>,
147  infos);
148  }
149 
150  void Send(const std::string& msg, rank rank)
151  {
152  pmix::proc destination(fProcess);
153  destination.rank = rank;
154  Send(msg, {destination});
155  }
156 
157  void Send(const std::string& msg, const std::vector<proc>& destination)
158  {
159  std::unique_ptr<Holder> holder = fair::mq::tools::make_unique<Holder>();
160 
161  PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC);
162  memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t));
163  // LOG(warn) << "OLOG: " << msg << " > " << static_cast<pmix_proc_t*>(holder->fData->array)[0].nspace << ": " << static_cast<pmix_proc_t*>(holder->fData->array)[0].rank;
164  holder->fInfos.emplace_back(PMIX_EVENT_CUSTOM_RANGE, holder->fData);
165  // LOG(warn) << msg << " // packed range: " << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].nspace << "_" << static_cast<pmix_proc_t*>(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->array)[0].rank;
166  // LOG(warn) << msg << " // packed range.type: " << pmix::typeNames.at(holder->fInfos.at(0).value.type);
167  // LOG(warn) << msg << " // packed range.array.type: " << pmix::typeNames.at(static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->type);
168  // LOG(warn) << msg << " // packed range.array.size: " << static_cast<pmix_data_array_t*>(holder->fInfos.at(0).value.data.darray)->size;
169  // LOG(warn) << holder->fInfos.size();
170  holder->fInfos.emplace_back("fairmq.cmd", msg);
171  // LOG(warn) << msg << " // packed msg: " << holder->fInfos.at(1).value.data.string;
172  // LOG(warn) << msg << " // packed msg.type: " << pmix::typeNames.at(holder->fInfos.at(1).value.type);
173  // LOG(warn) << holder->fInfos.size();
174 
175  PMIx_Notify_event(static_cast<int>(pmix::Command::general),
176  &fProcess,
177  PMIX_RANGE_CUSTOM,
178  holder->fInfos.data(), holder->fInfos.size(),
179  &Commands::OpCompleteCallback<Holder>,
180  holder.get());
181  holder.release();
182  }
183 
184  private:
185  static void EventHandlerRegistration(pmix_status_t s, size_t handlerRef, void* obj)
186  {
187  if (s == PMIX_SUCCESS) {
188  LOG(debug) << "Successfully registered event handler, reference = " << static_cast<unsigned long>(handlerRef);
189  static_cast<Commands*>(obj)->fHandlerRef = handlerRef;
190  static_cast<Commands*>(obj)->fSubscribed = true;
191  } else {
192  LOG(error) << "Could not register PMIx event handler, status = " << s;
193  }
194  static_cast<Commands*>(obj)->fBlocker.Signal();
195  }
196 
197  static void EventHandlerDeregistration(pmix_status_t s, void* obj)
198  {
199  if (s == PMIX_SUCCESS) {
200  LOG(debug) << "Successfully deregistered event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef;
201  static_cast<Commands*>(obj)->fSubscribed = false;
202  } else {
203  LOG(error) << "Could not deregister PMIx event handler, reference = " << static_cast<Commands*>(obj)->fHandlerRef << ", status = " << s;
204  }
205  static_cast<Commands*>(obj)->fBlocker.Signal();
206  }
207 
208  template<typename T>
209  static void OpCompleteCallback(pmix_status_t s, void* data)
210  {
211  if (s == PMIX_SUCCESS) {
212  // LOG(info) << "Operation completed successfully";
213  } else {
214  LOG(error) << "Could not complete operation, status = " << s;
215  }
216  if (data) {
217  // LOG(warn) << "Destroying event data...";
218  delete static_cast<T*>(data);
219  }
220  }
221 
222  static void Handler(size_t handlerId,
223  pmix_status_t s,
224  const pmix_proc_t* src,
225  pmix_info_t info[], size_t ninfo,
226  pmix_info_t[] /* results */, size_t nresults,
227  pmix_event_notification_cbfunc_fn_t cbfunc,
228  void* cbdata)
229  {
230  std::stringstream ss;
231  ss << "Event handler called with "
232  << "status: " << s << ", "
233  << "source: " << src->nspace << "_" << src->rank << ", "
234  << "ninfo: " << ninfo << ", "
235  << "nresults: " << nresults << ", "
236  << "handlerId: " << handlerId;
237 
238  std::string msg;
239 
240  Commands* obj = nullptr;
241 
242  if (ninfo > 0) {
243  ss << ":\n";
244  for (size_t i = 0; i < ninfo; ++i) {
245  ss << " [" << i << "]: key: '" << info[i].key
246  << "', value: '" << pmix::get_value_str(info[i].value)
247  << "', value.type: '" << pmix::typeNames.at(info[i].value.type)
248  << "', flags: " << info[i].flags;
249 
250  if (std::strcmp(info[i].key, "fairmq.cmd") == 0) {
251  msg = pmix::get_value_str(info[i].value);
252  }
253 
254  if (std::strcmp(info[i].key, PMIX_EVENT_RETURN_OBJECT) == 0) {
255  obj = static_cast<Commands*>(info[i].value.data.ptr);
256  }
257 
258  if (i < ninfo - 1) {
259  ss << "\n";
260  }
261  }
262  }
263 
264 
265  if (obj != nullptr) {
266  if (static_cast<Commands*>(obj)->fProcess.rank != src->rank) {
267  // LOG(warn) << ss.str();
268  static_cast<Commands*>(obj)->fCallback(msg, proc(const_cast<char*>(src->nspace), rank(src->rank)));
269  } else {
270  // LOG(trace) << "suppressing message from itself";
271  }
272  } else {
273  LOG(ERROR) << "ERROR";
274  }
275 
276  if (cbfunc != nullptr) {
277  cbfunc(PMIX_SUCCESS, nullptr, 0, nullptr, nullptr, cbdata);
278  }
279  }
280 
281  const proc& fProcess;
282  size_t fHandlerRef;
283  std::function<void(const std::string& msg, const proc& sender)> fCallback;
284  std::array<pmix_info_t, 1> fInfos;
285  bool fSubscribed;
287 };
288 
289 } /* namespace pmix */
290 
291 #endif /* PMIXCOMMANDS_H */
Definition: PMIx.hpp:42
Definition: PMIx.hpp:77
Definition: PMIx.hpp:121
Definition: PMIx.hpp:26
Definition: PMIx.hpp:61
A simple copyable blocking semaphore.
Definition: Semaphore.h:45
Definition: PMIxCommands.h:129
Definition: PMIxCommands.h:82

privacy