Hi numba community,
I have a relatively simple piece of code which I am parallelizing. It works for smaller datasets as it is, but I do not see the parallel scaling I would expect. I am new to using numba, and as such my implementation is probably far from optimal. I hope someone here can point me in the right direction.
What I’m doing in brief:
I have datasets with 1k-12k rows (variables) and 400k-1.2m columns (samples), and I am calculating a similarity metric between all pairs of samples, based on the cosine similarity of the overlapping variables scaled by the number of non-zero overlaps. This generates a rather large square adjacency matrix, which I preallocate and fill out using np.memmap() to avoid memory constraints.
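Concretely, for two columns q (query) and r (reference) the metric is the dot product of q and r, normalised by each column's norm taken over the other column's non-zero positions, and damped by (1 - 1/ol)**2 where ol is the number of positions at which both columns are non-zero. In plain NumPy it would look roughly like this (just for illustration; the actual implementation is below):
import numpy as np

def pair_similarity(q, r):
    # number of positions where both columns are non-zero
    ol = max(np.count_nonzero(q * r), 1)
    # each column's norm restricted to the other column's non-zero positions
    q_denom = np.sqrt(np.sum(q[r != 0] ** 2))
    r_denom = np.sqrt(np.sum(r[q != 0] ** 2))
    q_denom = q_denom if q_denom != 0 else 1.0
    r_denom = r_denom if r_denom != 0 else 1.0
    return float((q / q_denom) @ (r / r_denom)) * (1 - 1 / ol) ** 2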
Code:
import numpy as np
import pandas as pd
from numba import njit, prange, set_num_threads
from numba_progress import ProgressBar
# -------------------------------------
# functions
@njit
def nz_overlap_count_func(array):
"""
Returns the count of non-zero overlaps between two 1D arrays.
"""
n = 0
for i in array:
if i != 0:
n += 1
return n
@njit(nogil=True, parallel=True)
def cosines_memmap(input_data, output_memmap, progress_proxy):
    """
    Calculates the cosine similarity between overlapping variables of
    all columns in the input array and updates an array stored on disk
    with the values.
    Input:
        input_data: A 2D np.array of np.float32
        output_memmap: A 2D np.array stored on disk using np.memmap()
        progress_proxy: A ProgressBar object to track progress
    """
    # Loop over all samples, i
    for i in prange(input_data.shape[1]):
        query = input_data[:, i]
        # Loop over all samples j >= i
        for j in range(i, input_data.shape[1]):
            reference = input_data[:, j]
            # Get number of non-zero overlaps between query and reference
            ol = nz_overlap_count_func(query * reference)
            if ol == 0:
                ol = 1
            # Calculate the cosine similarity from overlapping variables between
            # samples i and j, scale it by the number of overlaps, then update
            # the (i,j)'th entry in the output array on disk
            q_denom = np.sqrt(np.sum(query[reference.nonzero()]**2))
            if q_denom == 0:
                q_denom = 1
            r_denom = np.sqrt(np.sum(reference[query.nonzero()]**2))
            if r_denom == 0:
                r_denom = 1
            output_memmap[i, j] = ((query / q_denom) @ (reference / r_denom)) * ((1 - (1 / ol))**2)
        # One progress tick per column i
        progress_proxy.update(1)
# -------------------------------------
# Some test data
input_shape = (1000,5800)
input_data = np.random.default_rng(seed=12).normal(size=input_shape).astype(np.float32)
input_data[np.abs(input_data) < 2] = 0
output_arr = np.memmap('/path/to/output.file', dtype=np.float32, mode='w+', shape=(input_shape[1],input_shape[1]))
# -------------------------------------
# Run once on a small input to compile (the dtype must match the real input,
# otherwise the actual run triggers a second compilation)
first_i = np.random.default_rng(seed=12).normal(size=(5,15)).astype(np.float32)
first_o = np.memmap('/path/to/output_f.file', dtype=np.float32, mode='w+', shape=(first_i.shape[1],first_i.shape[1]))
with ProgressBar(total=first_i.shape[1]) as progress:
    cosines_memmap(first_i, first_o, progress)
# Run actual
set_num_threads(8)
with ProgressBar(total=input_shape[1]) as progress:
    cosines_memmap(input_data, output_arr, progress)
As the adjacency matrix is symmetric across the diagonal, I only calculate the upper triangle (the j-loop). Thus, when I run it with parallel=False, I would expect iterations/sec to increase over the run, as fewer and fewer calculations are needed for each successive i. This is indeed what I observe, and I get a runtime of ~2 min 50 sec for the example above.
However, with parallel=True I no longer see this behaviour: in fact, iterations/sec decreases over the run. Perhaps this has to do with how numba splits the work between threads, without taking into account that the workload shrinks along the i loop. Also, for the given example I see no further decrease in runtime past ~8 threads, at which point it finishes in ~45 seconds.
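If the imbalance from the shrinking j-loop really is the problem, one way around it that does not depend on how numba chunks the iterations would be to pair a long row with a short one, so that every prange iteration does roughly the same amount of work. A rough sketch of what I mean (the names _fill_row and cosines_memmap_balanced are mine; _fill_row is just the inner j-loop of cosines_memmap factored out):
@njit
def _fill_row(input_data, output_memmap, i):
    # Fill row i of the upper triangle (all j >= i), same maths as above
    n = input_data.shape[1]
    query = input_data[:, i]
    for j in range(i, n):
        reference = input_data[:, j]
        ol = nz_overlap_count_func(query * reference)
        if ol == 0:
            ol = 1
        q_denom = np.sqrt(np.sum(query[reference.nonzero()]**2))
        if q_denom == 0:
            q_denom = 1
        r_denom = np.sqrt(np.sum(reference[query.nonzero()]**2))
        if r_denom == 0:
            r_denom = 1
        output_memmap[i, j] = ((query / q_denom) @ (reference / r_denom)) * ((1 - (1 / ol))**2)

@njit(nogil=True, parallel=True)
def cosines_memmap_balanced(input_data, output_memmap, progress_proxy):
    n = input_data.shape[1]
    half = (n + 1) // 2
    for k in prange(half):
        _fill_row(input_data, output_memmap, k)            # long row: n - k entries
        progress_proxy.update(1)
        mirror = n - 1 - k
        if mirror != k:                                     # for odd n the middle row is k itself
            _fill_row(input_data, output_memmap, mirror)    # short row: k + 1 entries
            progress_proxy.update(1)
The ProgressBar total stays input_shape[1], since this still updates once per row.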
Eventually I get rid of the dense adjacency matrix by setting a threshold, below which everything is set to zero, and saving the result as a sparse matrix. Overall this feels wasteful; my initial approach was to append (i, j, value) to a list whenever the value was above some threshold, and from that construct a sparse COO matrix without ever holding the full dense adjacency matrix in memory. At that point I would no longer need a memory-mapped file on disk, which I assume would also speed the whole thing up. Unfortunately, that approach cannot be parallelized as is, because I do not know the length of the list in advance, which is why I have ended up with the memory-mapped approach.
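For reference, the kind of compromise I have been considering (not what I currently run) would be to compute one row of the upper triangle at a time inside numba, parallel over j, and then threshold that row and collect the COO triplets in plain Python, so the list length is never needed inside the compiled code. A rough sketch, where thresh and both function names are placeholders and nz_overlap_count_func is the helper defined above:
from scipy import sparse

@njit(parallel=True)
def similarity_row(input_data, i):
    # Similarities of column i against all columns j >= i (one row of the upper triangle)
    n = input_data.shape[1]
    query = input_data[:, i]
    out = np.zeros(n - i, dtype=np.float32)
    for jj in prange(n - i):
        j = i + jj
        reference = input_data[:, j]
        ol = nz_overlap_count_func(query * reference)
        if ol == 0:
            ol = 1
        q_denom = np.sqrt(np.sum(query[reference.nonzero()]**2))
        if q_denom == 0:
            q_denom = 1
        r_denom = np.sqrt(np.sum(reference[query.nonzero()]**2))
        if r_denom == 0:
            r_denom = 1
        out[jj] = ((query / q_denom) @ (reference / r_denom)) * ((1 - (1 / ol))**2)
    return out

def sparse_similarities(input_data, thresh):
    # Build a sparse COO matrix of all similarities >= thresh, one row at a time
    n = input_data.shape[1]
    rows, cols, vals = [], [], []
    for i in range(n):
        row = similarity_row(input_data, i)
        keep = np.nonzero(row >= thresh)[0]
        rows.extend([i] * keep.size)
        cols.extend((keep + i).tolist())
        vals.extend(row[keep].tolist())
    return sparse.coo_matrix((vals, (rows, cols)), shape=(n, n), dtype=np.float32)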