Fully-JIT'ed modular calculation on a graph

In case anyone is interested in simultaneously

  • Developing interfaces in Python
  • Performing large modular calculation on a graph
  • Minimal-assumption flexible graph nodes supporting scalar, vector, and structured payloads
  • High performance
  • 100% numba JIT’ed implementation of all the core functionality

feel free to consider the numbox Work nodes.

A user-friendly Python abstraction is available and is illustrated in these tests.

Documentation with detailed examples of the provided functionality is available (see the project's linked documentation page).

Take it for a spin, contribute, provide feedback and suggestions!

The numbox project is published on PyPI; as of this writing, the latest version is 0.2.6.

Here’s an example:

import numpy

from numba.core.types import float32

from numbox.core.work.builder import Derived, End, make_graph
from numbox.core.work.explain import explain
from numbox.core.work.print_tree import make_image


# --- End (leaf/input) nodes of the graph ---
# `x` and `y` hold float32 vector payloads; the payload type is presumably
# inferred from the numpy array's dtype (confirm against numbox docs).
x = End(name="x", init_value=numpy.array([1.0, 2.0, 3.0], dtype=numpy.float32))
y = End(name="y", init_value=numpy.array([0.5, 0.25, 0.75], dtype=numpy.float32))
# Scalar parameters: `ty=float32` supplies the numba scalar type explicitly.
threshold = End(name="threshold", init_value=1.5, ty=float32)
alpha = End(name="alpha", init_value=0.1, ty=float32)
beta = End(name="beta", init_value=0.01, ty=float32)


# Derivation rule for `mask`: element-wise True wherever x exceeds the threshold.
# NOTE: `explain(...)` in __main__ embeds this function's source verbatim, so
# the body must stay byte-identical (comments live above the def for this reason).
def derive_mask(x_, threshold_):
    return x_ > threshold_


# Derived node: recomputed from `sources` during calculate(); `sources` order
# matches the derive function's positional arguments.
mask = Derived(
    name="mask",
    init_value=numpy.full_like(x.init_value, False, dtype=bool),
    derive=derive_mask,
    sources=(x, threshold),
)


# Derivation rule for `scaled_y`: damp y by `alpha` where `mask` is True,
# pass y through unchanged elsewhere.
# NOTE: body must stay byte-identical — its source is asserted verbatim in __main__.
def derive_scaled_y(y_, mask_, alpha_):
    return numpy.where(mask_, y_ * alpha_, y_)


# Derived vector node; `sources` order matches the derive function's arguments.
scaled_y = Derived(
    name="scaled_y",
    init_value=numpy.zeros_like(y.init_value, dtype=numpy.float32),
    derive=derive_scaled_y,
    sources=(y, mask, alpha),
)


# Derivation rule for `weighted_sum`: scalar reduction sum(x * scaled_y + beta).
# NOTE: body must stay byte-identical — its source is asserted verbatim in __main__.
def derive_weighted_sum(x_, scaled_y_, beta_):
    return numpy.sum(x_ * scaled_y_ + beta_)


# Scalar derived node; init_value typed float32 to match the reduction result.
weighted_sum = Derived(
    name="weighted_sum",
    init_value=float32(0.0),
    derive=derive_weighted_sum,
    sources=(x, scaled_y, beta),
)


# Derivation rule for `running_avg`: avg[i] = mean of (x + y) over the prefix
# [0..i]. O(n^2) in the length of x; presumably kept in explicit-loop form to
# suit numba JIT compilation — TODO confirm.
# NOTE: body must stay byte-identical — its source is asserted verbatim in __main__.
def derive_running_avg(x_, y_):
    avg = numpy.zeros_like(x_, dtype=numpy.float32)
    for i in range(len(x_)):
        avg[i] = numpy.mean(x_[:i+1] + y_[:i+1])
    return avg


# Derived vector node fed only by the two End nodes x and y.
running_avg = Derived(
    name="running_avg",
    init_value=numpy.zeros_like(x.init_value, dtype=numpy.float32),
    derive=derive_running_avg,
    sources=(x, y),
)


# Derivation rule for `interaction`: element-wise tanh of the product of the
# two derived vectors.
# NOTE: body must stay byte-identical — its source is asserted verbatim in __main__.
def derive_interaction(running_avg_, scaled_y_):
    return numpy.tanh(running_avg_ * scaled_y_)


# Derived node whose sources are themselves Derived nodes (a diamond on scaled_y).
interaction = Derived(
    name="interaction",
    init_value=numpy.zeros_like(x.init_value, dtype=numpy.float32),
    derive=derive_interaction,
    sources=(running_avg, scaled_y),
)


# Derivation rule for `output`: collapse `interaction` to its mean and add the
# scalar `weighted_sum` — the final scalar of the whole graph.
# NOTE: body must stay byte-identical — its source is asserted verbatim in __main__.
def derive_output(interaction_, weighted_sum_):
    return numpy.mean(interaction_) + weighted_sum_


# Root node of the example graph; passed to make_graph() below as the accessor.
output = Derived(
    name="output",
    init_value=float32(0.0),
    derive=derive_output,
    sources=(interaction, weighted_sum),
)


if __name__ == "__main__":
    # Build the graph with the `output` as accessor node; `access` exposes the
    # graph-owned Work nodes by name.
    access = make_graph(output)
    # Rebind `output` from the builder spec to the graph's Work node.
    output = access.output

    # `output` node's `data` was initialized to zero (float32(0.0) above);
    # nothing has been calculated yet.
    assert output.data == 0

    # display all the `End` nodes required to derive `output`
    print(output.all_end_nodes())  # ["x", "y", "threshold", "alpha", "beta"]

    # walk through the derivation chain to calculate `output`;
    # NOTE: the expected text embeds each derive function's source verbatim,
    # so the derive bodies above must not be edited.
    derivation_of_output = explain(output)
    assert derivation_of_output == """All required end nodes: ['x', 'y', 'threshold', 'alpha', 'beta']

x: end node

y: end node

running_avg: derive_running_avg(x, y)

    def derive_running_avg(x_, y_):
        avg = numpy.zeros_like(x_, dtype=numpy.float32)
        for i in range(len(x_)):
            avg[i] = numpy.mean(x_[:i+1] + y_[:i+1])
        return avg

threshold: end node

mask: derive_mask(x, threshold)

    def derive_mask(x_, threshold_):
        return x_ > threshold_

alpha: end node

scaled_y: derive_scaled_y(y, mask, alpha)

    def derive_scaled_y(y_, mask_, alpha_):
        return numpy.where(mask_, y_ * alpha_, y_)

interaction: derive_interaction(running_avg, scaled_y)

    def derive_interaction(running_avg_, scaled_y_):
        return numpy.tanh(running_avg_ * scaled_y_)

beta: end node

weighted_sum: derive_weighted_sum(x, scaled_y, beta)

    def derive_weighted_sum(x_, scaled_y_, beta_):
        return numpy.sum(x_ * scaled_y_ + beta_)

output: derive_output(interaction, weighted_sum)

    def derive_output(interaction_, weighted_sum_):
        return numpy.mean(interaction_) + weighted_sum_
"""

    # Calculate `output`. Will DFS traverse the graph deriving all the sources first.
    output.calculate()

    # Display the calculated value of the `output`
    print(output.data)  # 1.09410762...

    # Display the sub-graph structure starting from `output`;
    # shared nodes (scaled_y, mask) appear once per referencing parent.
    assert make_image(output) == """
output--interaction---running_avg--x
        |             |            |
        |             |            y
        |             |
        |             scaled_y-----y
        |                          |
        |                          mask---x
        |                          |      |
        |                          |      threshold
        |                          |
        |                          alpha
        |
        weighted_sum--x
                      |
                      scaled_y-----y
                      |            |
                      |            mask---x
                      |            |      |
                      |            |      threshold
                      |            |
                      |            alpha
                      |
                      beta"""

Additionally, functionality is provided to reload specified nodes with new values; only the affected graph nodes are then recalculated.

Functionality to traverse the graph and harvest the desired nodes is provided as well.

1 Like