QUDA  0.9.0
comm_mpi.cpp
Go to the documentation of this file.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <climits>
#include <csignal>
#include <quda_internal.h>
#include <comm_quda.h>
8 
9 
/**
   Execute an MPI call and, if it does not return MPI_SUCCESS, abort via
   errorQuda() with the error string reported by the MPI library.
   MPI_Error_string requires a buffer of at least MPI_MAX_ERROR_STRING
   characters; the previous fixed 128-byte buffer could overflow on
   implementations where MPI_MAX_ERROR_STRING > 128.
 */
#define MPI_CHECK(mpi_call) do {                    \
  int status = mpi_call;                            \
  if (status != MPI_SUCCESS) {                      \
    char err_string[MPI_MAX_ERROR_STRING];          \
    int err_len;                                    \
    MPI_Error_string(status, err_string, &err_len); \
    err_string[MPI_MAX_ERROR_STRING-1] = '\0';      \
    errorQuda("(MPI) %s", err_string);              \
  }                                                 \
} while (0)
20 
21 
/**
   Handle for a persistent point-to-point communication declared via one
   of the comm_declare_*() functions; freed with comm_free().
 */
struct MsgHandle_s {
  /** Persistent MPI request created by MPI_Send_init / MPI_Recv_init. */
  MPI_Request request;

  /** Derived MPI datatype used by the strided variants. */
  MPI_Datatype datatype;

  /** Whether datatype is a committed custom type that must be freed with the handle. */
  bool custom;
};
41 
static int rank = -1;   // MPI rank in MPI_COMM_WORLD (-1 until comm_init)
static int size = -1;   // number of MPI ranks (-1 until comm_init)
static int gpuid = -1;  // CUDA device id assigned to this rank (-1 until comm_init)

static char partition_string[16]; // tuneKey string describing the partitioning (presumably set in comm_init — the setting line is elided in this listing)
static char topology_string[16];  // tuneKey string describing the comm topology (set in comm_init)
48 
49 
/**
   Gather the hostname of every rank.
   @param hostname_recv_buf Caller-allocated buffer of 128*comm_size() bytes;
          entry i holds the fixed-width (128-byte) hostname of rank i.
   Assumes comm_hostname() returns a buffer of at least 128 bytes — TODO confirm.
 */
void comm_gather_hostname(char *hostname_recv_buf) {
  // each rank contributes its 128-byte hostname slot
  char *hostname = comm_hostname();
  MPI_CHECK( MPI_Allgather(hostname, 128, MPI_CHAR, hostname_recv_buf, 128, MPI_CHAR, MPI_COMM_WORLD) );
}
55 
/**
   Gather the assigned CUDA device id of every rank.
   @param gpuid_recv_buf Caller-allocated array of comm_size() ints;
          entry i receives rank i's gpuid.
 */
void comm_gather_gpuid(int *gpuid_recv_buf) {
  MPI_CHECK(MPI_Allgather(&gpuid, 1, MPI_INT, gpuid_recv_buf, 1, MPI_INT, MPI_COMM_WORLD));
}
59 
60 
/**
   Initialize the communications layer on top of an already-initialized MPI
   environment: record rank/size, build the process-grid topology, assign a
   CUDA device to this rank (by counting lower ranks that share this host),
   and initialize peer-to-peer state.

   @param ndim             Number of grid dimensions (the topology string below assumes 4)
   @param dims             Extent of each grid dimension; product must equal comm_size()
   @param rank_from_coords Mapping from grid coordinates to MPI rank
   @param map_data         Opaque data passed through to rank_from_coords
 */
void comm_init(int ndim, const int *dims, QudaCommsMap rank_from_coords, void *map_data)
{
  int initialized;
  MPI_CHECK( MPI_Initialized(&initialized) );

  // QUDA does not call MPI_Init itself; the application must have done so
  if (!initialized) {
    errorQuda("MPI has not been initialized");
  }

  MPI_CHECK( MPI_Comm_rank(MPI_COMM_WORLD, &rank) );
  MPI_CHECK( MPI_Comm_size(MPI_COMM_WORLD, &size) );

  // the declared grid must account for exactly all MPI ranks
  int grid_size = 1;
  for (int i = 0; i < ndim; i++) {
    grid_size *= dims[i];
  }
  if (grid_size != size) {
    errorQuda("Communication grid size declared via initCommsGridQuda() does not match"
              " total number of MPI ranks (%d != %d)", grid_size, size);
  }

  Topology *topo = comm_create_topology(ndim, dims, rank_from_coords, map_data);
  // NOTE(review): a line is elided from this listing here; topo is presumably
  // registered via comm_set_default_topology(topo) — confirm against full source

  // determine which GPU this MPI rank will use: count lower ranks on the same host
  char *hostname_recv_buf = (char *)safe_malloc(128*size);

  comm_gather_hostname(hostname_recv_buf);

  gpuid = 0;
  for (int i = 0; i < rank; i++) {
    if (!strncmp(comm_hostname(), &hostname_recv_buf[128*i], 128)) {
      gpuid++;
    }
  }

  int device_count;
  cudaGetDeviceCount(&device_count);  // NOTE(review): return status unchecked
  if (device_count == 0) {
    errorQuda("No CUDA devices found");
  }
  if (gpuid >= device_count) {
    // with MPS enabled, several ranks may share one device; otherwise fatal
    char *enable_mps_env = getenv("QUDA_ENABLE_MPS");
    if (enable_mps_env && strcmp(enable_mps_env,"1") == 0) {
      gpuid = gpuid%device_count;
      printf("MPS enabled, rank=%d -> gpu=%d\n", comm_rank(), gpuid);
    } else {
      errorQuda("Too few GPUs available on %s", comm_hostname());
    }
  }

  comm_peer2peer_init(hostname_recv_buf);

  host_free(hostname_recv_buf);

  // NOTE(review): a line is elided here (presumably sets partition_string).
  // Cache the topology tuneKey string; hard-codes 4 dimensions
  snprintf(topology_string, 16, ",topo=%d%d%d%d", comm_dim(0), comm_dim(1), comm_dim(2), comm_dim(3));
}
119 
120 int comm_rank(void)
121 {
122  return rank;
123 }
124 
125 
126 int comm_size(void)
127 {
128  return size;
129 }
130 
131 
132 int comm_gpuid(void)
133 {
134  return gpuid;
135 }
136 
137 
138 static const int max_displacement = 4;
139 
140 static void check_displacement(const int displacement[], int ndim) {
141  for (int i=0; i<ndim; i++) {
142  if (abs(displacement[i]) > max_displacement){
143  errorQuda("Requested displacement[%d] = %d is greater than maximum allowed", i, displacement[i]);
144  }
145  }
146 }
147 
/**
   Declare a persistent send to the rank at the given relative grid
   displacement.  Start with comm_start(), complete with comm_wait(),
   release with comm_free().

   @param buffer       Data to be sent
   @param displacement Per-dimension relative displacement of the receiver
   @param nbytes       Number of bytes to send
   @return Message handle wrapping the persistent MPI request
 */
MsgHandle *comm_declare_send_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  // NOTE(review): the declaration of topo is elided in this listing —
  // presumably Topology *topo = comm_default_topology(); confirm against full source
  int ndim = comm_ndim(topo);
  check_displacement(displacement, ndim);

  int rank = comm_rank_displaced(topo, displacement);

  // encode the displacement into the tag (digits base 4*max_displacement);
  // the receive side uses the negated displacement so sender/receiver tags pair up
  int tag = 0;
  for (int i=ndim-1; i>=0; i--) tag = tag * 4 * max_displacement + displacement[i] + max_displacement;
  tag = tag >= 0 ? tag : 2*pow(4*max_displacement,ndim) + tag;  // fold negatives into a positive range (pow returns double — implicitly truncated)

  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));
  MPI_CHECK( MPI_Send_init(buffer, nbytes, MPI_BYTE, rank, tag, MPI_COMM_WORLD, &(mh->request)) );
  mh->custom = false;  // no derived datatype to free

  return mh;
}
169 
170 
/**
   Declare a persistent receive from the rank at the given relative grid
   displacement.  Start with comm_start(), complete with comm_wait(),
   release with comm_free().

   @param buffer       Buffer into which data will be received
   @param displacement Per-dimension relative displacement of the sender
   @param nbytes       Number of bytes to receive
   @return Message handle wrapping the persistent MPI request
 */
MsgHandle *comm_declare_receive_displaced(void *buffer, const int displacement[], size_t nbytes)
{
  // NOTE(review): the declaration of topo is elided in this listing —
  // presumably Topology *topo = comm_default_topology(); confirm against full source
  int ndim = comm_ndim(topo);
  check_displacement(displacement,ndim);

  int rank = comm_rank_displaced(topo, displacement);

  // mirror of the send-side tag with the displacement sign flipped,
  // so a matching send/receive pair computes the same tag
  int tag = 0;
  for (int i=ndim-1; i>=0; i--) tag = tag * 4 * max_displacement - displacement[i] + max_displacement;
  tag = tag >= 0 ? tag : 2*pow(4*max_displacement,ndim) + tag;  // fold negatives into a positive range

  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));
  MPI_CHECK( MPI_Recv_init(buffer, nbytes, MPI_BYTE, rank, tag, MPI_COMM_WORLD, &(mh->request)) );
  mh->custom = false;  // no derived datatype to free

  return mh;
}
192 
193 
/**
   Declare a persistent strided send to the rank at the given relative grid
   displacement, sending nblocks blocks of blksize bytes separated by stride
   bytes.  Release with comm_free(), which also frees the derived datatype.

   @param buffer       Base address of the data to be sent
   @param displacement Per-dimension relative displacement of the receiver
   @param blksize      Size of each contiguous block in bytes
   @param nblocks      Number of blocks
   @param stride       Byte stride between successive block starts
   @return Message handle wrapping the persistent MPI request
 */
MsgHandle *comm_declare_strided_send_displaced(void *buffer, const int displacement[],
                                               size_t blksize, int nblocks, size_t stride)
{
  // NOTE(review): the declaration of topo is elided in this listing —
  // presumably Topology *topo = comm_default_topology(); confirm against full source
  int ndim = comm_ndim(topo);
  check_displacement(displacement, ndim);

  int rank = comm_rank_displaced(topo, displacement);

  // same tag encoding as comm_declare_send_displaced (receiver negates it)
  int tag = 0;
  for (int i=ndim-1; i>=0; i--) tag = tag * 4 * max_displacement + displacement[i] + max_displacement;
  tag = tag >= 0 ? tag : 2*pow(4*max_displacement,ndim) + tag;

  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  // create a new strided MPI type; comm_free() must free it (custom == true)
  MPI_CHECK( MPI_Type_vector(nblocks, blksize, stride, MPI_BYTE, &(mh->datatype)) );
  MPI_CHECK( MPI_Type_commit(&(mh->datatype)) );
  mh->custom = true;

  MPI_CHECK( MPI_Send_init(buffer, 1, mh->datatype, rank, tag, MPI_COMM_WORLD, &(mh->request)) );

  return mh;
}
221 
222 
/**
   Declare a persistent strided receive from the rank at the given relative
   grid displacement, receiving nblocks blocks of blksize bytes separated by
   stride bytes.  Release with comm_free(), which also frees the derived
   datatype.

   @param buffer       Base address of the receive buffer
   @param displacement Per-dimension relative displacement of the sender
   @param blksize      Size of each contiguous block in bytes
   @param nblocks      Number of blocks
   @param stride       Byte stride between successive block starts
   @return Message handle wrapping the persistent MPI request
 */
MsgHandle *comm_declare_strided_receive_displaced(void *buffer, const int displacement[],
                                                  size_t blksize, int nblocks, size_t stride)
{
  // NOTE(review): the declaration of topo is elided in this listing —
  // presumably Topology *topo = comm_default_topology(); confirm against full source
  int ndim = comm_ndim(topo);
  check_displacement(displacement,ndim);

  int rank = comm_rank_displaced(topo, displacement);

  // mirror of the strided-send tag with the displacement sign flipped
  int tag = 0;
  for (int i=ndim-1; i>=0; i--) tag = tag * 4 * max_displacement - displacement[i] + max_displacement;
  tag = tag >= 0 ? tag : 2*pow(4*max_displacement,ndim) + tag;

  MsgHandle *mh = (MsgHandle *)safe_malloc(sizeof(MsgHandle));

  // create a new strided MPI type; comm_free() must free it (custom == true)
  MPI_CHECK( MPI_Type_vector(nblocks, blksize, stride, MPI_BYTE, &(mh->datatype)) );
  MPI_CHECK( MPI_Type_commit(&(mh->datatype)) );
  mh->custom = true;

  MPI_CHECK( MPI_Recv_init(buffer, 1, mh->datatype, rank, tag, MPI_COMM_WORLD, &(mh->request)) );

  return mh;
}
250 
251 
// comm_free(MsgHandle *mh): release a handle created by a comm_declare_*()
// function (the signature line is elided in this listing)
{
  MPI_CHECK(MPI_Request_free(&(mh->request)));
  // strided handles own a committed derived datatype that must also be freed
  if (mh->custom) MPI_CHECK(MPI_Type_free(&(mh->datatype)));
  host_free(mh);
}
258 
259 
// comm_start(MsgHandle *mh): initiate the persistent communication
// (the signature line is elided in this listing)
{
  MPI_CHECK( MPI_Start(&(mh->request)) );
}
264 
265 
// comm_wait(MsgHandle *mh): block until the started communication completes
// (the signature line is elided in this listing)
{
  MPI_CHECK( MPI_Wait(&(mh->request), MPI_STATUS_IGNORE) );
}
270 
271 
// comm_query(MsgHandle *mh): non-blocking completion test; returns non-zero
// when the communication has completed (the signature line is elided in this listing)
{
  int query;
  MPI_CHECK( MPI_Test(&(mh->request), &query, MPI_STATUS_IGNORE) );

  return query;
}
279 
280 
281 void comm_allreduce(double* data)
282 {
283  double recvbuf;
284  MPI_CHECK( MPI_Allreduce(data, &recvbuf, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD) );
285  *data = recvbuf;
286 }
287 
288 
289 void comm_allreduce_max(double* data)
290 {
291  double recvbuf;
292  MPI_CHECK( MPI_Allreduce(data, &recvbuf, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD) );
293  *data = recvbuf;
294 }
295 
296 void comm_allreduce_array(double* data, size_t size)
297 {
298  double *recvbuf = new double[size];
299  MPI_CHECK( MPI_Allreduce(data, recvbuf, size, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD) );
300  memcpy(data, recvbuf, size*sizeof(double));
301  delete []recvbuf;
302 }
303 
304 
305 void comm_allreduce_int(int* data)
306 {
307  int recvbuf;
308  MPI_CHECK( MPI_Allreduce(data, &recvbuf, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD) );
309  *data = recvbuf;
310 }
311 
// comm_allreduce_xor(uint64_t *data): global bitwise-XOR reduction, in place
// (the signature line is elided in this listing)
{
  // MPI_UNSIGNED_LONG is used below, so unsigned long must really be 64 bits
  if (sizeof(uint64_t) != sizeof(unsigned long)) errorQuda("unsigned long is not 64-bit");
  uint64_t recvbuf;
  MPI_CHECK( MPI_Allreduce(data, &recvbuf, 1, MPI_UNSIGNED_LONG, MPI_BXOR, MPI_COMM_WORLD) );
  *data = recvbuf;
}
319 
320 
322 void comm_broadcast(void *data, size_t nbytes)
323 {
324  MPI_CHECK( MPI_Bcast(data, (int)nbytes, MPI_BYTE, 0, MPI_COMM_WORLD) );
325 }
326 
327 
/**
   Block until every rank in MPI_COMM_WORLD has reached this call.
 */
void comm_barrier(void)
{
  MPI_CHECK( MPI_Barrier(MPI_COMM_WORLD) );
}
332 
333 
/**
   Abort the entire MPI job.
   @param status Exit code passed to MPI_Abort
 */
void comm_abort(int status)
{
#ifdef HOST_DEBUG
  raise(SIGINT);  // in debug builds trap first — presumably so a debugger can inspect before teardown
#endif
  MPI_Abort(MPI_COMM_WORLD, status) ;  // not wrapped in MPI_CHECK: we are aborting regardless
}
341 
  // comm_dim_partitioned_string(): cached partition tuneKey string
  // (the signature line is elided in this listing)
  return partition_string;
}
345 
  // comm_dim_topology_string(): cached topology tuneKey string set in comm_init()
  // (the signature line is elided in this listing)
  return topology_string;
}
void comm_allreduce(double *data)
Definition: comm_mpi.cpp:281
MsgHandle * comm_declare_send_displaced(void *buffer, const int displacement[], size_t nbytes)
Definition: comm_mpi.cpp:151
void comm_gather_hostname(char *hostname_recv_buf)
Gather all hostnames.
Definition: comm_mpi.cpp:50
_EXTERN_C_ int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Definition: nvtx_pmpi.c:308
MPI_Request request
Definition: comm_mpi.cpp:27
_EXTERN_C_ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Definition: nvtx_pmpi.c:98
int comm_query(MsgHandle *mh)
Definition: comm_mpi.cpp:272
int snprintf(char *__str, size_t __size, const char *__format,...) __attribute__((__format__(__printf__
Topology * comm_create_topology(int ndim, const int *dims, QudaCommsMap rank_from_coords, void *map_data)
Definition: comm_common.cpp:94
void comm_peer2peer_init(const char *hostname_recv_buf)
void comm_wait(MsgHandle *mh)
Definition: comm_mpi.cpp:266
void comm_allreduce_array(double *data, size_t size)
Definition: comm_mpi.cpp:296
void comm_allreduce_max(double *data)
Definition: comm_mpi.cpp:289
_EXTERN_C_ int MPI_Wait(MPI_Request *request, MPI_Status *status)
Definition: nvtx_pmpi.c:140
#define errorQuda(...)
Definition: util_quda.h:90
void comm_gather_gpuid(int *gpuid_recv_buf)
Gather all GPU ids.
Definition: comm_mpi.cpp:56
void comm_allreduce_int(int *data)
Definition: comm_mpi.cpp:305
#define host_free(ptr)
Definition: malloc_quda.h:59
int comm_dim(int dim)
static const int max_displacement
Definition: comm_mpi.cpp:138
#define MPI_CHECK(mpi_call)
Definition: comm_mpi.cpp:10
static int rank
Definition: comm_mpi.cpp:42
Topology * comm_default_topology(void)
MsgHandle * comm_declare_strided_receive_displaced(void *buffer, const int displacement[], size_t blksize, int nblocks, size_t stride)
Definition: comm_mpi.cpp:226
double pow(double, double)
static int size
Definition: comm_mpi.cpp:43
int comm_gpuid(void)
Definition: comm_mpi.cpp:132
const char * comm_dim_partitioned_string()
Return a string that defines the comm partitioning (used as a tuneKey)
Definition: comm_mpi.cpp:342
static char partition_string[16]
Definition: comm_mpi.cpp:46
static int ndim
Definition: layout_hyper.c:53
_EXTERN_C_ int MPI_Barrier(MPI_Comm comm)
Definition: nvtx_pmpi.c:455
int strcmp(const char *__s1, const char *__s2)
char * comm_hostname(void)
Definition: comm_common.cpp:58
int comm_rank(void)
Definition: comm_mpi.cpp:120
void comm_start(MsgHandle *mh)
Definition: comm_mpi.cpp:260
void comm_barrier(void)
Definition: comm_mpi.cpp:328
int printf(const char *,...) __attribute__((__format__(__printf__
_EXTERN_C_ int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)
Definition: nvtx_pmpi.c:497
static bool initialized
Profiler for initQuda.
void comm_init(int ndim, const int *dims, QudaCommsMap rank_from_coords, void *map_data)
Definition: comm_mpi.cpp:61
void comm_broadcast(void *data, size_t nbytes)
Definition: comm_mpi.cpp:322
int comm_rank_displaced(const Topology *topo, const int displacement[])
unsigned long long uint64_t
_EXTERN_C_ int MPI_Send_init(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
Definition: nvtx_pmpi.c:518
static int gpuid
Definition: comm_mpi.cpp:44
int(* QudaCommsMap)(const int *coords, void *fdata)
Definition: comm_quda.h:12
MPI_Datatype datatype
Definition: comm_mpi.cpp:33
_EXTERN_C_ int MPI_Recv_init(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)
Definition: nvtx_pmpi.c:539
void * memcpy(void *__dst, const void *__src, size_t __n)
#define safe_malloc(size)
Definition: malloc_quda.h:54
_EXTERN_C_ int MPI_Start(MPI_Request *request)
Definition: nvtx_pmpi.c:476
int abs(int) __attribute__((const))
bool custom
Definition: comm_mpi.cpp:39
MsgHandle * comm_declare_receive_displaced(void *buffer, const int displacement[], size_t nbytes)
Definition: comm_mpi.cpp:174
int strncmp(const char *__s1, const char *__s2, size_t __n)
void comm_set_default_topology(Topology *topo)
int comm_ndim(const Topology *topo)
void comm_allreduce_xor(uint64_t *data)
Definition: comm_mpi.cpp:312
static char topology_string[16]
Definition: comm_mpi.cpp:47
void comm_free(MsgHandle *mh)
Definition: comm_mpi.cpp:252
static void check_displacement(const int displacement[], int ndim)
Definition: comm_mpi.cpp:140
const char * comm_dim_topology_string()
Return a string that defines the comm topology (for use as a tuneKey)
Definition: comm_mpi.cpp:346
MsgHandle * comm_declare_strided_send_displaced(void *buffer, const int displacement[], size_t blksize, int nblocks, size_t stride)
Definition: comm_mpi.cpp:197
void comm_abort(int status)
Definition: comm_mpi.cpp:334
char * getenv(const char *)
_EXTERN_C_ int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
Definition: nvtx_pmpi.c:413
int comm_dim_partitioned(int dim)
int comm_size(void)
Definition: comm_mpi.cpp:126