Hi everyone,
I’m trying to write a hybrid MPI+OpenMP Python program that uses the OpenMP threading layer of Numba together with mpi4py/schwimmbad.
As an example, I wrote this code in which I compute the Euclidean pairwise distance between two matrices XA, XB, parallelized using numba.njit(parallel=True) and numba.prange.
I then create a pool of N workers that have to compute N different pairwise distances, and I would like each of the N workers to use 4 threads.
I tried with this code:
```python
import numba as nb
import numpy as np
import timeit
from numba import config
from schwimmbad import MPIPool
from mpi4py import MPI
import os

config.THREADING_LAYER = 'omp'

@nb.njit(parallel=True)
def euclidean_cdist(XA, XB):
    n_samples_A, n_samples_B = XA.shape[0], XB.shape[0]
    cdist = np.empty((n_samples_A, n_samples_B), dtype=np.float64)
    for i in nb.prange(n_samples_A):
        for j in range(n_samples_B):
            dist = 0.0
            for k in range(XA.shape[1]):
                diff = XA[i, k] - XB[j, k]
                dist += diff * diff
            cdist[i, j] = np.sqrt(dist)
    return cdist

def worker(task):
    np.random.seed(task)
    # DATA
    XA = np.random.rand(5000, 3)
    XB = np.random.rand(5000, 3)
    # SET THREADS AND COMPILE
    nb.set_num_threads(4)
    euclidean_cdist(XA, XB)
    # PROFILE
    times = timeit.repeat(lambda: euclidean_cdist(XA, XB), number=1, repeat=100)
    std = np.std(times) * 1000    # Convert to milliseconds
    mean = np.mean(times) * 1000  # Convert to milliseconds
    return mean, std

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    pool = MPIPool()
    if not pool.is_master():
        pool.wait()
        pool.close()
        exit()

    tasks = [123 + rank for _ in range(size)]  # Create a list of tasks
    results = pool.map(worker, tasks)
    pool.close()

    if rank == 0:
        # Print the mean and std of the profiling
        for i, (mean, std) in enumerate(results):
            print("Task {}: Mean: {:.2f} ms, Std: {:.2f} ms".format(i, mean, std))
```
However, if I launch it with the command `mpirun -n 2 python test.py`, I get the error

```
ValueError: The number of threads must be between 1 and 2.
```
The API Reference says that the argument of `set_num_threads` must be between 1 and `NUMBA_NUM_THREADS`, and by default `NUMBA_NUM_THREADS` is equal to `multiprocessing.cpu_count()`.
So it seems that `cpu_count()` sees only 2 CPUs, one for each process launched by MPI.
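To check what each rank actually sees, one can print both the machine-wide CPU count and the process’s CPU affinity mask (a quick diagnostic sketch; `os.sched_getaffinity` is Linux-only, and the assumption here is that `mpirun` may be binding each rank to a subset of cores):

```python
import os
import multiprocessing

# Total CPUs on the machine, regardless of any binding
print("cpu_count:", multiprocessing.cpu_count())

# CPUs this particular process is actually allowed to run on
# (mpirun often binds each rank to one or a few cores by default)
print("affinity:", len(os.sched_getaffinity(0)))
```

If the affinity number is smaller than `cpu_count`, launching with `mpirun --bind-to none` (Open MPI) may be worth trying so that each rank can use more cores.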
The only workaround I found is to substitute the line `nb.set_num_threads(4)` with `os.environ["NUMBA_NUM_THREADS"] = str(4)`, but the code does not scale as expected.
The average time per worker is the same for nthreads = 1, 2, 4, 8, and matches the time taken using two threads only.
So it seems that `os.environ["NUMBA_NUM_THREADS"] = str(4)` has no effect, and I have to explicitly export the number of Numba threads in the shell.
Even in that case, there is no speedup in the MPI program, while a serial version of the program does speed up when I increase the number of exported threads.
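One thing to rule out is whether the exported variable actually reaches every rank, since the launcher does not always forward the caller’s environment. With Open MPI the `-x` flag forwards a variable to all processes (assuming Open MPI here; MPICH and Intel MPI use `-genv` instead):

```shell
# Export the thread count and forward it explicitly to every MPI rank
export NUMBA_NUM_THREADS=4
mpirun -n 2 -x NUMBA_NUM_THREADS python test.py
```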
Does anyone know how to assign a specific amount of Numba threads to each MPI process?
Thanks!