Looking for advice on "ValueError: ctypes objects containing pointers cannot be pickled"

Hoping someone has an idea of what can be done, even if it's not a Numba-specific issue.

I am looking for advice on the error message "ctypes objects containing pointers cannot be pickled", with the goal of running njitted scipy.special.cython_special functions on remote workers.

  1. I collect the Cython function addresses using numba.extending as in this documentation. I am using this approach. Running the resulting function locally works great.
(This is using `scipy.special.cython_special.huber` just as an example)
from numba.extending import get_cython_function_address
from numba import njit
import ctypes
import scipy
addr = get_cython_function_address('scipy.special.cython_special', 'huber')
functype = ctypes.CFUNCTYPE(ctypes.c_double, ctypes.c_double, ctypes.c_double)
chuber = functype(addr)

@njit
def nchuber(delta, r):
    return chuber(delta, r)

nchuber(1.0, 4.0)

Have not yet found a way to run the function on a remote worker.
What would be a way around this?

(Using distributed’s LocalCluster here just to make it easier to create an MRE. But the goal is to run on GatewayCluster.)

MRE
from distributed import Client, LocalCluster
cluster=LocalCluster()
client=Client(cluster)
submitted = client.submit(nchuber, 1.0, 4.0)

Which leads to:

ValueError: ctypes objects containing pointers cannot be pickled;
Full traceback, when running on remote worker
2023-11-18 20:05:51,525 - distributed.protocol.pickle - INFO - Failed to serialize CPUDispatcher(<function Matern_SpaceTimeCovariance_Nblocks_latlon_GenExp at 0x7fb8f21a9cf0>). Exception: ctypes objects containing pointers cannot be pickled
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/protocol/pickle.py:46, in dumps(x, buffer_callback, protocol)
     45 buffers.clear()
---> 46 result = pickle.dumps(x, **dump_kwargs)
     47 if b"__main__" in result or (
     48     CLOUDPICKLE_GTE_20
     49     and getattr(inspect.getmodule(x), "__name__", None)
     50     in cloudpickle.list_registry_pickle_by_value()
     51 ):

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:106, in _pickle__CustomPickled(cp)
    102 """standard pickling for `_CustomPickled`.
    103 
    104 Uses `NumbaPickler` to dump.
    105 """
--> 106 serialized = dumps((cp.ctor, cp.states))
    107 return _unpickle__CustomPickled, (serialized,)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:57, in dumps(obj)
     56 p = pickler(buf, protocol=4)
---> 57 p.dump(obj)
     58 pickled = buf.getvalue()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/cloudpickle/cloudpickle_fast.py:568, in CloudPickler.dump(self, obj)
    567 try:
--> 568     return Pickler.dump(self, obj)
    569 except RuntimeError as e:

ValueError: ctypes objects containing pointers cannot be pickled

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
Cell In[27], line 1
----> 1 Ktheta = client.run(covf.Matern_SpaceTimeCovariance_Nblocks_latlon_GenExp, n, lats, lons, times, const, const, const, const, const, const )

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2901, in Client.run(self, function, workers, wait, nanny, on_error, *args, **kwargs)
   2818 def run(
   2819     self,
   2820     function,
   (...)
   2826     **kwargs,
   2827 ):
   2828     """
   2829     Run a function on all workers outside of task scheduling system
   2830 
   (...)
   2899     >>> c.run(print_state, wait=False)  # doctest: +SKIP
   2900     """
-> 2901     return self.sync(
   2902         self._run,
   2903         function,
   2904         *args,
   2905         workers=workers,
   2906         wait=wait,
   2907         nanny=nanny,
   2908         on_error=on_error,
   2909         **kwargs,
   2910     )

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    337     return future
    338 else:
--> 339     return sync(
    340         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    341     )

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:379, in sync.<locals>.f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2781, in Client._run(self, function, nanny, workers, wait, on_error, *args, **kwargs)
   2768 async def _run(
   2769     self,
   2770     function,
   (...)
   2776     **kwargs,
   2777 ):
   2778     responses = await self.scheduler.broadcast(
   2779         msg=dict(
   2780             op="run",
-> 2781             function=dumps(function),
   2782             args=dumps(args),
   2783             wait=wait,
   2784             kwargs=dumps(kwargs),
   2785         ),
   2786         workers=workers,
   2787         nanny=nanny,
   2788         on_error="return_pickle",
   2789     )
   2790     results = {}
   2791     for key, resp in responses.items():

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/protocol/pickle.py:58, in dumps(x, buffer_callback, protocol)
     56 try:
     57     buffers.clear()
---> 58     result = cloudpickle.dumps(x, **dump_kwargs)
     59 except Exception as e:
     60     logger.info("Failed to serialize %s. Exception: %s", x, e)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     69 with io.BytesIO() as file:
     70     cp = CloudPickler(
     71         file, protocol=protocol, buffer_callback=buffer_callback
     72     )
---> 73     cp.dump(obj)
     74     return file.getvalue()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:632, in CloudPickler.dump(self, obj)
    630 def dump(self, obj):
    631     try:
--> 632         return Pickler.dump(self, obj)
    633     except RuntimeError as e:
    634         if "recursion" in e.args[0]:

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:106, in _pickle__CustomPickled(cp)
    101 def _pickle__CustomPickled(cp):
    102     """standard pickling for `_CustomPickled`.
    103 
    104     Uses `NumbaPickler` to dump.
    105     """
--> 106     serialized = dumps((cp.ctor, cp.states))
    107     return _unpickle__CustomPickled, (serialized,)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:57, in dumps(obj)
     55 with io.BytesIO() as buf:
     56     p = pickler(buf, protocol=4)
---> 57     p.dump(obj)
     58     pickled = buf.getvalue()
     60 return pickled

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/cloudpickle/cloudpickle_fast.py:568, in CloudPickler.dump(self, obj)
    566 def dump(self, obj):
    567     try:
--> 568         return Pickler.dump(self, obj)
    569     except RuntimeError as e:
    570         if "recursion" in e.args[0]:

ValueError: ctypes objects containing pointers cannot be pickled

WAP described here should help:
https://numba.readthedocs.io/en/stable/reference/types.html#wrapper-address-protocol-wap

By this, I mean on a local machine without using a LocalCluster. Submitting it to a LocalCluster fails as can be seen above.

@ofk123 that is unfortunate.
Have you tried switching the default multiprocessing context?

Hi @Oyibo, thanks for reaching out.
Do you mean using something else than distributed? I have gotten very used to the above approach. But if you think it would be worth trying something else, then I would be willing to!

Calling the scipy.special.cython_special function from another module seems to let me execute the function on a cluster. I think serialising the Python file where a C function is defined is not possible/allowed.

cython_special.py
import ctypes
import scipy
from numba import njit
from numba.extending import get_cython_function_address

addr = get_cython_function_address('scipy.special.cython_special', 'huber')
functype = ctypes.CFUNCTYPE(ctypes.c_double, ctypes.c_double, ctypes.c_double)
chuber = functype(addr)

@njit(cache=False)
def nchuber(delta, r):
    return chuber(delta, r)

module.py
from numba import njit
import cython_special as cs

@njit(cache=False)
def call_nchuber(delta, r):
    return cs.nchuber(delta, r)

from distributed import Client, LocalCluster
cluster=LocalCluster()
client=Client(cluster)
import module as m
import cython_special as cs
Calling from another module works:
submitted = client.submit(m.call_nchuber, 1.0, 4.0)
submitted.result()
[Out]: 3.5
Submitting the function from the module where the C-function is defined does not work:
submitted = client.submit(cs.nchuber, 1.0, 4.0)
submitted.result()
[Out]: 2024-02-28 19:02:43,072 - distributed.protocol.pickle - INFO - Failed to serialize CPUDispatcher(<function nchuber at 0x7f50bd4fe440>). Exception: ctypes objects containing pointers cannot be pickled
ValueError: ctypes objects containing pointers cannot be pickled
ValueError: ctypes objects containing pointers cannot be pickled (Traceback)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/worker.py:2919, in dumps_function(func)
   2918     with _cache_lock:
-> 2919         result = cache_dumps[func]
   2920 except KeyError:

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/collections.py:24, in LRU.__getitem__(self, key)
     23 def __getitem__(self, key):
---> 24     value = super().__getitem__(key)
     25     cast(OrderedDict, self.data).move_to_end(key)

File /srv/conda/envs/notebook/lib/python3.10/collections/__init__.py:1106, in UserDict.__getitem__(self, key)
   1105     return self.__class__.__missing__(self, key)
-> 1106 raise KeyError(key)

KeyError: CPUDispatcher(<function nchuber at 0x7faff5313130>)

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/protocol/pickle.py:46, in dumps(x, buffer_callback, protocol)
     45 buffers.clear()
---> 46 result = pickle.dumps(x, **dump_kwargs)
     47 if b"__main__" in result or (
     48     CLOUDPICKLE_GTE_20
     49     and getattr(inspect.getmodule(x), "__name__", None)
     50     in cloudpickle.list_registry_pickle_by_value()
     51 ):

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:106, in _pickle__CustomPickled(cp)
    102 """standard pickling for `_CustomPickled`.
    103 
    104 Uses `NumbaPickler` to dump.
    105 """
--> 106 serialized = dumps((cp.ctor, cp.states))
    107 return _unpickle__CustomPickled, (serialized,)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:57, in dumps(obj)
     56 p = pickler(buf, protocol=4)
---> 57 p.dump(obj)
     58 pickled = buf.getvalue()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/cloudpickle/cloudpickle_fast.py:568, in CloudPickler.dump(self, obj)
    567 try:
--> 568     return Pickler.dump(self, obj)
    569 except RuntimeError as e:

ValueError: ctypes objects containing pointers cannot be pickled

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
Cell In[3], line 1
----> 1 submitted = client.submit(cs.nchuber, 1.0, 4.0)
      2 submitted.result()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:1883, in Client.submit(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)
   1880 else:
   1881     dsk = {skey: (func,) + tuple(args)}
-> 1883 futures = self._graph_to_futures(
   1884     dsk,
   1885     [skey],
   1886     workers=workers,
   1887     allow_other_workers=allow_other_workers,
   1888     priority={skey: 0},
   1889     user_priority=priority,
   1890     resources=resources,
   1891     retries=retries,
   1892     fifo_timeout=fifo_timeout,
   1893     actors=actor,
   1894 )
   1896 logger.debug("Submit %s(...), %s", funcname(func), key)
   1898 return futures[skey]

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:3010, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   3008 # Pack the high level graph before sending it to the scheduler
   3009 keyset = set(keys)
-> 3010 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   3012 # Create futures before sending graph (helps avoid contention)
   3013 futures = {key: Future(key, self, inform=False) for key in keyset}

File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/highlevelgraph.py:1078, in HighLevelGraph.__dask_distributed_pack__(self, client, client_keys, annotations)
   1072 layers = []
   1073 for layer in (self.layers[name] for name in self._toposort_layers()):
   1074     layers.append(
   1075         {
   1076             "__module__": layer.__module__,
   1077             "__name__": type(layer).__name__,
-> 1078             "state": layer.__dask_distributed_pack__(
   1079                 self.get_all_external_keys(),
   1080                 self.key_dependencies,
   1081                 client,
   1082                 client_keys,
   1083             ),
   1084             "annotations": layer.__dask_distributed_annotations_pack__(
   1085                 annotations
   1086             ),
   1087         }
   1088     )
   1089 return {"layers": layers}

File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/highlevelgraph.py:432, in Layer.__dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    427 merged_hlg_keys = all_hlg_keys | dsk.keys()
    428 dsk = {
    429     stringify(k): stringify(v, exclusive=merged_hlg_keys)
    430     for k, v in dsk.items()
    431 }
--> 432 dsk = toolz.valmap(dumps_task, dsk)
    433 return {"dsk": dsk, "dependencies": dependencies}

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cytoolz/dicttoolz.pyx:178, in cytoolz.dicttoolz.valmap()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cytoolz/dicttoolz.pyx:203, in cytoolz.dicttoolz.valmap()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/worker.py:2957, in dumps_task(task)
   2955         return d
   2956     elif not any(map(_maybe_complex, task[1:])):
-> 2957         return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   2958 return to_serialize(task)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/worker.py:2921, in dumps_function(func)
   2919         result = cache_dumps[func]
   2920 except KeyError:
-> 2921     result = pickle.dumps(func)
   2922     if len(result) < 100000:
   2923         with _cache_lock:

File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/protocol/pickle.py:58, in dumps(x, buffer_callback, protocol)
     56 try:
     57     buffers.clear()
---> 58     result = cloudpickle.dumps(x, **dump_kwargs)
     59 except Exception as e:
     60     logger.info("Failed to serialize %s. Exception: %s", x, e)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     69 with io.BytesIO() as file:
     70     cp = CloudPickler(
     71         file, protocol=protocol, buffer_callback=buffer_callback
     72     )
---> 73     cp.dump(obj)
     74     return file.getvalue()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py:632, in CloudPickler.dump(self, obj)
    630 def dump(self, obj):
    631     try:
--> 632         return Pickler.dump(self, obj)
    633     except RuntimeError as e:
    634         if "recursion" in e.args[0]:

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:106, in _pickle__CustomPickled(cp)
    101 def _pickle__CustomPickled(cp):
    102     """standard pickling for `_CustomPickled`.
    103 
    104     Uses `NumbaPickler` to dump.
    105     """
--> 106     serialized = dumps((cp.ctor, cp.states))
    107     return _unpickle__CustomPickled, (serialized,)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/core/serialize.py:57, in dumps(obj)
     55 with io.BytesIO() as buf:
     56     p = pickler(buf, protocol=4)
---> 57     p.dump(obj)
     58     pickled = buf.getvalue()
     60 return pickled

File /srv/conda/envs/notebook/lib/python3.10/site-packages/numba/cloudpickle/cloudpickle_fast.py:568, in CloudPickler.dump(self, obj)
    566 def dump(self, obj):
    567     try:
--> 568         return Pickler.dump(self, obj)
    569     except RuntimeError as e:
    570         if "recursion" in e.args[0]:

ValueError: ctypes objects containing pointers cannot be pickled