I’m not really sure if I’m understanding how streams work with kernels asynchronously.
import pandas as pd
from datetime import datetime, timedelta
from numba import cuda
import numba
import numpy as np
import math
@numba.jit
def min_max_of_arrays(x, Output):
for i in range(x.shape[1]):
Column = x[:, i]
Maximum = Column[0]
Minimum = Column[0]
for j in Column:
if j > Maximum:
Maximum = j
elif j < Minimum:
Minimum = j
Output[0, i] = Maximum
Output[1, i] = Minimum
return Output
@cuda.jit
def normalise_arrays(FeatureSlices, MinMaxArray, NormalisedArray):
x, y = cuda.grid(2)
if x < NormalisedArray.shape[0] and y < NormalisedArray.shape[1]:
Min = MinMaxArray[1, y]
Max = MinMaxArray[0, y]
NormalisedArray[x, y] = ((FeatureSlices[x, y] - Min) / (Max - Min))
# For this dummy example I'll use random numbers. I want to find the min and max of every column of every array in my list so that I can perform a min max scaler on the "InputArrayList".
InputArrayList = [np.random.rand(60, 98450) for j in range(18)]
# I find the min max of every column.
MinMaxArrayList = []
for FeatureSlice in InputArrayList:
MinMaxArray = np.empty([2, 98450])
min_max_of_arrays(FeatureSlice, MinMaxArray)
MinMaxArrayList.append(MinMaxArray)
# Now I have my array of min and max's I can hand this into my kernel function using multiple streams
# across multiple GPU's. This is where I'm unsure of my understanding of how streams work. I don't know
# if using "del" at the end of each loop is really helping anything. I'm also unclear on how the loop
# is managing to start a kernel on it's own loop and then continue running to start a new kernel before the previous one has fininshed. I basically think i'm still running everything synchronously rather than
# asynchronously.
with cuda.defer_cleanup():
NumberOfStreams = 18
for i, (MinMax, arr) in enumerate(zip(MinMaxArrayList, InputArrayList)):
if len(cuda.gpus) == 3:
if i < NumberOfStreams / 3:
cuda.select_device(0)
elif i >= NumberOfStreams / 3 and i < NumberOfStreams:
cuda.select_device(1)
else:
cuda.select_device(2)
stream = cuda.stream()
with cuda.pinned(arr):
# We put our Input array on to the device
Input = cuda.to_device(arr, stream=stream)
MinMaxInput = cuda.to_device(MinMax, stream=stream)
# We create an empty array on the device, ready to be filled with all the normalised columns
# from our input array.
NormalisedArray = cuda.device_array(
(60, 98450),
dtype=Input.dtype, stream=stream)
threadsperblock = (8, 128)
blockspergrid_x = math.ceil(NormalisedArray.shape[0] / threadsperblock[0])
blockspergrid_y = math.ceil(NormalisedArray.shape[1] / threadsperblock[1])
blockspergrid = (blockspergrid_x, blockspergrid_y)
normalise_arrays[blockspergrid, threadsperblock](Input, MinMaxInput, NormalisedArray)
NormalisedArray.copy_to_host(arr, stream=stream)
del arr, MinMaxInput, NormalisedArray
I’ve also used NSight Systems to try and check what the GPU’s are doing. It looks as though I’ve managed to use two of the GPU’s but the kernels seem to be having to wait for one to finish before another can start.