Hello, this code runs fine if parallel=False but will crash jupyer and running from a terminal if parallel=True.
I suspect it may be related to the acf function call, or when calculating the sum of squared errors, but I cannot tell for sure. Any tips on how/where to refactor are greatly appreciated!
Background: Generate an n x m matrix of random binary values, and then rearrange the elements of each column m such that the resulting acf approximates that of a target acf. This code is inspired by:
J. Kruppa et al.
A genetic algorithm for simulating correlated binary data from biomedical research
Computers in Biology and Medicine (2018)
Environment: Python 3.10.12, numba 0.57.1 on Windows 10, jupyter lab and vscode
import numpy as np
import numba
@numba.jit(nopython=True, parallel=False)
def acf(x, n_lags=10):
"""
Calculate auto-correlation for a given number of lags
:param x: vector to find the auto-correlation of
:type x: np.array
:param n_lags: number of lags
:type n_lags: int
"""
return np.array([1]+[np.corrcoef(x[:-i], x[i:])[0,1] for i in range(1, n_lags+1)])
@numba.jit(nopython=True, parallel=True)
def correl(tgt:np.array, n:int = 200, m:int = 3, err:float = 0.9, n_lags:int = 10, maxIter:int = 10000):
"""
Create m number of auto-correlated binary sequences
:param tgt: target acf
:type tgt: np.array
:param n: number of elements in a sequence
:type n: int
:param m: number of sequences
:type m: int
:param err: min sse value
:type err: float
:param n_lags: numbr of acf lags
:type n_lags: int
:param maxIter: max iterations before stopping convergence
:type maxIter: int
...
Returns auto-correlated matrix
"""
# generate n x n matrix of random 0's and 1's
mat = np.random.binomial(n=1, p=0.48, size=(n,m))
#max index val of col vector
max_idx = mat.shape[0]
# loop through cols
for j in numba.prange(m):
# set initial sum of square error to inf
SSE0 = np.inf
# set current counter to 0
curr_int = 0
while SSE0 > err:
# choose two element ids at random
choice = np.random.choice(max_idx, size=2, replace=False)
# flip the two values
mat[choice[0],j] = mat[choice[0],j]^1
mat[choice[1],j] = mat[choice[1],j]^1
# calculate the acf for the vector
new_acf = acf(mat[:,j], n_lags=n_lags)
# calculate the resulting sse
sse = np.sum((new_acf[1:] - tgt[1:])**2)
if sse < SSE0:
SSE0 = sse
else:
# change the values back if there's no improvement in acf
mat[choice[0],j] = mat[choice[0],j]^1
mat[choice[1],j] = mat[choice[1],j]^1
# ++ the counter
curr_int += 1
# bail if it took to long
if curr_int == maxIter:
raise Exception("Max iterations has been reached without convergence")
return mat
if __name__ == "__main__":
tgt = np.array([1, 0.55, 0.35, .188, 0.039, -0.058, -0.0458, -0.035, -0.011, 0.00578, 0])
foo = correl(tgt, n=250, m = 1000, err = 0.01, n_lags = 10, maxIter = 10000)
Another thing that I find interesting, if I am interpreting the parallel debugging info correctly, is that the
for j in numba.prange(m):
loop appears to not be getting picked up as having parallel semantics per the docs for the-parallel-diagnostics-report-sections. I would not have expected this.
Parallel loop listing for Function correl, c:\Users\vdB9\Documents\Python\test\test\rv_generator.py (16)
---------------------------------------------------------------------------------------------------------------|loop #ID
@numba.jit(nopython=True, parallel=True) |
def correl(tgt:np.array, n:int = 200, m:int = 3, err:float = 0.9, n_lags:int = 10, maxIter:int = 10000): |
""" |
Create m number of auto-correlated binary sequences |
|
:param tgt: target acf |
:type tgt: np.array |
:param n: number of elements in a sequence |
:type n: int |
:param m: number of sequences |
:type m: int |
:param err: min sse value |
:type err: float |
:param n_lags: numbr of acf lags |
:type n_lags: int |
:param maxIter: max iterations before stopping convergence |
:type maxIter: int |
... |
Returns auto-correlated matrix |
""" |
|
# generate n x n matrix of random 0's and 1's |
mat = np.random.binomial(n=1, p=0.48, size=(n,m))----------------------------------------------------------| #0
|
#max index val of col vector |
max_idx = mat.shape[0] |
|
# loop through cols |
for j in numba.prange(m): |
|
# set initial sum of square error to inf |
SSE0 = np.inf |
|
# set current counter to 0 |
curr_int = 0 |
|
while SSE0 > err: |
# choose two element ids at random |
choice = np.random.choice(max_idx, size=2, replace=False) |
|
# flip the two values |
mat[choice[0],j] = mat[choice[0],j]^1 |
mat[choice[1],j] = mat[choice[1],j]^1 |
|
# calculate the acf for the vector |
new_acf = acf(mat[:,j], n_lags=n_lags) |
|
# calculate the resulting sse |
sse = np.sum((new_acf[1:] - tgt[1:])**2)-----------------------------------------------------------| #1, 2
|
if sse < SSE0: |
SSE0 = sse |
else: |
# change the values back if there's no improvement in acf |
mat[choice[0],j] = mat[choice[0],j]^1 |
mat[choice[1],j] = mat[choice[1],j]^1 |
|
# ++ the counter |
curr_int += 1 |
|
# bail if it took to long |
if curr_int == maxIter: |
raise Exception("Max iterations has been reached without convergence") |
|
return mat |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
Trying to fuse loops #1 and #2:
- fusion succeeded: parallel for-loop #2 is fused into for-loop #1.
----------------------------- Before Optimisation ------------------------------
Parallel region 0:
+--1 (parallel)
+--2 (parallel)
--------------------------------------------------------------------------------
------------------------------ After Optimisation ------------------------------
Parallel region 0:
+--1 (parallel, fused with loop(s): 2)
Parallel region 0 (loop #1) had 1 loop(s) fused.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
---------------------------Loop invariant code motion---------------------------
Allocation hoisting:
No allocation hoisting found
Instruction hoisting:
loop #0:
Has the following hoisted:
$push_global_to_block.165 = global(np: <module 'numpy' from 'C:\\Users\\vdB9\\miniconda3\\envs\\py310\\lib\\site-packages\\numpy\\__init__.py'>)
Failed to hoist the following:
dependency: $parfor_index_tuple_var.65 = build_tuple(items=[Var($parfor__index_58.177, <string>:2), Var($parfor__index_59.179, <string>:3)])
dependency: $push_getattr_to_block.163 = getattr(value=$push_getattr_to_block.164, attr=binomial)
dependency: $expr_out_var.64 = call $push_getattr_to_block.163(func=$push_getattr_to_block.163, args=[], kws=(('n', Var(_const8_3, rv_generator.py:38)), ('p', Var(_const10_4, rv_generator.py:38))), vararg=None, varkwarg=None, target=None)
loop #1:
Has the following hoisted:
$arg_out_var.74 = const(int, 2)
Failed to hoist the following:
dependency: $arg_out_var.72 = getitem(value=_184binary__subscr_58, index=$parfor__index_66.181, fn=<built-in function getitem>)
dependency: $arg_out_var.73 = getitem(value=_194binary__subscr_64, index=$parfor__index_66.181, fn=<built-in function getitem>)
dependency: $arg_out_var.71 = $arg_out_var.72 - $arg_out_var.73
dependency: $sum_parallel_impl__locals__sum_1_v86__v46binary_subscr_5.166 = $arg_out_var.71 ** $arg_out_var.74
dependency: sum_parallel_impl__locals__sum_1_v86__v48inplace_add_6 = inplace_binop(fn=<built-in function iadd>, immutable_fn=<built-in function add>, lhs=sum__parallel__impl____locals____sum__1__v86__val, rhs=$sum_parallel_impl__locals__sum_1_v86__v46binary_subscr_5.166, static_lhs=Undefined, static_rhs=Undefined)
dependency: sum__parallel__impl____locals____sum__1__v86__val = sum_parallel_impl__locals__sum_1_v86__v48inplace_add_6
--------------------------------------------------------------------------------