Pickling multiprocessing Connection objects
For safe message-based communication between threads and processes in Python, I tend to use multiprocessing’s Queue and Pipe. A pattern often seen is using a queue for sending messages from multiple producers to a single consumer.
When a producer wants a response to its message, I create a Pipe and piggy-back one end of the Pipe (a Connection object) onto the message. I use Python dicts as messages, with the string “reply_to” as the dictionary key for the connection object.
When the queue consumer processes a message, it doesn’t know who the sender is or how to reach them. But if the message has an attached Connection object, the consumer can, almost magically, respond to the sender, across thread and process boundaries.
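The pattern can be sketched as follows, here with two threads and a standard library queue so that the message is never pickled. All names are my own, and this is Python 3 spelling; the article’s examples predate it:

```python
import multiprocessing
import queue
import threading

def consumer(inbox):
    # Consume messages from the queue; if a message carries a
    # "reply_to" connection, use it to respond to the sender.
    while True:
        message = inbox.get()
        if message is None:  # shutdown sentinel
            break
        if 'reply_to' in message:
            message['reply_to'].send('pong: %s' % message['payload'])

inbox = queue.Queue()
worker = threading.Thread(target=consumer, args=(inbox,))
worker.start()

# Producer side: create a pipe and piggy-back one end on the message
mine, yours = multiprocessing.Pipe()
inbox.put({'payload': 'ping', 'reply_to': yours})
response = mine.recv()  # blocks until the consumer replies

inbox.put(None)  # stop the consumer
worker.join()
print(response)  # -> pong: ping
```

Here the Connection object travels through a thread-local queue, so no serialization happens; the trouble starts once the queue itself crosses a process boundary.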
All good? Nope.
Any message sent through the queues and pipes must be serializable, or picklable as we say in Pythonesque. The multiprocessing.Connection objects can be serialized, but not unserialized, which means that you will not see an exception when you create your message, but some time later, in the consumer that tries to respond. The exception does not tell you much unless you’ve seen it before:
Traceback (most recent call last):
  File "pipetest2.py", line 10, in <module>
    print c1.recv()
TypeError: function takes at least 1 argument (0 given)
This has been a known bug in Python for two years. Googling the exception leads to a StackOverflow question asking for a workaround.
I’ve usually added a version of the workaround to some util package in my projects: one function for pickling a connection, and one function for unpickling a connection. In my code I’ve been forced to manually pickle/unpickle Connection objects before putting them on a Queue or Pipe.
from multiprocessing.reduction import reduce_connection
import pickle

def pickle_connection(connection):
    return pickle.dumps(reduce_connection(connection))

def unpickle_connection(pickled_connection):
    (func, args) = pickle.loads(pickled_connection)
    return func(*args)
This works great most of the time, but not this time. In the Python actor model library Pykka, I use Connection objects to implement futures for thread-based actors, similar to how I use gevent’s AsyncResult for gevent-based actors. When someone sets a value on the future, it is written to one end of a Pipe. When someone tries to read the future’s value, they block on the other end of the Pipe until there is something to get or a timeout is reached.

The problem appeared when I tried to nest futures, which is likely to happen if an actor, in response to your message, returns a future result from another actor. I no longer have the opportunity to babysit every Connection object that goes into or comes out of another Connection. They need to be able to watch over themselves. As the Connection class is implemented in C and is rather closed to changes, my solution was to wrap the Connection objects:
import multiprocessing.reduction


class ConnectionWrapper(object):
    """
    Wrapper for :class:`multiprocessing.Connection` objects to make them
    picklable.
    """

    def __init__(self, connection):
        self._connection = connection

    def __reduce__(self):
        (conn_func, conn_args) = multiprocessing.reduction.reduce_connection(
            self._connection)
        wrapper_func = _ConnectionWrapperRebuilder(conn_func)
        return (wrapper_func, conn_args)

    def __getattr__(self, name):
        return getattr(self._connection, name)


class _ConnectionWrapperRebuilder(object):
    """
    Internal class used by :class:`ConnectionWrapper` to rewrap
    :class:`multiprocessing.Connection` objects when they are unpickled.

    A function defined inside :meth:`ConnectionWrapper.__reduce__` which
    takes :attr:`conn_func` from its scope cannot be used, as functions must
    be defined at a module's top level to be picklable.
    """

    def __init__(self, inner_func):
        self._inner_func = inner_func

    def __call__(self, *args):
        connection = self._inner_func(*args)
        return ConnectionWrapper(connection)
The ConnectionWrapper class simply implements __reduce__ on the wrapped Connection object using multiprocessing’s own reduce_connection function. To work like a real Connection object, it dispatches any attribute access to the wrapped connection by implementing __getattr__.
To make sure the connection remains wrapped even after a trip through pickle.dumps() and pickle.loads(), _ConnectionWrapperRebuilder is used for rebuilding the connection and rewrapping it on deserialization.
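The trick does not depend on multiprocessing at all, and is easier to see with a plain value. A toy sketch (all names hypothetical) of a wrapper whose __reduce__ routes unpickling back through a top-level rebuilder:

```python
import pickle

def _rebuild_value(value):
    # Stand-in for the module-level rebuild function that
    # reduce_connection() produces for real Connection objects.
    return value

class _Rebuilder(object):
    # Must be defined at the top level to be picklable itself; a
    # closure created inside __reduce__ would make pickling fail.
    def __init__(self, inner_func):
        self._inner_func = inner_func

    def __call__(self, *args):
        # Rebuild the inner value, then rewrap it.
        return Wrapper(self._inner_func(*args))

class Wrapper(object):
    def __init__(self, value):
        self._value = value

    def __reduce__(self):
        # Pickle as: call _Rebuilder(_rebuild_value) with (self._value,)
        return (_Rebuilder(_rebuild_value), (self._value,))

roundtripped = pickle.loads(pickle.dumps(Wrapper(42)))
print(type(roundtripped).__name__, roundtripped._value)  # Wrapper 42
```

The object that comes out of pickle.loads() is again a Wrapper, which is exactly the property the connection wrapper needs.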
Given this wrapper, you can make your own Pipe function which creates a new pipe and wraps the connection objects for you.
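Such a replacement might look like this. The pipe name is my own, and for brevity the wrapper here omits the __reduce__ machinery from the full class above; __getattr__ alone is enough to show that the wrapped ends behave like real connections:

```python
import multiprocessing

class ConnectionWrapper(object):
    # Abbreviated version of the wrapper above; the full class also
    # implements __reduce__ to make the wrapped connection picklable.
    def __init__(self, connection):
        self._connection = connection

    def __getattr__(self, name):
        return getattr(self._connection, name)

def pipe(duplex=True):
    # Like multiprocessing.Pipe(), but both ends come back wrapped.
    left, right = multiprocessing.Pipe(duplex)
    return ConnectionWrapper(left), ConnectionWrapper(right)

a, b = pipe()
a.send('hello')        # attribute access is proxied to the real end
received = b.recv()
print(received)  # -> hello
```

Callers then use pipe() everywhere they would have used multiprocessing.Pipe(), and never see an unwrapped Connection.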
Hopefully this trick will be of help until the bug is fixed in Python.