QUDA  v1.1.0
A library for QCD on GPUs
reduce_helper.h
#pragma once

#include <cub_helper.cuh>

#ifdef QUAD_SUM
using device_reduce_t = doubledouble;
#else
using device_reduce_t = double;
#endif

#ifdef HETEROGENEOUS_ATOMIC
#include <cuda/std/atomic>
using count_t = cuda::atomic<unsigned int, cuda::thread_scope_device>;
#else
using count_t = unsigned int;
#endif

namespace quda
{

  namespace reducer
  {
    size_t buffer_size();

    void *get_device_buffer();
    void *get_mapped_buffer();
    void *get_host_buffer();
    count_t *get_count();
    cudaEvent_t &get_event();
  } // namespace reducer

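  /**
     @brief Return the maximum number of simultaneous reductions that
     can take place, as set by QUDA_MAX_MULTI_REDUCE.
   */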
  constexpr int max_n_reduce() { return QUDA_MAX_MULTI_REDUCE; }

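  /**
     @brief The initialization value we use to check for completion.
   */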
  template <typename T> constexpr T init_value() { return -std::numeric_limits<T>::infinity(); }

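  /**
     @brief The termination value we use to prevent a possible hang in
     case the computed reduction is equal to the initialization value.
   */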
  template <typename T> constexpr T terminate_value() { return std::numeric_limits<T>::infinity(); }

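  /**
     @brief The atomic word size we use for a given reduction type.
     This type should be lock-free to guarantee correct behaviour.
   */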
  template <typename T> struct atomic_type {
    using type = device_reduce_t;
  };
  template <> struct atomic_type<float> {
    using type = float;
  };

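  /**
     @brief ReduceArg provides the device buffers, inter-block counter
     and host-side completion logic used by reduction kernels; the
     argument structs of reduction kernels derive from this type.
   */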
  template <typename T> struct ReduceArg {

    qudaError_t launch_error; // only do complete if no launch error to avoid hang

  private:
    const int n_reduce; // number of reductions of length n_item
#ifdef HETEROGENEOUS_ATOMIC
    using system_atomic_t = typename atomic_type<T>::type;
    static constexpr int n_item = sizeof(T) / sizeof(system_atomic_t);
    cuda::atomic<T, cuda::thread_scope_device> *partial;
    // for heterogeneous atomics we need to use lock-free atomics -> operate on scalars
    cuda::atomic<system_atomic_t, cuda::thread_scope_system> *result_d;
    cuda::atomic<system_atomic_t, cuda::thread_scope_system> *result_h;
#else
    T *partial;
    T *result_d;
    T *result_h;
#endif
    count_t *count;
    bool consumed; // check to ensure that we don't complete more than once unless we explicitly reset

  public:
    ReduceArg(int n_reduce = 1) :
      launch_error(QUDA_ERROR_UNINITIALIZED),
      n_reduce(n_reduce),
      partial(static_cast<decltype(partial)>(reducer::get_device_buffer())),
      result_d(static_cast<decltype(result_d)>(reducer::get_mapped_buffer())),
      result_h(static_cast<decltype(result_h)>(reducer::get_host_buffer())),
      count {reducer::get_count()},
      consumed(false)
    {
      // check the reduction buffer is large enough for the requested reduction
      auto max_reduce_blocks = 2 * deviceProp.multiProcessorCount;
      auto reduce_size = max_reduce_blocks * n_reduce * sizeof(*partial);
      if (reduce_size > reducer::buffer_size())
        errorQuda("Requested reduction requires a larger buffer %lu than allocated %lu", reduce_size,
                  reducer::buffer_size());

#ifdef HETEROGENEOUS_ATOMIC
      if (!commAsyncReduction()) {
        // initialize the result buffer so we can test for completion
        for (int i = 0; i < n_reduce * n_item; i++) {
          new (result_h + i) cuda::atomic<system_atomic_t, cuda::thread_scope_system> {init_value<system_atomic_t>()};
        }
        std::atomic_thread_fence(std::memory_order_release);
      } else {
        // write reduction to GPU memory if asynchronous (reuse partial)
        result_d = nullptr;
      }
#else
      if (commAsyncReduction()) result_d = partial;
#endif
    }

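    /**
       @brief Finalize the reduction, returning the computed reduction
       into result.  With heterogeneous atomics this polls the mapped
       result buffer until it no longer holds the initialization value;
       otherwise an event is recorded and queried until the kernel has
       completed.
       @param[out] result The reduction result (converted element by element to host_t)
       @param[in] stream The stream on which the reduction kernel was run
       @param[in] reset Whether to reset the state so that complete can be called again
     */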
    template <typename host_t, typename device_t = host_t>
    void complete(std::vector<host_t> &result, const qudaStream_t stream = 0, bool reset = false)
    {
      if (launch_error == QUDA_ERROR) return; // kernel launch failed so return
      if (launch_error == QUDA_ERROR_UNINITIALIZED) errorQuda("No reduction kernel appears to have been launched");
#ifdef HETEROGENEOUS_ATOMIC
      if (consumed) errorQuda("Cannot call complete more than once for each construction");

      for (int i = 0; i < n_reduce * n_item; i++) {
        while (result_h[i].load(cuda::std::memory_order_relaxed) == init_value<system_atomic_t>()) {}
      }
#else
      auto event = reducer::get_event();
      qudaEventRecord(event, stream);
      while (!qudaEventQuery(event)) {}
#endif
      // copy back result element by element and convert if necessary to host reduce type
      // unit size here may differ from system_atomic_t size, e.g., if doing double-double
      const int n_element = n_reduce * sizeof(T) / sizeof(device_t);
      if (result.size() != (unsigned)n_element)
        errorQuda("result vector length %lu does not match n_reduce %d", result.size(), n_reduce);
      for (int i = 0; i < n_element; i++) result[i] = reinterpret_cast<device_t *>(result_h)[i];

#ifdef HETEROGENEOUS_ATOMIC
      if (!reset) {
        consumed = true;
      } else {
        // re-initialize the result buffer - this allows multiple calls to complete for each ReduceArg construction
        for (int i = 0; i < n_reduce * n_item; i++) {
          result_h[i].store(init_value<system_atomic_t>(), cuda::std::memory_order_relaxed);
        }
        std::atomic_thread_fence(std::memory_order_release);
      }
#endif
    }

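    /**
       @brief Overload of complete providing a simple reference
       interface for a single reduction value.
       @param[out] result The reduction result
       @param[in] stream The stream on which the reduction kernel was run
       @param[in] reset Whether to reset the state so that complete can be called again
     */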
    template <typename host_t, typename device_t = host_t>
    void complete(host_t &result, const qudaStream_t stream = 0, bool reset = false)
    {
      std::vector<host_t> result_(1);
      complete(result_, stream, reset);
      result = result_[0];
    }

#ifdef HETEROGENEOUS_ATOMIC
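    /**
       @brief Generic reduction function that reduces block-distributed
       data "in" per thread to a single value.  The final value is
       written with system-scope atomics to host-mapped memory, or to
       device memory when asynchronous reduction is enabled.
       @param[in] in The per-thread input value to be reduced
       @param[in] idx Which of the n_reduce reductions this value contributes to
     */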
    template <int block_size_x, int block_size_y, bool do_sum = true, typename Reducer = cub::Sum>
    __device__ inline void reduce2d(const T &in, const int idx = 0)
    {
      using BlockReduce = cub::BlockReduce<T, block_size_x, cub::BLOCK_REDUCE_WARP_REDUCTIONS, block_size_y>;
      __shared__ typename BlockReduce::TempStorage cub_tmp;
      __shared__ bool isLastBlockDone;

      Reducer r;
      T aggregate = do_sum ? BlockReduce(cub_tmp).Sum(in) : BlockReduce(cub_tmp).Reduce(in, r);

      if (threadIdx.x == 0 && threadIdx.y == 0) {
        // need to call placement new constructor since partial is not necessarily constructed
        new (partial + idx * gridDim.x + blockIdx.x) cuda::atomic<T, cuda::thread_scope_device> {aggregate};

        // increment global block counter for this reduction
        auto value = count[idx].fetch_add(1, cuda::std::memory_order_release);

        // determine if last block
        isLastBlockDone = (value == (gridDim.x - 1));
      }

      __syncthreads();

      // finish the reduction if last block
      if (isLastBlockDone) {
        auto i = threadIdx.y * block_size_x + threadIdx.x;
        T sum;
        zero(sum);
        while (i < gridDim.x) {
          sum = r(sum, partial[idx * gridDim.x + i].load(cuda::std::memory_order_relaxed));
          i += block_size_x * block_size_y;
        }

        sum = (do_sum ? BlockReduce(cub_tmp).Sum(sum) : BlockReduce(cub_tmp).Reduce(sum, r));

        // write out the final reduced value
        if (threadIdx.y * block_size_x + threadIdx.x == 0) {
          if (result_d) { // write to host mapped memory
            using atomic_t = typename atomic_type<T>::type;
            constexpr size_t n = sizeof(T) / sizeof(atomic_t);
            atomic_t sum_tmp[n];
            memcpy(sum_tmp, &sum, sizeof(sum));
#pragma unroll
            for (int i = 0; i < n; i++) {
              // catch the case where the computed value is equal to the init_value
              sum_tmp[i] = sum_tmp[i] == init_value<atomic_t>() ? terminate_value<atomic_t>() : sum_tmp[i];
              result_d[n * idx + i].store(sum_tmp[i], cuda::std::memory_order_relaxed);
            }
          } else { // write to device memory
            partial[idx].store(sum, cuda::std::memory_order_relaxed);
          }
          count[idx].store(0, cuda::std::memory_order_relaxed); // set to zero for next time
        }
      }
    }

#else
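    /**
       @brief Generic reduction function that reduces block-distributed
       data "in" per thread to a single value, which the final block
       writes to the result buffer (host-mapped memory, or device memory
       when asynchronous reduction is enabled).
       @param[in] in The per-thread input value to be reduced
       @param[in] idx Which of the n_reduce reductions this value contributes to
     */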
    template <int block_size_x, int block_size_y, bool do_sum = true, typename Reducer = cub::Sum>
    __device__ inline void reduce2d(const T &in, const int idx = 0)
    {
      using BlockReduce = cub::BlockReduce<T, block_size_x, cub::BLOCK_REDUCE_WARP_REDUCTIONS, block_size_y>;
      __shared__ typename BlockReduce::TempStorage cub_tmp;
      __shared__ bool isLastBlockDone;

      Reducer r;
      T aggregate = do_sum ? BlockReduce(cub_tmp).Sum(in) : BlockReduce(cub_tmp).Reduce(in, r);

      if (threadIdx.x == 0 && threadIdx.y == 0) {
        partial[idx * gridDim.x + blockIdx.x] = aggregate;
        __threadfence(); // flush result

        // increment global block counter
        auto value = atomicInc(&count[idx], gridDim.x);

        // determine if last block
        isLastBlockDone = (value == (gridDim.x - 1));
      }

      __syncthreads();

      // finish the reduction if last block
      if (isLastBlockDone) {
        auto i = threadIdx.y * block_size_x + threadIdx.x;
        T sum;
        zero(sum);
        while (i < gridDim.x) {
          sum = r(sum, const_cast<T &>(static_cast<volatile T *>(partial)[idx * gridDim.x + i]));
          i += block_size_x * block_size_y;
        }

        sum = (do_sum ? BlockReduce(cub_tmp).Sum(sum) : BlockReduce(cub_tmp).Reduce(sum, r));

        // write out the final reduced value
        if (threadIdx.y * block_size_x + threadIdx.x == 0) {
          result_d[idx] = sum;
          count[idx] = 0; // set to zero for next time
        }
      }
    }
#endif

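    /**
       @brief Overload of reduce2d for one-dimensional thread blocks
       (block_size_y = 1).
       @param[in] in The per-thread input value to be reduced
       @param[in] idx Which of the n_reduce reductions this value contributes to
     */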
    template <int block_size, bool do_sum = true, typename Reducer = cub::Sum>
    __device__ inline void reduce(const T &in, const int idx = 0)
    {
      reduce2d<block_size, 1, do_sum, Reducer>(in, idx);
    }
  };

} // namespace quda
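
A minimal usage sketch (not part of this header): a hypothetical norm-squared reduction built on ReduceArg. The argument struct, kernel and launch code below are illustrative assumptions only; in QUDA proper, reduction kernels are launched through the library's tunable-kernel machinery, which also sets launch_error and chooses a grid that respects the partial-buffer bound checked in the ReduceArg constructor.

// Illustrative sketch only -- NormArg and norm2_kernel are hypothetical names.
#include <reduce_helper.h>

struct NormArg : public quda::ReduceArg<double> {
  const double *v;
  const int n;
  NormArg(const double *v, int n) : ReduceArg<double>(1), v(v), n(n) { }
};

template <int block_size> __global__ void norm2_kernel(NormArg arg)
{
  double sum = 0.0;
  // grid-stride loop accumulating this thread's contribution
  for (int i = blockIdx.x * block_size + threadIdx.x; i < arg.n; i += gridDim.x * block_size)
    sum += arg.v[i] * arg.v[i];
  arg.reduce<block_size>(sum); // block + inter-block reduction into the result buffer
}

// host side (error checking and autotuning omitted):
//   NormArg arg(v_d, n);
//   norm2_kernel<256><<<grid, 256, 0, stream>>>(arg);
//   arg.launch_error = QUDA_SUCCESS; // normally set by QUDA's kernel-launch wrapper
//   double nrm2 = 0.0;
//   arg.complete(nrm2, stream);      // returns once the reduction result has arrived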