Very fast vector sum without CUDA.
In my most recent blogpost I gave an introduction to the Mojo programming language.
This blogpost provides a workflow for achieving peak performance on an H100 GPU for the task of vector reduction.
In the end we will achieve GPU utilisation very close to that of NVIDIA's CUB library.
If you are interested in a comparison to CUDA, you might want to look at my other blogpost where I did similar work using CUDA.
Naive approach
The naive approach is to reduce in two passes.
In the first pass we reduce the input to an output vector with one partial sum per block (NUM_BLOCKS elements).
In the second pass we reduce this vector to a scalar output (i.e. we launch the kernel with a single block).
fn sum_kernel[
    size: Int
](out: UnsafePointer[Int32], a: UnsafePointer[Int32],):
    sums = tb[dtype]().row_major[TPB]().shared().alloc()
    global_tid = block_idx.x * block_dim.x + thread_idx.x
    tid = thread_idx.x
    threads_in_grid = TPB * grid_dim.x
    var sum: Int32 = 0
    for i in range(global_tid, size, threads_in_grid):
        sum += a[i]
    sums[tid] = sum
    barrier()

    # See https://docs.modular.com/mojo/manual/decorators/parameter/#parametric-for-statement
    active_threads = TPB

    @parameter
    for power in range(1, LOG_TPB + 1):
        active_threads >>= 1
        if tid < active_threads:
            sums[tid] += sums[tid + active_threads]
        barrier()

    if tid == 0:
        out[block_idx.x] = sums[tid][0]
Let's go step by step over this code:
We initialise an array of size TPB in shared memory, and each thread accumulates its partial sum into it using a grid-stride loop over the input.
sums = tb[dtype]().row_major[TPB]().shared().alloc()
global_tid = block_idx.x * block_dim.x + thread_idx.x
tid = thread_idx.x
threads_in_grid = TPB * grid_dim.x
var sum: Int32 = 0
for i in range(global_tid, size, threads_in_grid):
    sum += a[i]
sums[tid] = sum
barrier()
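For comparison, a CUDA version of this first accumulation stage would look roughly like the following sketch (illustrative only, not code from my repo; threadsPerBlock corresponds to TPB):

// Illustrative CUDA sketch of the same grid-stride accumulation (not code from the repo).
__global__ void accumulate_partial_sums(const int *a, int size) {
    __shared__ int sums[512];                       // TPB elements of shared memory
    int tid = threadIdx.x;
    int globalTid = blockIdx.x * blockDim.x + tid;
    int threadsInGrid = blockDim.x * gridDim.x;
    int sum = 0;
    for (int i = globalTid; i < size; i += threadsInGrid)
        sum += a[i];                                // each thread sums a strided slice of the input
    sums[tid] = sum;
    __syncthreads();                                // corresponds to barrier() in Mojo
    // ... the tree reduction over sums follows here ...
}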
In the next step we reduce the shared-memory sums in a tree-like fashion, just as we would in CUDA:
active_threads = TPB

@parameter
for power in range(1, LOG_TPB + 1):
    active_threads >>= 1
    if tid < active_threads:
        sums[tid] += sums[tid + active_threads]
    barrier()

if tid == 0:
    out[block_idx.x] = sums[tid][0]
Note that in CUDA we would write the loop:
for (int activeThreads = threadsPerBlock >> 1; activeThreads;
     activeThreads >>= 1) {
    if (tid < activeThreads) {
        sums[tid] += sums[tid + activeThreads];
    }
    __syncthreads();
}
On the host side we call the two stages:
alias TPB = 512
alias LOG_TPB = 9
alias SIZE = 1 << 30
alias NUM_BLOCKS = ceildiv(SIZE, TPB)
alias BLOCKS_PER_GRID_STAGE_1 = NUM_BLOCKS
alias BLOCKS_PER_GRID_STAGE_2 = 1

# STAGE 1
ctx.enqueue_function[sum_kernel[SIZE]](
    stage_1_out_tensor,
    a_tensor,
    grid_dim=BLOCKS_PER_GRID_STAGE_1,
    block_dim=TPB,
)
ctx.synchronize()
# with stage_1_out.map_to_host() as stage_1_out_host:
#     print("stage 1 out:", stage_1_out_host)

# STAGE 2
ctx.enqueue_function[sum_kernel[NUM_BLOCKS]](
    out_tensor,
    stage_1_out_tensor,
    grid_dim=BLOCKS_PER_GRID_STAGE_2,
    block_dim=TPB,
)
ctx.synchronize()
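For readers coming from CUDA, the two enqueue_function calls correspond to two ordinary kernel launches. Roughly (an illustrative sketch only; here size is passed as a runtime argument, whereas the Mojo kernel takes it as a compile-time parameter):

// Illustrative CUDA sketch of the same two-stage launch (not code from the repo).
sum_kernel<<<NUM_BLOCKS, TPB>>>(stage_1_out, a, SIZE);   // stage 1: one partial sum per block
sum_kernel<<<1, TPB>>>(out, stage_1_out, NUM_BLOCKS);    // stage 2: reduce the partial sums
cudaDeviceSynchronize();                                 // corresponds to ctx.synchronize()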
This kernel achieves 622.87 GB/s, i.e. 18.87% utilisation, on an H100 and 108.23 GB/s, i.e. 32.21% utilisation, on my small consumer card, an NVIDIA GeForce RTX 3060 Mobile.
Here and below I always take N = 2^30 elements for the H100 and N = 2^29 elements for the GeForce RTX 3060 Mobile.
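A quick note on how to read the utilisation figures: they are the achieved bandwidth relative to the device's theoretical peak memory bandwidth. As a rough sketch of the arithmetic (the timing code itself lives in the repo; peak_bandwidth_gbs is the assumed peak, roughly 3300 GB/s for the H100 and 336 GB/s for the RTX 3060 Mobile):

// Sketch of the bandwidth / utilisation arithmetic (illustrative only).
double bytes_moved   = double(N) * sizeof(int);                // every input element is read once
double bandwidth_gbs = bytes_moved / (elapsed_seconds * 1e9);  // achieved bandwidth in GB/s
double utilisation   = bandwidth_gbs / peak_bandwidth_gbs;     // fraction of the device's peak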
Reduce last warp
We can slightly improve performance by saving block synchronisations (barrier in Mojo): we reduce the last warp with shuffle instructions, similar to what we would do in CUDA:
fn sum_kernel[
    size: Int
](out: UnsafePointer[Int32], a: UnsafePointer[Int32],):
    sums = tb[dtype]().row_major[TPB]().shared().alloc()
    global_tid = block_idx.x * block_dim.x + thread_idx.x
    tid = thread_idx.x
    threads_in_grid = TPB * grid_dim.x
    var sum: Int32 = 0
    for i in range(global_tid, size, threads_in_grid):
        sum += a[i]
    sums[tid] = sum
    barrier()

    # See https://docs.modular.com/mojo/manual/decorators/parameter/#parametric-for-statement
    active_threads = TPB

    @parameter
    for power in range(1, LOG_TPB - 4):
        active_threads >>= 1
        if tid < active_threads:
            sums[tid] += sums[tid + active_threads]
        barrier()

    if tid < 32:
        var warp_sum: Int32 = sums[tid][0]
        warp_sum = warp.sum(warp_sum)
        if tid == 0:
            out[block_idx.x] = warp_sum
Note that warp.sum is an abstraction which can be equivalently implemented on a lower level:
if tid < 32:
    # See https://github.com/modular/modular/blob/a9ee7332b0eed46e7bec79a09cb0219cfec065db/mojo/stdlib/stdlib/gpu/warp.mojo
    var warp_sum: Int32 = sums[tid][0]
    offset = 32
    # LOG_WS is log2 of the warp size, i.e. 5
    for _ in range(LOG_WS):
        offset >>= 1
        warp_sum += warp.shuffle_down(warp_sum, offset)
    if tid == 0:
        out[block_idx.x] = warp_sum
Mojo has a large open-source codebase, and we can see how the shuffle instruction is implemented here:
if type is DType.float32:
    return llvm_intrinsic[
        "llvm.nvvm.shfl.sync." + mnemonic + ".f32", Scalar[type]
    ](Int32(mask), val, offset, WIDTH_MASK)
elif type in (DType.int32, DType.uint32):
    return llvm_intrinsic[
        "llvm.nvvm.shfl.sync." + mnemonic + ".i32", Scalar[type]
    ](Int32(mask), val, offset, WIDTH_MASK)
We see above that this is a simple wrapper around the corresponding PTX instruction.
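For comparison, the CUDA counterpart of this shuffle-based warp reduction is the familiar __shfl_down_sync pattern:

// CUDA counterpart of the shuffle-based warp reduction (for comparison).
__device__ int warp_reduce_sum(int val) {
    // Halve the offset each step: 16, 8, 4, 2, 1.
    for (int offset = 16; offset > 0; offset >>= 1)
        val += __shfl_down_sync(0xffffffff, val, offset);
    return val;  // lane 0 ends up holding the sum of all 32 lanes
}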
The above approach increases performance to 718.54 GB/s, i.e. 21.77% utilisation, on the H100 and 160.18 GB/s, i.e. 47.69% utilisation, on my consumer GPU.
One pass
So far we have implemented a two-pass approach. For a one-pass approach (i.e. only one kernel call) we need an atomic instruction. In CUDA we would use atomicAdd, while in Mojo we do the following:
fn sum_kernel[
    size: Int
](out: UnsafePointer[Int32], a: UnsafePointer[Int32],):
    sums = tb[dtype]().row_major[TPB]().shared().alloc()
    global_tid = block_idx.x * block_dim.x + thread_idx.x
    tid = thread_idx.x
    threads_in_grid = TPB * grid_dim.x
    var sum: Int32 = 0
    for i in range(global_tid, size, threads_in_grid):
        sum += a[i]
    sums[tid] = sum
    barrier()

    # See https://docs.modular.com/mojo/manual/decorators/parameter/#parametric-for-statement
    active_threads = TPB

    @parameter
    for power in range(1, LOG_TPB - 4):
        active_threads >>= 1
        if tid < active_threads:
            sums[tid] += sums[tid + active_threads]
        barrier()

    if tid < 32:
        var warp_sum: Int32 = sums[tid][0]
        warp_sum = warp.sum(warp_sum)
        if tid == 0:
            _ = Atomic.fetch_add(out, warp_sum)
You see that the kernel logic is highly similar. The only thing that changes is that in the end we accumulate the results over all blocks via an atomic add.
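For comparison, the CUDA counterpart of this cross-block accumulation is the familiar atomicAdd; as in the Mojo version, out has to be zero-initialised before the launch:

// CUDA counterpart of the final accumulation across blocks (for comparison).
if (tid == 0) {
    atomicAdd(out, warpSum);  // one atomic add per block into the global result
}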
On the host side that means we only need one kernel invocation like so:
_ = out.enqueue_fill(0)
ctx.enqueue_function[sum_kernel[SIZE]](
    out_tensor,
    a_tensor,
    grid_dim=NUM_BLOCKS,
    block_dim=TPB,
)
ctx.synchronize()
This boosts performance to 985.32 GB/s, i.e. 29.86% utilisation, on the H100 and 167.45 GB/s, i.e. 49.84% utilisation, on my consumer GPU.
Batching
To achieve peak performance in memory-bound kernels it is often necessary to increase the workload per thread. In Mojo we can implement this as follows:
fn sum_kernel[
    size: Int, batch_size: Int
](out: UnsafePointer[Int32], a: UnsafePointer[Int32],):
    sums = tb[dtype]().row_major[TPB]().shared().alloc()
    global_tid = block_idx.x * block_dim.x + thread_idx.x
    tid = thread_idx.x
    threads_in_grid = TPB * grid_dim.x
    var sum: Int32 = 0
    for i in range(global_tid, size, threads_in_grid):
        @parameter
        for j in range(batch_size):
            idx = i * batch_size + j
            if idx < size:
                sum += a[idx]
    sums[tid] = sum
    barrier()

    # See https://docs.modular.com/mojo/manual/decorators/parameter/#parametric-for-statement
    active_threads = TPB

    @parameter
    for power in range(1, LOG_TPB - 4):
        active_threads >>= 1
        if tid < active_threads:
            sums[tid] += sums[tid + active_threads]
        barrier()

    if tid < 32:
        var warp_sum: Int32 = sums[tid][0]
        warp_sum = warp.sum(warp_sum)
        if tid == 0:
            _ = Atomic.fetch_add(out, warp_sum)
We see above that each thread now has a higher workload and computes the sum over multiple elements.
We can think of it as reshaping the input vector of N elements into N/batch_size chunks of batch_size consecutive elements: each thread first reduces its chunks of batch_size elements, and the resulting partial sums are then reduced exactly as before.
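For comparison, the per-thread batching could be sketched in CUDA with an unrolled inner loop (illustrative only; BATCH_SIZE plays the role of batch_size):

// Illustrative CUDA sketch of the same per-thread batching (not code from the repo).
template <int BATCH_SIZE>
__device__ int batched_partial_sum(const int *a, int size,
                                   int globalTid, int threadsInGrid) {
    int sum = 0;
    for (int i = globalTid; i < size; i += threadsInGrid) {
        #pragma unroll
        for (int j = 0; j < BATCH_SIZE; ++j) {
            int idx = i * BATCH_SIZE + j;   // BATCH_SIZE consecutive elements per i
            if (idx < size)
                sum += a[idx];
        }
    }
    return sum;
}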
This brings the consumer GPU to a peak performance of 324.77 GB/s and an almost saturated utilisation of 96.66%. On the Hopper GPU, however, we are not yet saturated: we get 1881.55 GB/s and a utilisation of 57.02%.
Vectorised load
To achieve nearly saturated GPU bandwidth on Hopper we need to perform vectorised loads.
In CUDA we have the int4 datatype to enforce 128-bit loads. In Mojo we have load, which according to the documentation provides a high-level interface for vectorised memory loads with configurable cache behavior and memory access patterns.
The corresponding kernel looks as follows:
fn sum_kernel[
    size: Int, batch_size: Int
](out: UnsafePointer[Int32], a: UnsafePointer[Int32],):
    sums = tb[dtype]().row_major[TPB]().shared().alloc()
    global_tid = block_idx.x * block_dim.x + thread_idx.x
    tid = thread_idx.x
    threads_in_grid = TPB * grid_dim.x
    var sum: Int32 = 0
    for i in range(global_tid, size, threads_in_grid):
        # @parameter
        # for j in range(batch_size):
        #     idx = i * batch_size + j
        #     if idx < size:
        #         sum += a[idx]
        idx = i * batch_size
        if idx < size:
            sum += load[width=batch_size](a, idx).reduce_add()
    sums[tid] = sum
    barrier()

    # See https://docs.modular.com/mojo/manual/decorators/parameter/#parametric-for-statement
    active_threads = TPB

    @parameter
    for power in range(1, LOG_TPB - 4):
        active_threads >>= 1
        if tid < active_threads:
            sums[tid] += sums[tid + active_threads]
        barrier()

    if tid < 32:
        var warp_sum: Int32 = sums[tid][0]
        warp_sum = warp.sum(warp_sum)
        if tid == 0:
            _ = Atomic.fetch_add(out, warp_sum)
As you can see, the adjustment is minimal. We load into a SIMD vector, on which we then call the reduce_add operation.
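For comparison, the int4 trick mentioned above would look roughly like this in CUDA (assuming a 16-byte aligned pointer and batch_size = 4):

// Illustrative CUDA sketch of the equivalent 128-bit vectorised load via int4.
const int4 *a4 = reinterpret_cast<const int4 *>(a);  // a must be 16-byte aligned
int4 v = a4[i];                                       // one 128-bit load instead of four 32-bit loads
sum += v.x + v.y + v.z + v.w;                         // like reduce_add() on the SIMD vector in Mojo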
This then gives us 3134.26 GB/s, i.e. 94.98% utilisation, on the Hopper architecture. The last time I measured CUB on Hopper it gave 3190.53 GB/s, so we are already very close to that performance.
To close the gap we would probably need to do some profiling or look at the generated LLVM IR or assembly to identify where there is potential for optimisation. I will stop at this point to keep the blog concise.
Conclusion
I hope you found this blogpost enjoyable and that it gave you a better understanding of how performant Mojo is.
One thing worth remarking is that Mojo is not limited to NVIDIA GPUs. We could probably achieve very high performance on AMD GPUs using the same kernels as above.
To read the full kernels please check out my repo.