Problem using parallel=True and prange

Hi, I am trying to use Numba to improve the performance of my code. I have implemented an embarrassingly parallel algorithm that runs in 420 seconds, but when I wrap it in a function decorated with @jit and the parallel=True option, the execution time increases dramatically instead of decreasing. Here is the issue I am facing:

Option 1: Execute the algorithm in plain Python.

################### V1 Value function iteration ##############################

import math
import numpy as np
from numba import jit, prange, float64, int64

S0 = np.zeros((Nz,Nx))
idx0 = np.zeros(Nz, dtype=np.int64)
Psi0 = np.zeros(Nz)
S0_z = np.zeros(Nz)

#Surplus's initial guess
p_guess = 0.03
for iz in range(Nz):
    for ix in range(Nx):
        S0[iz,ix] = z[iz]*dist_x[ix] - b
        S0[iz,ix] = S0[iz,ix]/(1-beta*(1-age)*((1-_lambda)*gamma-p_guess*phi))


# Psi initial guess...arg of first non-negative value for surplus
for iz in range(Nz):
    non_neg = np.flatnonzero(S0[iz,:] >= 0)
    if non_neg.size > 0:
        idx0[iz] = non_neg[0]
    else:
        idx0[iz] = Nx-1
    idx_z = idx0[iz]
    Psi0[iz] = dist_x[idx_z]

    #integral surplus initial guess (int_{psi(z)}^{inf} S(z,x)dF(x))
    S0_z[iz] = z[iz]*(1-G(Psi0[iz],sigma))*(E_cd(Psi0[iz],sigma)-Psi0[iz])

cont_z = np.zeros(Nz)
cont_esp = np.zeros(Nz)
S1_z = np.zeros(Nz)

S_int = np.zeros(Nz)
theta_z = np.ones(Nz)
p_z = np.ones(Nz)

S1 = np.zeros((Nz, Nx))

idx1 = np.zeros(Nz, dtype=np.int64)
Psi1 = np.zeros(Nz)

Psi_z = np.zeros(Nz)

error = 100
iter = 1
while error > tol and iter < maxiter :
    
    for iz in range(Nz):

        #Update surplus's truncated expectation
        G_plus = (1-G(Psi0[iz],sigma))
        y_z = E_cd(Psi0[iz],sigma)
        idx_z = idx0[iz]
        cont_z[0:iz+1] = S0_z[0:iz+1]
        for iz1 in range(iz+1,Nz):
            I_S = rule_simpson(Psi0[iz1],Psi0[iz],iz1,param,x,P,S0,z,dist_x,p_z,S_int)
            cont_z[iz1] = S0_z[iz1] - I_S - G_plus*S0[iz1,idx_z]
        cont_esp[iz] = np.dot(P[:,iz],cont_z[:])
        S1_z[iz] = z[iz]*G_plus*(y_z - Psi0[iz]) + beta*(1-age)*(1-_lambda)*(1-gamma)*cont_esp[iz]
        
    for iz in range(Nz):
        
        # E_{z} { int_{psi(z')}^{inf} S(z',x)dF(x) }
        S_int[iz] = np.dot(P[:,iz],S1_z)
    
        # Tightness (Theta_z)
        theta_z[iz] = (beta*(1-age)*m/kappa)*(1-phi)*S_int[iz]
        if theta_z[iz] < 0:
            theta_z[iz] = 0
        theta_z[iz] = np.power(theta_z[iz],1/alpha)
        
        p_z[iz] = m*np.power(theta_z[iz],1-alpha)
        if p_z[iz] > 1:
            p_z[iz] = 1
        
        #Update surplus for VFI
        for ix in range(Nx):
            S_esp = np.dot(P[:,iz],np.maximum(S0[:,ix],0))
            S1[iz,ix] = z[iz]*dist_x[ix] - b + beta*(1-age)*((1-_lambda)*gamma - p_z[iz]*phi)*S_int[iz] + beta*(1-_lambda)*(1-gamma)*S_esp
        #Update Psi
        non_neg = np.flatnonzero(S1[iz,:] >= 0)
        if non_neg.size > 0:
            idx1[iz] = non_neg[0]
        else:
            idx1[iz] = Nx-1
        idx_z = idx1[iz]
        #first non-negative element of S1
        #bisection
        if S1[iz,idx_z-1] < 0 and S1[iz,idx_z] > 0:
            xtol = 2e-12
            rtol = 8.881784197001252e-16  # 4*np.finfo(float).eps
            Psi1[iz] = S_bisect(dist_x[idx_z-1],dist_x[idx_z],iz,param,x,P,S0,z,dist_x,p_z,S_int,xtol,rtol)
        else:
            Psi1[iz] = -1.0

    error = np.max(np.abs(S0.ravel() - S1.ravel()))
    S0 = (1-adjf)*S0 + adjf*S1
    S0_z = S1_z
    Psi0 = Psi1
    idx0 = idx1
    iter += 1

Psi_z = Psi0
S_z = S0_z

In this code, the functions S_bisect(), rule_simpson(), G(), and E_cd() (as well as any other functions used) are decorated with @jit(nopython=True, nogil=True). S_bisect() calls an auxiliary function named Surplus_z, which is decorated with @jit(nopython=True, nogil=True, parallel=True). The algorithm performs well, with an execution time of 420 seconds, and Windows Task Manager shows CPU utilization close to 100% throughout the run.
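To make the setup concrete, the decorator layout in Option 1 is roughly the following (toy bodies and illustrative names; the real auxiliary functions are pasted further down):

@jit(nopython=True, nogil=True, parallel=True)
def surplus_like(v):
    # stands in for Surplus_z: the only parallel function in Option 1
    acc = 0.0
    for i in prange(v.size):
        acc += v[i]
    return acc

@jit(nopython=True, nogil=True)
def bisect_like(v):
    # stands in for S_bisect / rule_simpson: serial, but calls the parallel helper
    return surplus_like(v)

# the driver loop itself runs in plain Python and calls bisect_like(...)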

Since the entire algorithm is embarrassingly parallel, I would like to encapsulate it into a single function, decorate it with @jit(nopython=True, nogil=True, parallel=True), and replace range with prange.

Option 2: Execute the algorithm in a single function decorated with parallel=True, with range replaced by prange.

@jit(float64[:,:](float64[:],float64[:],float64[:,:],float64[:],float64[:],float64[:],float64[:]),
     nopython=True, nogil=True, parallel=True)
def value_iter(dist_x,dist_g,P,z,opt_value,param,x_iter):

    adjf, tol, maxiter = opt_value
    Nz, Nx, smooth, beta, phi, age = param
    b, kappa, _lambda, m, sigma, alpha, rho_in, sig_in, gamma = x_iter

    Nz = int(Nz)
    Nx = int(Nx)

    S0 = np.zeros((Nz,Nx))
    idx0 = np.zeros(Nz, dtype=np.int64)
    Psi0 = np.zeros(Nz)
    S0_z = np.zeros(Nz)
    
    #Surplus's initial guess
    p_guess = 0.03
    for iz in prange(Nz):
        for ix in prange(Nx):
            S0[iz,ix] = z[iz]*dist_x[ix] - b
            S0[iz,ix] = S0[iz,ix]/(1-beta*(1-age)*((1-_lambda)*gamma-p_guess*phi))
    
    
    # Psi initial guess...arg of first non-negative value for surplus
    for iz in prange(Nz):
        non_neg = np.flatnonzero(S0[iz,:] >= 0)
        if non_neg.size > 0:
            idx0[iz] = non_neg[0]
        else:
            idx0[iz] = Nx-1
        idx_z = idx0[iz]
        Psi0[iz] = dist_x[idx_z]
    
        #integral surplus initial guess (int_{psi(z)}^{inf} S(z,x)dF(x))
        S0_z[iz] = z[iz]*(1-G(Psi0[iz],sigma))*(E_cd(Psi0[iz],sigma)-Psi0[iz])
    
    cont_z = np.zeros(Nz)
    cont_esp = np.zeros(Nz)
    S1_z = np.zeros(Nz)
    
    S_int = np.zeros(Nz)
    theta_z = np.ones(Nz)
    p_z = np.ones(Nz)
    
    S1 = np.zeros((Nz, Nx))
    
    idx1 = np.zeros(Nz, dtype=np.int64)
    Psi1 = np.zeros(Nz)
    
    Psi_z = np.zeros(Nz)
    
    error = 100
    iter = 1
    while error > tol and iter < maxiter :
        
        for iz in prange(Nz):
    
            #Update surplus's truncated expectation
            G_plus = (1-G(Psi0[iz],sigma))
            y_z = E_cd(Psi0[iz],sigma)
            idx_z = idx0[iz]
            cont_z[0:iz+1] = S0_z[0:iz+1]
            for iz1 in prange(iz+1,Nz):
                I_S = rule_simpson(Psi0[iz1],Psi0[iz],iz1,param,x_iter,P,S0,z,dist_x,p_z,S_int)
                cont_z[iz1] = S0_z[iz1] - I_S - G_plus*S0[iz1,idx_z]
            cont_esp[iz] = np.dot(P[:,iz],cont_z[:])
            S1_z[iz] = z[iz]*G_plus*(y_z - Psi0[iz]) + beta*(1-age)*(1-_lambda)*(1-gamma)*cont_esp[iz]
            
        for iz in prange(Nz):
            
            # E_{z} { int_{psi(z')}^{inf} S(z',x)dF(x) }
            S_int[iz] = np.dot(P[:,iz],S1_z)
        
            # Tightness (Theta_z)
            theta_z[iz] = (beta*(1-age)*m/kappa)*(1-phi)*S_int[iz]
            if theta_z[iz] < 0:
                theta_z[iz] = 0
            theta_z[iz] = np.power(theta_z[iz],1/alpha)
            
            p_z[iz] = m*np.power(theta_z[iz],1-alpha)
            if p_z[iz] > 1:
                p_z[iz] = 1
            
            #Update surplus for VFI
            for ix in prange(Nx):
                S_esp = np.dot(P[:,iz],np.maximum(S0[:,ix],0))
                S1[iz,ix] = z[iz]*dist_x[ix] - b + beta*(1-age)*((1-_lambda)*gamma - p_z[iz]*phi)*S_int[iz] + beta*(1-_lambda)*(1-gamma)*S_esp
            #Update Psi
            non_neg = np.flatnonzero(S1[iz,:] >= 0)
            if non_neg.size > 0:
                idx1[iz] = non_neg[0]
            else:
                idx1[iz] = Nx-1
            idx_z = idx1[iz]
            #bisection
            if S1[iz,idx_z-1] < 0 and S1[iz,idx_z] > 0:
                xtol = 2e-12
                rtol = 8.881784197001252e-16  # 4*np.finfo(float).eps
                Psi1[iz] = S_bisect(dist_x[idx_z-1],dist_x[idx_z],iz,param,x_iter,P,S0,z,dist_x,p_z,S_int,xtol,rtol)
            else:
                Psi1[iz] = -1.0
    
        error = np.max(np.abs(S0.ravel() - S1.ravel()))
        S0 = (1-adjf)*S0 + adjf*S1
        S0_z = S1_z
        Psi0 = Psi1
        idx0 = idx1
        iter += 1

    out = np.zeros((5,Nz))
    #Psi_z
    out[0,:] = Psi0
    out[1,:] = theta_z
    #S_z
    out[2,:] = S0_z
    out[3,:] = p_z
    out[4,:] = S_int

    return out

################### V2 Value function iteration ##############################

value_opt = np.array([adjf, tol, maxiter],dtype=np.float64) 
values = value_iter(dist_x,dist_g,P,z,value_opt,param,x)

Psi_z = values[0]
theta_z = values[1]
S_z = values[2]
p_z = values[3]
S_int = values[4]

When I attempt this, the execution time increases dramatically (exceeding 24 hours), and the CPU usage drops to less than 30%. What could be causing this issue?

I will copy here the auxiliary functions that I am using:

@jit(float64(float64,float64), nopython=True, nogil = True)
def G(x,sigma):
    res = np.log(x) + pow(sigma,2)/2
    res = res/(sigma*np.sqrt(2))

    return 0.5*(1+math.erf(res))

@jit(float64(float64,float64), nopython=True, nogil = True)
def E_cd(x,sigma):
    res = pow(sigma,2)/2 - np.log(x)
    res = res/(sigma * np.sqrt(2))
    res = 1/2 + (1/2)*math.erf(res)
    return res/(1-G(x,sigma))

@jit(float64(float64[:,:],float64[:],int64,float64), 
     nopython=True, nogil=True)
def S_interpolate(S,dist_x,iz,point):
    non_neg = np.flatnonzero(dist_x - point > 0)
    if non_neg.size > 0:
        ix = non_neg[0]
        if ix > 1:
            y = np.array([S[iz,ix-2],S[iz,ix-1],S[iz,ix],S[iz,ix+1]])
            _x = np.array([dist_x[ix-2],dist_x[ix-1],dist_x[ix],dist_x[ix+1]])
            A = np.array(
                [
                    [np.power(_x[0],2),_x[0],1,0,0,0,0,0],
                    [np.power(_x[1],2),_x[1],1,0,0,0,0,0],
                    [0,0,0,np.power(_x[1],2),_x[1],1,0,0],
                    [0,0,0,np.power(_x[2],2),_x[2],1,0,0],
                    [0,0,0,0,0,0,_x[2],1],
                    [0,0,0,0,0,0,_x[3],1],
                    [2*_x[1],1,0,-2*_x[1],-1,0,0,0],
                    [0,0,0,2*_x[2],1,0,-1,0]
                ]
            )
            _y = np.array([y[0],y[1],y[1],y[2],y[2],y[3],0,0])
            coeff = np.linalg.inv(A) @ _y
            a,b,c = coeff[3],coeff[4],coeff[5]
            return a*np.power(point,2) + b*point + c
        elif ix == 1:
            m = (S[iz,ix] - S[iz,ix-1])/(dist_x[ix]-dist_x[ix-1])
            c = S[iz,ix-1] - m*dist_x[ix-1]
            return m*point + c
        else:
            return S[iz,ix]
    else:
        Nx = len(dist_x)
        return S[iz,Nx-1]

@jit(float64(float64,int64,float64[:],float64[:],float64[:,:],float64[:,:],float64[:],float64[:],float64[:],float64[:]), 
     nopython=True, nogil=True, parallel=True)
def Surplus_z(point,iz,param,x_iter,P,S,z,dist_x,p_z,S_int):
    Nz, Nx, smooth, beta, phi, age = param
    b, kappa, _lambda, m, sigma, alpha, rho_in, sig_in, gamma = x_iter
    Nz = int(Nz)
    
    S_inter = np.zeros(Nz)
    for indc in prange(Nz):
        S_inter[indc] = S_interpolate(S,dist_x,indc,point)
    S_esp = np.dot(P[:,iz],np.maximum(S_inter,0))
    out = z[iz]*point - b + beta*(1-age)*((1-_lambda)*gamma - p_z[iz]*phi)*S_int[iz] + beta*(1-_lambda)*(1-gamma)*S_esp

    return out

@jit(float64(float64,float64,int64,float64[:],float64[:],float64[:,:],float64[:,:],float64[:],float64[:],float64[:],float64[:]),
     nopython=True, nogil=True)
def rule_simpson(inf,sup,indc,param,x_iter,P,S,z,dist_x,p_z,S_int):
    b, kappa, _lambda, m, sigma, alpha, rho_in, sig_in, gamma = x_iter
    mid_point = (inf+sup)/2
    # the lower-bound term Surplus_z(inf,...)*g(inf,sigma) vanishes, since the
    # surplus is zero at the reservation cutoff inf
    f_inf = 0
    f_m = Surplus_z(mid_point,indc,param,x_iter,P,S,z,dist_x,p_z,S_int)*g(mid_point,sigma)
    f_sup = Surplus_z(sup,indc,param,x_iter,P,S,z,dist_x,p_z,S_int)*g(sup,sigma)

    out = f_inf + 4*f_m + f_sup
    out = (sup-inf)*out/6

    return out

@jit(float64(float64,float64,int64,float64[:],float64[:],float64[:,:],float64[:,:],float64[:],float64[:],float64[:],float64[:],float64,float64), 
     nopython=True, nogil=True)
def S_bisect(inf,sup,iz,param,x_iter,P,S,z,dist_x,p_z,S_int,xtol,rtol): 
    # inf and sup are assumed to bracket a root (the caller checks the sign change)
    # Get midpoint
    m = (inf + sup) / 2
    root = Surplus_z(m,iz,param,x_iter,P,S,z,dist_x,p_z,S_int)
    if np.allclose(0,root,atol=xtol,rtol=rtol):
        # Stopping condition, report m as root
        return m
    elif root < 0:
        # Case where m is an improvement on inf. Make recursive call with a = m
        return S_bisect(m,sup,iz,param,x_iter,P,S,z,dist_x,p_z,S_int,xtol,rtol)
    else:
        # Case where m is an improvement on sup. Make recursive call with b = m
        return S_bisect(inf,m,iz,param,x_iter,P,S,z,dist_x,p_z,S_int,xtol,rtol)

As you can see, I implemented my own versions of the bisection method, an interpolation method, and a function to calculate the CDF of a log-normal distribution, all decorated with @jit.
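One function not pasted above is g(x, sigma), the log-normal density used in rule_simpson; I did not include my version, but it is essentially the derivative of G, along these lines:

@jit(float64(float64,float64), nopython=True, nogil=True)
def g(x,sigma):
    # pdf matching the CDF G above (log-normal with log-mean -sigma**2/2)
    res = (np.log(x) + pow(sigma,2)/2)/sigma
    return np.exp(-0.5*res*res)/(x*sigma*np.sqrt(2*np.pi))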

Thank you in advance for your help.

Felipe

What are the values of Nz and Nx? I’m not sure how Numba handles prange, but you may be creating a huge number of threads, which will bring the OS to a crawl.
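One quick check: you can inspect and cap the size of Numba's thread pool from plain Python before calling the jitted function (in reasonably recent Numba versions):

import numba

print(numba.config.NUMBA_NUM_THREADS)  # size of the thread pool (defaults to CPU count)
numba.set_num_threads(4)               # cap it, e.g. to 4, before calling value_iter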

While I’m not 100% sure this will help, try to minimize the number of dynamic arrays/matrices being created inside your prange loops.

For instance, I’m seeing non_neg being created with np.flatnonzero. I’m not familiar with this function, but it appears to create a dynamically sized array of the indices of the nonzero entries.

Whenever prange has to create new arrays, you are going to get crushed performance-wise by memory allocations. This happened to me a while back with np.append inside a prange loop. Just because a numba function can be jit-compiled doesn’t mean it should be used inside a prange loop.

Again, not sure if this would make that big of a difference, but I encourage you to look through your code for any dynamically sized arrays being created within prange and find an alternative way to get the information you are looking for.
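For example, the np.flatnonzero pattern above can be replaced with a plain scan that finds the first non-negative index without allocating anything (a sketch reusing your names):

@jit(nopython=True, nogil=True)
def first_non_negative(row, default):
    # index of the first element >= 0, or `default` if there is none
    for ix in range(row.size):
        if row[ix] >= 0:
            return ix
    return default

# inside the iz loop:
#     idx1[iz] = first_non_negative(S1[iz,:], Nx-1)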

Fully runnable minimal example programs that demonstrate both before and after behaviors may get more eyeballs on your question.
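Something like this skeleton (illustrative, not your algorithm) makes it easy to time a serial and a parallel variant side by side, excluding compilation:

import time
import numpy as np
from numba import jit, prange

@jit(nopython=True)
def serial_version(a):
    s = 0.0
    for i in range(a.size):
        s += a[i]**2
    return s

@jit(nopython=True, parallel=True)
def parallel_version(a):
    s = 0.0
    for i in prange(a.size):
        s += a[i]**2
    return s

a = np.random.rand(10_000_000)
serial_version(a); parallel_version(a)  # warm-up calls so timings exclude compilation
for name, f in (("serial", serial_version), ("parallel", parallel_version)):
    t0 = time.perf_counter()
    f(a)
    print(name, time.perf_counter() - t0)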