#include <cub_helper.cuh>

#ifdef HETEROGENEOUS_ATOMIC
#include <cuda/std/atomic>
// device-scope atomic counter used to track how many blocks have deposited their partial result
using count_t = cuda::atomic<unsigned int, cuda::thread_scope_device>;
/**
   @brief The initialization value we use to check for completion
 */
template <typename T> constexpr T init_value() { return -std::numeric_limits<T>::infinity(); }
/**
   @brief The termination value we use to prevent a possible hang in
   case the computed reduction is equal to the initialization value
 */
template <typename T> constexpr T terminate_value() { return std::numeric_limits<T>::infinity(); }
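
// Illustrative sketch, not part of this header: for floating-point types the two sentinels
// are -inf and +inf.  The device code further below substitutes terminate_value() whenever a
// computed result happens to equal init_value(), so the host can always distinguish "not yet
// written" from a legitimately infinite result.  The helper name publish_value is hypothetical.
static_assert(init_value<double>() == -std::numeric_limits<double>::infinity(), "init sentinel is -inf");
static_assert(terminate_value<double>() == std::numeric_limits<double>::infinity(), "terminate sentinel is +inf");

template <typename T> constexpr T publish_value(T computed)
{
  // never write the init sentinel back as a result; remap it to the terminate sentinel
  return computed == init_value<T>() ? terminate_value<T>() : computed;
}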

  // inside ReduceArg<T>:
#ifdef HETEROGENEOUS_ATOMIC
  // number of lock-free atomic words (system_atomic_t) that make up one reduction element of type T
  static constexpr int n_item = sizeof(T) / sizeof(system_atomic_t);
  // per-block partial results, visible at device scope
  cuda::atomic<T, cuda::thread_scope_device> *partial;
  // final results, written at system scope: result_d is the device view, result_h the mapped host view
  cuda::atomic<system_atomic_t, cuda::thread_scope_system> *result_d;
  cuda::atomic<system_atomic_t, cuda::thread_scope_system> *result_h;

  // in ReduceArg(int n_reduce = 1): size the partial-result buffer from the device properties
    auto max_reduce_blocks = 2 * deviceProp.multiProcessorCount;
    auto reduce_size = max_reduce_blocks * n_reduce * sizeof(*partial);
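    // Worked example with hypothetical numbers: on an 80-SM device, max_reduce_blocks = 160,
    // so a single double2 reduction (n_reduce = 1, sizeof(*partial) = 16 bytes) needs
    // 160 * 1 * 16 = 2560 bytes of partial-result storage.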
    errorQuda("Requested reduction requires a larger buffer %lu than allocated %lu", reduce_size, ...);

#ifdef HETEROGENEOUS_ATOMIC
    // initialize the host-visible result words to the init sentinel via placement new,
    // then publish the stores before any kernel can write to them
    for (int i = 0; i < n_reduce * n_item; i++) {
      new (result_h + i) cuda::atomic<system_atomic_t, cuda::thread_scope_system> {init_value<system_atomic_t>()};
    }
    std::atomic_thread_fence(std::memory_order_release);
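
// Illustrative sketch, not part of this header: the same construct-in-place idiom on a mapped
// (zero-copy) allocation, so host and device see the same system-scope atomics.  The names
// make_result_buffer, word_t and sys_atomic_t are hypothetical, and error checking is omitted;
// this assumes a device that supports mapped pinned memory.
#include <cuda/std/atomic>
#include <cuda_runtime.h>
#include <atomic>
#include <limits>
#include <new>

using word_t = float;
using sys_atomic_t = cuda::atomic<word_t, cuda::thread_scope_system>;

sys_atomic_t *make_result_buffer(int n, sys_atomic_t *&device_view)
{
  void *host_ptr = nullptr;
  cudaHostAlloc(&host_ptr, n * sizeof(sys_atomic_t), cudaHostAllocMapped);
  cudaHostGetDevicePointer(reinterpret_cast<void **>(&device_view), host_ptr, 0);

  auto *host_view = static_cast<sys_atomic_t *>(host_ptr);
  for (int i = 0; i < n; i++)
    new (host_view + i) sys_atomic_t {-std::numeric_limits<word_t>::infinity()}; // init sentinel
  std::atomic_thread_fence(std::memory_order_release); // make the sentinels visible before any launch
  return host_view;
}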

  /**
     @brief Finalize the reduction, returning the computed reduction into
     result.  With heterogeneous atomics this means polling the result
     words until they no longer hold the initialization value.
  */
  template <typename host_t, typename device_t = host_t>
  void complete(std::vector<host_t> &result, const qudaStream_t stream = 0, bool reset = false)
  {
#ifdef HETEROGENEOUS_ATOMIC
    if (consumed) errorQuda("Cannot call complete more than once for each construction");

    // spin-wait until the device has overwritten every init sentinel
    for (int i = 0; i < n_reduce * n_item; i++) {
      while (result_h[i].load(cuda::std::memory_order_relaxed) == init_value<system_atomic_t>()) { }
    }
#else
    // (event-based synchronization of the non-heterogeneous path elided)
#endif
    // copy the completed reduction out of the result buffer
    const int n_element = n_reduce * sizeof(T) / sizeof(device_t);
    if (result.size() != (unsigned)n_element)
      errorQuda("result vector length %lu does not match n_reduce %d", result.size(), n_reduce);
    for (int i = 0; i < n_element; i++) result[i] = reinterpret_cast<device_t *>(result_h)[i];

#ifdef HETEROGENEOUS_ATOMIC
    // when reset is requested, re-arm the result words with the init sentinel so this
    // ReduceArg can be reused for another reduction
    for (int i = 0; i < n_reduce * n_item; i++) {
      result_h[i].store(init_value<system_atomic_t>(), cuda::std::memory_order_relaxed);
    }
    std::atomic_thread_fence(std::memory_order_release);
#endif
  }
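
// Illustrative sketch, not part of this header: the complete()/reset cycle above boils down to
// "spin until the slot stops holding the init sentinel, consume it, then re-arm it with the
// sentinel for the next reduction".  A plain std::atomic stands in for the system-scope
// cuda::atomic here, and the names slot and producer are hypothetical.
#include <atomic>
#include <cstdio>
#include <limits>
#include <thread>

int main()
{
  const float sentinel = -std::numeric_limits<float>::infinity();
  std::atomic<float> slot{sentinel};

  std::thread producer([&] { slot.store(42.0f, std::memory_order_relaxed); }); // plays the role of the GPU

  while (slot.load(std::memory_order_relaxed) == sentinel) { } // complete(): poll until written
  std::printf("reduction = %f\n", slot.load(std::memory_order_relaxed));

  slot.store(sentinel, std::memory_order_relaxed); // reset = true: re-arm for the next use
  std::atomic_thread_fence(std::memory_order_release);

  producer.join();
  return 0;
}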

  /**
     @brief Overload of complete providing a simple reference interface
  */
  template <typename host_t, typename device_t = host_t>
  void complete(host_t &result, const qudaStream_t stream = 0, bool reset = false)
  {
    std::vector<host_t> result_(1);
    complete<host_t, device_t>(result_, stream, reset);
    result = result_[0];
  }

#ifdef HETEROGENEOUS_ATOMIC
  /**
     @brief Generic reduction function that reduces block-distributed
     data "in" per thread to a single value per reduction slot idx
  */
  template <int block_size_x, int block_size_y, bool do_sum = true, typename Reducer = cub::Sum>
  __device__ inline void reduce2d(const T &in, const int idx = 0)
  {
    using BlockReduce = cub::BlockReduce<T, block_size_x, cub::BLOCK_REDUCE_WARP_REDUCTIONS, block_size_y>;
    __shared__ typename BlockReduce::TempStorage cub_tmp;
    __shared__ bool isLastBlockDone;

    Reducer r;
    T aggregate = do_sum ? BlockReduce(cub_tmp).Sum(in) : BlockReduce(cub_tmp).Reduce(in, r);

    if (threadIdx.x == 0 && threadIdx.y == 0) {
      // publish this block's partial result with a placement new of the device-scope atomic
      new (partial + idx * gridDim.x + blockIdx.x) cuda::atomic<T, cuda::thread_scope_device> {aggregate};

      // increment the arrival counter (release orders the partial store before the count update)
      auto value = count[idx].fetch_add(1, cuda::std::memory_order_release);

      // true if this block is the last to arrive
      isLastBlockDone = (value == (gridDim.x - 1));
    }

    __syncthreads();

    // the last block sums the partial results of all blocks
    if (isLastBlockDone) {
      auto i = threadIdx.y * block_size_x + threadIdx.x;
      T sum;
      zero(sum);
      while (i < gridDim.x) {
        sum = r(sum, partial[idx * gridDim.x + i].load(cuda::std::memory_order_relaxed));
        i += block_size_x * block_size_y;
      }

      sum = (do_sum ? BlockReduce(cub_tmp).Sum(sum) : BlockReduce(cub_tmp).Reduce(sum, r));

      // write out the final reduced value
      if (threadIdx.y * block_size_x + threadIdx.x == 0) {
        constexpr size_t n = sizeof(T) / sizeof(atomic_t);
        atomic_t sum_tmp[n];
        memcpy(sum_tmp, &sum, sizeof(sum));
        for (int i = 0; i < n; i++) {
          // ensure the result cannot collide with the init sentinel the host is polling on
          sum_tmp[i] = sum_tmp[i] == init_value<atomic_t>() ? terminate_value<atomic_t>() : sum_tmp[i];
          result_d[n * idx + i].store(sum_tmp[i], cuda::std::memory_order_relaxed);
        }

        partial[idx].store(sum, cuda::std::memory_order_relaxed);

        count[idx].store(0, cuda::std::memory_order_relaxed); // reset the arrival counter for the next launch
      }
    }
  }

#else // !HETEROGENEOUS_ATOMIC: legacy path using plain atomics and volatile reads

  template <int block_size_x, int block_size_y, bool do_sum = true, typename Reducer = cub::Sum>
  __device__ inline void reduce2d(const T &in, const int idx = 0)
  {
    using BlockReduce = cub::BlockReduce<T, block_size_x, cub::BLOCK_REDUCE_WARP_REDUCTIONS, block_size_y>;
    __shared__ typename BlockReduce::TempStorage cub_tmp;
    __shared__ bool isLastBlockDone;

    Reducer r;
    T aggregate = do_sum ? BlockReduce(cub_tmp).Sum(in) : BlockReduce(cub_tmp).Reduce(in, r);

    if (threadIdx.x == 0 && threadIdx.y == 0) {
      partial[idx * gridDim.x + blockIdx.x] = aggregate;

      // flush the partial result to memory before signalling arrival
      __threadfence();
      auto value = atomicInc(&count[idx], gridDim.x);

      // true if this block is the last to arrive
      isLastBlockDone = (value == (gridDim.x - 1));
    }

    __syncthreads();

    // the last block sums the partial results of all blocks
    if (isLastBlockDone) {
      auto i = threadIdx.y * block_size_x + threadIdx.x;
      T sum;
      zero(sum);
      while (i < gridDim.x) {
        sum = r(sum, const_cast<T &>(static_cast<volatile T *>(partial)[idx * gridDim.x + i]));
        i += block_size_x * block_size_y;
      }

      sum = (do_sum ? BlockReduce(cub_tmp).Sum(sum) : BlockReduce(cub_tmp).Reduce(sum, r));

      // write out the final reduced value
      if (threadIdx.y * block_size_x + threadIdx.x == 0) {

#endif // HETEROGENEOUS_ATOMIC

  /**
     @brief One-dimensional (block_size_y = 1) wrapper around reduce2d
  */
  template <int block_size, bool do_sum = true, typename Reducer = cub::Sum>
  __device__ inline void reduce(const T &in, const int idx = 0)
  {
    reduce2d<block_size, 1, do_sum, Reducer>(in, idx);
  }
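
// Illustrative, self-contained sketch of the same single-pass "last block done" grid reduction
// that both reduce2d implementations above use, stripped of QUDA's types and of the multi-slot
// idx machinery.  It follows the classic __threadfence()/atomicInc pattern; all names here
// (grid_sum, retirement_count, partials) are hypothetical.
#include <cstdio>
#include <cuda_runtime.h>

__device__ unsigned int retirement_count = 0; // how many blocks have deposited a partial sum

__global__ void grid_sum(const float *in, int n, float *partials, float *result)
{
  __shared__ float smem[256];
  __shared__ bool is_last_block_done;

  // block-local reduction over a grid-stride load
  float v = 0.0f;
  for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < n; i += gridDim.x * blockDim.x) v += in[i];
  smem[threadIdx.x] = v;
  __syncthreads();
  for (int s = blockDim.x / 2; s > 0; s >>= 1) {
    if (threadIdx.x < s) smem[threadIdx.x] += smem[threadIdx.x + s];
    __syncthreads();
  }

  if (threadIdx.x == 0) {
    partials[blockIdx.x] = smem[0];
    __threadfence();                                // make the partial visible grid-wide
    unsigned int done = atomicInc(&retirement_count, gridDim.x);
    is_last_block_done = (done == gridDim.x - 1);   // true only for the final arriving block
  }
  __syncthreads();

  // the last block folds all partial results into the final answer
  if (is_last_block_done) {
    float sum = 0.0f;
    for (int i = threadIdx.x; i < gridDim.x; i += blockDim.x) sum += partials[i];
    smem[threadIdx.x] = sum;
    __syncthreads();
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
      if (threadIdx.x < s) smem[threadIdx.x] += smem[threadIdx.x + s];
      __syncthreads();
    }
    if (threadIdx.x == 0) {
      *result = smem[0];
      retirement_count = 0; // reset so the kernel can be launched again
    }
  }
}

int main()
{
  const int n = 1 << 20, block = 256, grid = 64;
  float *in, *partials, *result;
  cudaMallocManaged(&in, n * sizeof(float));
  cudaMallocManaged(&partials, grid * sizeof(float));
  cudaMallocManaged(&result, sizeof(float));
  for (int i = 0; i < n; i++) in[i] = 1.0f;

  grid_sum<<<grid, block>>>(in, n, partials, result);
  cudaDeviceSynchronize();
  std::printf("sum = %f (expected %d)\n", *result, n);

  cudaFree(in); cudaFree(partials); cudaFree(result);
  return 0;
}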