QUDA 1.0.0
comm_qmp.cpp
#include <qmp.h>
#include <csignal>
#include <algorithm>
#include <numeric>
#include <quda_internal.h>
#include <comm_quda.h>
#include <mpi_comm_handle.h>

#define QMP_CHECK(qmp_call) do { \
  QMP_status_t status = qmp_call; \
  if (status != QMP_SUCCESS) \
    errorQuda("(QMP) %s", QMP_error_string(status)); \
} while (0)

#define MPI_CHECK(mpi_call) \
  do { \
    int status = mpi_call; \
    if (status != MPI_SUCCESS) { \
      char err_string[128]; \
      int err_len; \
      MPI_Error_string(status, err_string, &err_len); \
      err_string[127] = '\0'; \
      errorQuda("(MPI) %s", err_string); \
    } \
  } while (0)

struct MsgHandle_s {
  QMP_msgmem_t mem;
  QMP_msghandle_t handle;
};

// While we can emulate an all-gather using QMP reductions, this
// scales horribly as the number of nodes increases, so for
// performance we just call MPI directly.
#define USE_MPI_GATHER

#ifdef USE_MPI_GATHER
#include <mpi.h>
#endif

// There are more efficient ways to do the following,
// but it doesn't really matter since this function should be
// called just once.
void comm_gather_hostname(char *hostname_recv_buf) {
  // determine the hostname of this rank
  char *hostname = comm_hostname();

#ifdef USE_MPI_GATHER
  MPI_CHECK(MPI_Allgather(hostname, 128, MPI_CHAR, hostname_recv_buf, 128, MPI_CHAR, MPI_COMM_HANDLE));
#else
  // Abuse reductions to emulate an all-gather.  We need to copy the
  // local hostname to all other nodes.  This isn't very scalable, though.
  for (int i=0; i<comm_size(); i++) {
    int data[128];
    for (int j=0; j<128; j++) {
      data[j] = (i == comm_rank()) ? hostname[j] : 0;
      QMP_sum_int(data+j);
      hostname_recv_buf[i*128 + j] = data[j];
    }
  }
#endif
}
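// Illustrative note on the gathered layout (a sketch, not a guarantee of the
// caller's buffer management): hostnames are handled as fixed 128-byte
// records, so the name reported by rank i starts at hostname_recv_buf + 128*i,
// and the receive buffer is assumed to hold at least 128*comm_size() bytes:
//
//   char *hostname_recv_buf = (char *)safe_malloc(128 * comm_size());
//   comm_gather_hostname(hostname_recv_buf);
//   const char *name_of_rank_0 = hostname_recv_buf + 0 * 128;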

// There are more efficient ways to do the following,
// but it doesn't really matter since this function should be
// called just once.
void comm_gather_gpuid(int *gpuid_recv_buf) {

#ifdef USE_MPI_GATHER
  int gpuid = comm_gpuid();
  MPI_CHECK(MPI_Allgather(&gpuid, 1, MPI_INT, gpuid_recv_buf, 1, MPI_INT, MPI_COMM_HANDLE));
#else
  // Abuse reductions to emulate an all-gather.  We need to copy the
  // local GPU id to all other nodes.
  for (int i=0; i<comm_size(); i++) {
    int data = (i == comm_rank()) ? comm_gpuid() : 0;
    QMP_sum_int(&data);
    gpuid_recv_buf[i] = data;
  }
#endif
}

void comm_init(int ndim, const int *dims, QudaCommsMap rank_from_coords, void *map_data)
{
  if ( QMP_is_initialized() != QMP_TRUE ) {
    errorQuda("QMP has not been initialized");
  }

  int grid_size = 1;
  for (int i = 0; i < ndim; i++) {
    grid_size *= dims[i];
  }
  if (grid_size != QMP_get_number_of_nodes()) {
    errorQuda("Communication grid size declared via initCommsGridQuda() does not match"
              " total number of QMP nodes (%d != %d)", grid_size, QMP_get_number_of_nodes());
  }

  comm_init_common(ndim, dims, rank_from_coords, map_data);
}

int comm_rank(void)
{
  return QMP_get_node_number();
}


int comm_size(void)
{
  return QMP_get_number_of_nodes();
}

MsgHandle *comm_declare_send_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_declare_send_to(mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}
MsgHandle *comm_declare_receive_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_msgmem(buffer, nbytes);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_declare_receive_from(mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

MsgHandle *comm_declare_strided_send_displaced(void *buffer, const int displacement[],
                                               size_t blksize, int nblocks, size_t stride)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_strided_msgmem(buffer, blksize, nblocks, stride);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_declare_send_to(mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}
MsgHandle *comm_declare_strided_receive_displaced(void *buffer, const int displacement[],
                                                  size_t blksize, int nblocks, size_t stride)
{
  Topology *topo = comm_default_topology();

  int rank = comm_rank_displaced(topo, displacement);
  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  mh->mem = QMP_declare_strided_msgmem(buffer, blksize, nblocks, stride);
  if (mh->mem == NULL) errorQuda("Unable to allocate QMP message memory");

  mh->handle = QMP_declare_receive_from(mh->mem, rank, 0);
  if (mh->handle == NULL) errorQuda("Unable to allocate QMP message handle");

  return mh;
}

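// Illustrative usage sketch of the handles declared above (displacement,
// halo_buffer and nbytes below are placeholders, not names from this file):
// declare once, then start/query/wait per exchange, and free when done.
//
//   int displacement[4] = {+1, 0, 0, 0};   // forward neighbour in x
//   MsgHandle *mh = comm_declare_send_displaced(halo_buffer, displacement, nbytes);
//   comm_start(mh);                        // post the non-blocking send
//   while (!comm_query(mh)) { /* overlap local compute here */ }
//   comm_wait(mh);
//   comm_free(mh);
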
void comm_free(MsgHandle *&mh)
{
  QMP_free_msghandle(mh->handle);
  QMP_free_msgmem(mh->mem);
  host_free(mh);
  mh = nullptr;
}

void comm_start(MsgHandle *mh)
{
  QMP_CHECK( QMP_start(mh->handle) );
}

void comm_wait(MsgHandle *mh)
{
  QMP_CHECK( QMP_wait(mh->handle) );
}

int comm_query(MsgHandle *mh)
{
  return (QMP_is_complete(mh->handle) == QMP_TRUE);
}

template <typename T> T deterministic_reduce(T *array, int n)
{
  std::sort(array, array + n); // sort reduction into ascending order for deterministic reduction
  return std::accumulate(array, array + n, 0.0);
}

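// Why the sort matters (illustrative note; the values below are made up):
// floating-point addition is not associative, so the same contributions
// summed in a different order can give different results, e.g.
//
//   (1.0e16 + (-1.0e16)) + 1.0  ==  1.0
//   1.0e16 + ((-1.0e16) + 1.0)  ==  0.0   (the 1.0 is absorbed by rounding)
//
// Sorting the gathered values first fixes a single summation order, so every
// rank computes a bit-identical result from the same set of contributions.
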
void comm_allreduce(double* data)
{
  if (!comm_deterministic_reduce()) {
    QMP_CHECK(QMP_sum_double(data));
  } else {
    // we need to break out of QMP for the deterministic floating point reductions
    const size_t n = comm_size();
    double *recv_buf = (double *)safe_malloc(n * sizeof(double));
    MPI_CHECK(MPI_Allgather(data, 1, MPI_DOUBLE, recv_buf, 1, MPI_DOUBLE, MPI_COMM_HANDLE));
    *data = deterministic_reduce(recv_buf, n);
    host_free(recv_buf);
  }
}

void comm_allreduce_max(double* data)
{
  QMP_CHECK( QMP_max_double(data) );
}

void comm_allreduce_min(double* data)
{
  QMP_CHECK( QMP_min_double(data) );
}

void comm_allreduce_array(double* data, size_t size)
{
  if (!comm_deterministic_reduce()) {
    QMP_CHECK(QMP_sum_double_array(data, size));
  } else {
    // we need to break out of QMP for the deterministic floating point reductions
    size_t n = comm_size();
    double *recv_buf = new double[size * n];
    MPI_CHECK(MPI_Allgather(data, size, MPI_DOUBLE, recv_buf, size, MPI_DOUBLE, MPI_COMM_HANDLE));

    // transpose the gathered data so that each element's per-rank contributions are contiguous
    double *recv_trans = new double[size * n];
    for (size_t i = 0; i < n; i++) {
      for (size_t j = 0; j < size; j++) { recv_trans[j * n + i] = recv_buf[i * size + j]; }
    }

    for (size_t i = 0; i < size; i++) { data[i] = deterministic_reduce(recv_trans + i * n, n); }

    delete[] recv_buf;
    delete[] recv_trans;
  }
}

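// Layout of the deterministic array reduction above (illustrative, for a
// hypothetical run with n = 2 ranks and size = 3 elements): MPI_Allgather
// produces rank-major data,
//
//   recv_buf   = { r0[0], r0[1], r0[2],  r1[0], r1[1], r1[2] }
//
// and the transpose regroups it element-major, so each deterministic_reduce
// call sees the n contributions belonging to one element:
//
//   recv_trans = { r0[0], r1[0],  r0[1], r1[1],  r0[2], r1[2] }
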
void comm_allreduce_max_array(double* data, size_t size)
{
  for (size_t i = 0; i < size; i++) { QMP_CHECK(QMP_max_double(data + i)); }
}

void comm_allreduce_int(int* data)
{
  QMP_CHECK( QMP_sum_int(data) );
}

void comm_allreduce_xor(uint64_t *data)
{
  if (sizeof(uint64_t) != sizeof(unsigned long)) errorQuda("unsigned long is not 64-bit");
  QMP_CHECK( QMP_xor_ulong( reinterpret_cast<unsigned long*>(data) ));
}

void comm_broadcast(void *data, size_t nbytes)
{
  QMP_CHECK( QMP_broadcast(data, nbytes) );
}


void comm_barrier(void)
{
  QMP_CHECK( QMP_barrier() );
}


void comm_abort(int status)
{
#ifdef HOST_DEBUG
  raise(SIGINT);
#endif
  QMP_abort(status);
}