Understanding concurrency of streams in numba cuda correctly

I’m not really sure if I’m understanding how streams work with kernels asynchronously.

import pandas as pd
from datetime import datetime, timedelta
from numba import cuda
import numba
import numpy as np
import math

def min_max_of_arrays(x, Output):

    for i in range(x.shape[1]):
        Column = x[:, i]
        Maximum = Column[0]
        Minimum = Column[0]
        for j in Column:
            if j > Maximum:
                Maximum = j
            elif j < Minimum:
                Minimum = j
        Output[0, i] = Maximum 
        Output[1, i] = Minimum
    return Output

def normalise_arrays(FeatureSlices, MinMaxArray, NormalisedArray):

    x, y = cuda.grid(2)

    if x < NormalisedArray.shape[0] and y < NormalisedArray.shape[1]:
        Min = MinMaxArray[1, y]
        Max = MinMaxArray[0, y]
        NormalisedArray[x, y] = ((FeatureSlices[x, y] - Min) / (Max - Min))

# For this dummy example I'll use random numbers. I want to find the min and max of every column of every array in my list so that I can perform a min max scaler on the "InputArrayList".

InputArrayList = [np.random.rand(60, 98450) for j in range(18)]

# I find the min max of every column.

MinMaxArrayList = []
for FeatureSlice in InputArrayList:

    MinMaxArray = np.empty([2, 98450])

    min_max_of_arrays(FeatureSlice, MinMaxArray)
# Now I have my array of min and max's I can hand this into my kernel function using multiple streams 
# across multiple GPU's. This is where I'm unsure of my understanding of how streams work. I don't know 
# if using "del" at the end of each loop is really helping anything. I'm also unclear on how the loop 
# is managing to start a kernel on it's own loop and then continue running to start a new kernel before the previous one has fininshed. I basically think i'm still running everything synchronously rather than 
# asynchronously.

with cuda.defer_cleanup():

    NumberOfStreams = 18
    for i, (MinMax, arr) in enumerate(zip(MinMaxArrayList, InputArrayList)):
        if len(cuda.gpus) == 3:
            if i < NumberOfStreams / 3:
            elif i >= NumberOfStreams / 3 and i < NumberOfStreams:
        stream = cuda.stream()
        with cuda.pinned(arr):
            # We put our Input array on to the device
            Input = cuda.to_device(arr, stream=stream)
            MinMaxInput = cuda.to_device(MinMax, stream=stream)
            # We create an empty array on the device, ready to be filled with all the normalised columns
            # from our input array.
            NormalisedArray = cuda.device_array(
                (60, 98450), 
                dtype=Input.dtype, stream=stream)
            threadsperblock = (8, 128)
            blockspergrid_x = math.ceil(NormalisedArray.shape[0] / threadsperblock[0])
            blockspergrid_y = math.ceil(NormalisedArray.shape[1] / threadsperblock[1])
            blockspergrid = (blockspergrid_x, blockspergrid_y)
            normalise_arrays[blockspergrid, threadsperblock](Input, MinMaxInput, NormalisedArray)  
            NormalisedArray.copy_to_host(arr, stream=stream)
            del arr, MinMaxInput, NormalisedArray

I’ve also used NSight Systems to try and check what the GPU’s are doing. It looks as though I’ve managed to use two of the GPU’s but the kernels seem to be having to wait for one to finish before another can start.

The kernel also needs to be launched on the stream:

normalise_arrays[blockspergrid, threadsperblock, stream](Input, MinMaxInput, NormalisedArray)

This isn’t very prominently documented, but the kernel launch configuration with a stream is mentioned here in the docs:


1 Like

Thanks a lot for taking the time to help out Mark. I’ve added “stream” to the kernel call now but after passing the executable back through NSight Systems I’m still getting the same pattern in the attached picture. Is it possible that the GPU’s are doing what they are supposed to do but I’m just not understanding the readout from NSight properly?

Thanks again