Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import concurrent.futures
import json
import math
import multiprocessing
import os
import pickle
import time
Expand Down Expand Up @@ -501,6 +502,28 @@ def __call__(
)


# this class changes the default pickler of ProcessPoolExecutor to the default cloudpickle.Pickler
class _CloudPickleProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
def __init__(
self,
max_workers=None,
mp_context=None,
initializer=None,
initargs=(),
max_tasks_per_child=None,
):
if mp_context is None:
mp_context = multiprocessing.get_context()
mp_context.reduction.ForkingPickler = cloudpickle.Pickler
super().__init__(
max_workers=max_workers,
mp_context=mp_context,
initializer=initializer,
initargs=initargs,
max_tasks_per_child=max_tasks_per_child,
)


@dataclass
class FuturesExecutor(ExecutorBase):
"""Execute using multiple local cores using python futures
Expand All @@ -514,7 +537,7 @@ class FuturesExecutor(ExecutorBase):
accumulator : Accumulatable
An accumulator to collect the output of the function
pool : concurrent.futures.Executor class or instance, optional
The type of futures executor to use, defaults to ProcessPoolExecutor.
The type of futures executor to use, defaults to _CloudPickleProcessPoolExecutor.
You can pass an instance instead of a class to reuse an executor
workers : int, optional
Number of parallel processes for futures (default 1)
Expand Down Expand Up @@ -553,7 +576,7 @@ class FuturesExecutor(ExecutorBase):

pool: Union[
Callable[..., concurrent.futures.Executor], concurrent.futures.Executor
] = concurrent.futures.ProcessPoolExecutor # fmt: skip
] = _CloudPickleProcessPoolExecutor # fmt: skip
mergepool: Optional[
Union[
Callable[..., concurrent.futures.Executor],
Expand Down
21 changes: 21 additions & 0 deletions tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,24 @@ def postprocess(self, accumulator):

acc = None
super(test, proc).postprocess(acc)


def test_issue1408():
from coffea import processor

class P(processor.ProcessorABC):
def process(self, events):
return True

def postprocess(self, accumulator):
pass

fileset = {"dy": {"files": {"tests/samples/nano_dy.root": "Events"}}}
run = processor.Runner(executor=processor.FuturesExecutor())
print(
run(
fileset=fileset,
processor_instance=P(),
iteritems_options={"filter_name": lambda name: True},
)
)
Loading