QUDA v1.1.0
A library for QCD on GPUs
communicator_qmp.cpp
#include <communicator_quda.h>
#include <mpi_comm_handle.h>

#define QMP_CHECK(qmp_call) \
  do { \
    QMP_status_t status = qmp_call; \
    if (status != QMP_SUCCESS) errorQuda("(QMP) %s", QMP_error_string(status)); \
  } while (0)

#define MPI_CHECK(mpi_call) \
  do { \
    int status = mpi_call; \
    if (status != MPI_SUCCESS) { \
      char err_string[128]; \
      int err_len; \
      MPI_Error_string(status, err_string, &err_len); \
      err_string[127] = '\0'; \
      errorQuda("(MPI) %s", err_string); \
    } \
  } while (0)

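Both macros evaluate the wrapped call once, capture its status code, and route any failure through errorQuda. A hedged usage sketch (the barrier calls below are illustrative stand-ins for any status-returning QMP or MPI call):

// Illustrative call sites only: any call returning QMP_status_t or an
// MPI error code can be wrapped.
MPI_CHECK(MPI_Barrier(MPI_COMM_WORLD));              // aborts via errorQuda on failure
QMP_CHECK(QMP_comm_barrier(QMP_comm_get_default())); // likewise for QMP
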
struct MsgHandle_s {
  QMP_msgmem_t mem;
  QMP_msghandle_t handle;
};

// While we can emulate an all-gather using QMP reductions, this
// scales horribly as the number of nodes increases, so for
// performance we just call MPI directly
#define USE_MPI_GATHER

#ifdef USE_MPI_GATHER
#include <mpi.h>
#endif

Communicator::Communicator(int nDim, const int *commDims, QudaCommsMap rank_from_coords, void *map_data,
                           bool user_set_comm_handle_, void *user_comm)
{
  user_set_comm_handle = user_set_comm_handle_;

  int initialized;
  MPI_CHECK(MPI_Initialized(&initialized));

  if (!initialized) { assert(false); }

  if (user_set_comm_handle) {
    // The logic here: previously all QMP calls were based on QMP_comm_get_default(), and the user can
    // feed in their own MPI_COMM_HANDLE. With the following, that behavior remains the same.
    QMP_COMM_HANDLE = QMP_comm_get_default();
    MPI_COMM_HANDLE = *((MPI_Comm *)user_comm);
  } else {
    QMP_COMM_HANDLE = QMP_comm_get_default();
    void *my_comm;
    QMP_get_mpi_comm(QMP_COMM_HANDLE, &my_comm);
    MPI_COMM_HANDLE = *((MPI_Comm *)my_comm);
  }

  is_qmp_handle_default = true; // the QMP handle is the default one

  comm_init(nDim, commDims, rank_from_coords, map_data);
}

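For orientation, a hedged sketch of how a caller might drive this constructor; the 2x2x2x2 grid and the lex_rank mapping below are invented for illustration and are not QUDA defaults:

// Hypothetical rank mapping: lexicographic with the x-dimension fastest.
static int lex_rank(const int *coords, void *fdata)
{
  const int *dims = static_cast<const int *>(fdata);
  int rank = coords[3];
  for (int d = 2; d >= 0; d--) rank = rank * dims[d] + coords[d];
  return rank;
}

int dims[4] = {2, 2, 2, 2}; // 16 ranks in total
MPI_Comm my_comm = MPI_COMM_WORLD;
Communicator comm(4, dims, lex_rank, dims, true, &my_comm); // user-supplied MPI communicator
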
Communicator::Communicator(Communicator &other, const int *comm_split)
{
  user_set_comm_handle = false;

  constexpr int nDim = 4;

  quda::CommKey comm_dims_split;

  quda::CommKey comm_key_split;
  quda::CommKey comm_color_split;

  for (int d = 0; d < nDim; d++) {
    assert(other.comm_dim(d) % comm_split[d] == 0);
    comm_dims_split[d] = other.comm_dim(d) / comm_split[d];
    comm_key_split[d] = other.comm_coord(d) % comm_dims_split[d];
    comm_color_split[d] = other.comm_coord(d) / comm_dims_split[d];
  }

  int key = index(nDim, comm_dims_split.data(), comm_key_split.data());
  int color = index(nDim, comm_split, comm_color_split.data());

  QMP_comm_split(other.QMP_COMM_HANDLE, color, key, &QMP_COMM_HANDLE);
  void *my_comm;
  QMP_get_mpi_comm(QMP_COMM_HANDLE, &my_comm);
  MPI_COMM_HANDLE = *((MPI_Comm *)my_comm);

  is_qmp_handle_default = false; // the QMP handle is not the default one

  int my_rank_ = QMP_get_node_number();

  QudaCommsMap func = lex_rank_from_coords_dim_t;
  comm_init(nDim, comm_dims_split.data(), func, comm_dims_split.data());

  printf("Creating split communicator, base_rank = %5d, key = %5d, color = %5d, split_rank = %5d, gpuid = %d\n",
         other.comm_rank(), key, color, my_rank_, comm_gpuid());
}

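To make the key/color arithmetic concrete, here is a worked example with invented numbers (a {4,2,2,2} parent grid split as {2,1,1,1}):

// For the rank at parent coordinate {3,1,0,1}:
//   comm_dims_split  = {4/2, 2/1, 2/1, 2/1} = {2,2,2,2}  // sub-grid dimensions
//   comm_key_split   = {3%2, 1%2, 0%2, 1%2} = {1,1,0,1}  // coordinate within its sub-grid
//   comm_color_split = {3/2, 1/1, 0/1, 1/1} = {1,1,0,1}  // which sub-grid it lands in
// index() flattens each tuple into the scalar key (rank within the new
// communicator) and color (which communicator) passed to QMP_comm_split.
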
Communicator::~Communicator()
{
  comm_finalize();
  if (!(user_set_comm_handle || is_qmp_handle_default)) {
    // if it's a user-set handle or the default QMP handle, we don't free it
    QMP_comm_free(QMP_COMM_HANDLE);
  }
}

// There are more efficient ways to do the following,
// but it doesn't really matter since this function should be
// called just once.
void Communicator::comm_gather_hostname(char *hostname_recv_buf)
{
  // gather the hostname of every rank
  char *hostname = comm_hostname();

#ifdef USE_MPI_GATHER
  MPI_CHECK(MPI_Allgather(hostname, 128, MPI_CHAR, hostname_recv_buf, 128, MPI_CHAR, MPI_COMM_HANDLE));
#else
  // Abuse reductions to emulate an all-gather: on iteration i only rank i
  // contributes its hostname (everyone else contributes zeros), so the
  // global sum reproduces rank i's hostname on all ranks. This isn't
  // very scalable, though.
  for (int i = 0; i < comm_size(); i++) {
    int data[128];
    for (int j = 0; j < 128; j++) {
      data[j] = (i == comm_rank()) ? hostname[j] : 0;
      QMP_comm_sum_int(QMP_COMM_HANDLE, data + j);
      hostname_recv_buf[i * 128 + j] = data[j];
    }
  }
#endif
}

// There are more efficient ways to do the following,
// but it doesn't really matter since this function should be
// called just once.
void Communicator::comm_gather_gpuid(int *gpuid_recv_buf)
{
#ifdef USE_MPI_GATHER
  int gpuid = comm_gpuid();
  MPI_CHECK(MPI_Allgather(&gpuid, 1, MPI_INT, gpuid_recv_buf, 1, MPI_INT, MPI_COMM_HANDLE));
#else
  // Abuse reductions to emulate an all-gather. We need to copy the
  // local gpuid to all other nodes.
  for (int i = 0; i < comm_size(); i++) {
    int data = (i == comm_rank()) ? comm_gpuid() : 0;
    QMP_comm_sum_int(QMP_COMM_HANDLE, &data);
    gpuid_recv_buf[i] = data;
  }
#endif
}

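These gathers are typically used to work out which ranks share a node, for example when deciding whether peer-to-peer GPU access is possible. A minimal sketch, where same_node is a hypothetical helper and not part of QUDA's API:

#include <cstring>

// Hypothetical helper: ranks whose gathered hostname slots match are on
// the same node (each slot is a fixed 128-byte string).
static bool same_node(const char *hostname_recv_buf, int rank_a, int rank_b)
{
  return std::strncmp(hostname_recv_buf + 128 * rank_a, hostname_recv_buf + 128 * rank_b, 128) == 0;
}
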
void Communicator::comm_init(int ndim, const int *dims, QudaCommsMap rank_from_coords, void *map_data)
{
  if (QMP_is_initialized() != QMP_TRUE) { errorQuda("QMP has not been initialized"); }

  int grid_size = 1;
  for (int i = 0; i < ndim; i++) { grid_size *= dims[i]; }
  if (grid_size != QMP_comm_get_number_of_nodes(QMP_COMM_HANDLE)) {
    errorQuda("Communication grid size declared via initCommsGridQuda() does not match"
              " total number of QMP nodes (%d != %d)",
              grid_size, QMP_comm_get_number_of_nodes(QMP_COMM_HANDLE));
  }

  comm_init_common(ndim, dims, rank_from_coords, map_data);
}

int Communicator::comm_rank(void) { return QMP_comm_get_node_number(QMP_COMM_HANDLE); }

int Communicator::comm_size(void) { return QMP_comm_get_number_of_nodes(QMP_COMM_HANDLE); }

MsgHandle *Communicator::comm_declare_send_rank(void *buffer, int rank, int tag, size_t nbytes)
{
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_send_to(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *Communicator::comm_declare_recv_rank(void *buffer, int rank, int tag, size_t nbytes)
{
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_receive_from(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *Communicator::comm_declare_send_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_send_to(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *Communicator::comm_declare_receive_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_receive_from(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *Communicator::comm_declare_strided_send_displaced(void *buffer, const int displacement[], size_t blksize,
                                                             int nblocks, size_t stride)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_strided_msgmem(buffer, blksize, nblocks, stride);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_send_to(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *Communicator::comm_declare_strided_receive_displaced(void *buffer, const int displacement[], size_t blksize,
                                                                int nblocks, size_t stride)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_strided_msgmem(buffer, blksize, nblocks, stride);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_comm_declare_receive_from(QMP_COMM_HANDLE, mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

void Communicator::comm_free(MsgHandle *&mh)
{
  QMP_free_msghandle(mh->handle);
  QMP_free_msgmem(mh->mem);
  host_free(mh);
  mh = nullptr;
}

void Communicator::comm_start(MsgHandle *mh) { QMP_CHECK(QMP_start(mh->handle)); }

void Communicator::comm_wait(MsgHandle *mh) { QMP_CHECK(QMP_wait(mh->handle)); }

int Communicator::comm_query(MsgHandle *mh) { return (QMP_is_complete(mh->handle) == QMP_TRUE); }

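The declare/start/wait/free calls form the complete lifecycle of a message handle. A hedged sketch of a single exchange; comm, neighbor, and the 256-byte buffers are illustrative assumptions, not values from this file:

int neighbor = 0; // illustrative partner rank
char send_buf[256], recv_buf[256];
MsgHandle *send_mh = comm.comm_declare_send_rank(send_buf, neighbor, 0, sizeof(send_buf));
MsgHandle *recv_mh = comm.comm_declare_recv_rank(recv_buf, neighbor, 0, sizeof(recv_buf));

comm.comm_start(recv_mh); // post the receive first
comm.comm_start(send_mh);
comm.comm_wait(send_mh);
comm.comm_wait(recv_mh);

comm.comm_free(send_mh); // handles can be restarted many times before freeing
comm.comm_free(recv_mh);
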
void Communicator::comm_allreduce(double *data)
{
  if (!comm_deterministic_reduce()) {
    QMP_CHECK(QMP_comm_sum_double(QMP_COMM_HANDLE, data));
  } else {
    // we need to break out of QMP for the deterministic floating-point reductions
    const size_t n = comm_size();
    double *recv_buf = (double *)safe_malloc(n * sizeof(double));
    MPI_CHECK(MPI_Allgather(data, 1, MPI_DOUBLE, recv_buf, 1, MPI_DOUBLE, MPI_COMM_HANDLE));
    *data = deterministic_reduce(recv_buf, n);
    host_free(recv_buf);
  }
}

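deterministic_reduce itself is defined elsewhere in QUDA; the idea is that summing the gathered per-rank contributions in a fixed order gives bitwise-reproducible results, unlike a network reduction whose association order can vary from run to run. A minimal sketch of that idea (not necessarily QUDA's exact implementation):

// Sum in a fixed left-to-right order so every rank, on every run,
// performs the identical sequence of floating-point additions.
template <typename T> T ordered_sum(const T *array, size_t n)
{
  T sum = 0;
  for (size_t i = 0; i < n; i++) sum += array[i];
  return sum;
}
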
void Communicator::comm_allreduce_max(double *data) { QMP_CHECK(QMP_comm_max_double(QMP_COMM_HANDLE, data)); }

void Communicator::comm_allreduce_min(double *data) { QMP_CHECK(QMP_comm_min_double(QMP_COMM_HANDLE, data)); }

void Communicator::comm_allreduce_array(double *data, size_t size)
{
  if (!comm_deterministic_reduce()) {
    QMP_CHECK(QMP_comm_sum_double_array(QMP_COMM_HANDLE, data, size));
  } else {
    // we need to break out of QMP for the deterministic floating-point reductions
    size_t n = comm_size();
    double *recv_buf = new double[size * n];
    MPI_CHECK(MPI_Allgather(data, size, MPI_DOUBLE, recv_buf, size, MPI_DOUBLE, MPI_COMM_HANDLE));

    // transpose from rank-major to element-major so that each element's
    // per-rank contributions are contiguous for the deterministic reduction
    double *recv_trans = new double[size * n];
    for (size_t i = 0; i < n; i++) {
      for (size_t j = 0; j < size; j++) { recv_trans[j * n + i] = recv_buf[i * size + j]; }
    }

    for (size_t i = 0; i < size; i++) { data[i] = deterministic_reduce(recv_trans + i * n, n); }

    delete[] recv_buf;
    delete[] recv_trans;
  }
}

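A worked view of the buffer layouts with invented numbers (n = 2 ranks a and b, size = 3 elements):

// recv_buf   (rank-major):    {a0, a1, a2, b0, b1, b2}
// recv_trans (element-major): {a0, b0, a1, b1, a2, b2}
// deterministic_reduce then sums each contiguous pair, producing
// data = {a0+b0, a1+b1, a2+b2} identically on every rank.
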
void Communicator::comm_allreduce_max_array(double *data, size_t size)
{
  for (size_t i = 0; i < size; i++) { QMP_CHECK(QMP_comm_max_double(QMP_COMM_HANDLE, data + i)); }
}

void Communicator::comm_allreduce_int(int *data) { QMP_CHECK(QMP_comm_sum_int(QMP_COMM_HANDLE, data)); }

void Communicator::comm_allreduce_xor(uint64_t *data)
{
  if (sizeof(uint64_t) != sizeof(unsigned long)) errorQuda("unsigned long is not 64-bit");
  QMP_CHECK(QMP_comm_xor_ulong(QMP_COMM_HANDLE, reinterpret_cast<unsigned long *>(data)));
}

void Communicator::comm_broadcast(void *data, size_t nbytes)
{
  QMP_CHECK(QMP_comm_broadcast(QMP_COMM_HANDLE, data, nbytes));
}

void Communicator::comm_barrier(void) { QMP_CHECK(QMP_comm_barrier(QMP_COMM_HANDLE)); }

void Communicator::comm_abort_(int status) { QMP_abort(status); }

int Communicator::comm_rank_global() { return QMP_get_node_number(); }