class _CloudPickleProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    """A ``ProcessPoolExecutor`` that swaps in ``cloudpickle.Pickler``.

    Before delegating to ``concurrent.futures.ProcessPoolExecutor``, the
    constructor assigns ``cloudpickle.Pickler`` to
    ``mp_context.reduction.ForkingPickler``.

    Parameters
    ----------
        max_workers : int, optional
            Forwarded unchanged to ``ProcessPoolExecutor``.
        mp_context : multiprocessing context, optional
            Context to patch and forward. When ``None``, the result of
            ``multiprocessing.get_context()`` is used.
        initializer : callable, optional
            Forwarded unchanged to ``ProcessPoolExecutor``.
        initargs : tuple, optional
            Forwarded unchanged to ``ProcessPoolExecutor``.
        max_tasks_per_child : int, optional
            Forwarded to ``ProcessPoolExecutor`` only when it is not ``None``.
            The base class accepts this keyword starting with Python 3.11, so
            leaving it out by default keeps the class constructible on older
            interpreters.
    """

    def __init__(
        self,
        max_workers=None,
        mp_context=None,
        initializer=None,
        initargs=(),
        max_tasks_per_child=None,
    ):
        if mp_context is None:
            mp_context = multiprocessing.get_context()

        # NOTE(review): this assignment happens on whatever object
        # ``mp_context.reduction`` resolves to. If that object is shared by
        # other users of multiprocessing, the pickler swap is visible to them
        # as well -- confirm that this is intended.
        mp_context.reduction.ForkingPickler = cloudpickle.Pickler

        # Only pass ``max_tasks_per_child`` when the caller asked for it:
        # ``None`` is the base-class default anyway, and interpreters older
        # than 3.11 reject the keyword outright.
        optional_kwargs = {}
        if max_tasks_per_child is not None:
            optional_kwargs["max_tasks_per_child"] = max_tasks_per_child

        super().__init__(
            max_workers=max_workers,
            mp_context=mp_context,
            initializer=initializer,
            initargs=initargs,
            **optional_kwargs,
        )
def test_issue1408():
    """Run a trivial processor through ``Runner`` with a default ``FuturesExecutor``.

    A lambda is passed in ``iteritems_options`` and the value returned by the
    runner is printed; nothing is asserted on it.
    NOTE(review): presumably this reproduces the scenario reported in
    issue 1408 -- confirm against the tracker.
    """
    from coffea import processor

    class AlwaysTrue(processor.ProcessorABC):
        """Minimal processor: every chunk yields ``True``."""

        def process(self, events):
            return True

        def postprocess(self, accumulator):
            return None

    samples = {"dy": {"files": {"tests/samples/nano_dy.root": "Events"}}}
    executor = processor.FuturesExecutor()
    runner = processor.Runner(executor=executor)

    # The filter is kept as a lambda on purpose, matching the original call.
    outcome = runner(
        fileset=samples,
        processor_instance=AlwaysTrue(),
        iteritems_options={"filter_name": lambda name: True},
    )
    print(outcome)