
Conversation

@nsmith-
Member

nsmith- commented Nov 10, 2025

🗒️ Status of this PR is tracked at #1482 since otherwise the discussion will get quite long.

This pull request introduces the initial implementation of the new coffea.compute module, envisioned as the successor to the Processor/Executor/Runner interface. Currently it consists entirely of new additions in the src/coffea/compute/ directory.

To get started, the best place to look is at protocol.py, which is sketched below:

class ResultT:
    def __add__(self, other: Self) -> Self: ...

class DataElement:
    def load(self) -> InputT: ...

class WorkElement:
    func: Callable[[InputT], ResultT]
    item: DataElement

    def __call__(self) -> ResultT:
        return self.func(self.item.load())

class Computable:
    def __iter__(self) -> Iterator[WorkElement[InputT, ResultT]]: ...

class Task:
    def result(self) -> ResultT | EmptyResult: ...

    def partial_result(
        self,
    ) -> tuple[ResultT | EmptyResult, Computable]: ...

    def wait(self) -> None: ...

class RunningBackend:
    def compute(self, item: Computable, /) -> Task[InputT, ResultT]: ...

class Backend(ContextManager[RunningBackend]): ...

This protocol allows us to keep things very modular.

Backends

Backends are essentially the replacement for Executors. The Computable defines the map part, and ResultT's __add__ method defines the reduction. This is actually very similar to what we have in BaseExecutor:

def __call__(
    self,
    items: Iterable,
    function: Callable,
    accumulator: Accumulatable,
):
    raise NotImplementedError(
        "This class serves as a base class for executors, do not instantiate it!"
    )

So I'm hoping it is enough for our use. Already with the partial_result we are extending what we could do before. The other big change is that RunningBackend.compute is not blocking, but rather returns a Task handle (really, a future). A trivial implementation of a backend would be:

@dataclass
class LazyTask:
    item: Computable

    def result(self):
        return sum((f() for f in self.item), EmptyResult())

    def partial_result(self):
        return EmptyResult(), self.item


class TrivialBackend:
    def compute(self, item: Computable, /):
        return LazyTask(item)
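
To make the moving parts concrete, here is a minimal sketch of a result type and a hand-rolled Computable run through the TrivialBackend above. The names Count, ListElement, and CountWorkElement are hypothetical, and EmptyResult is assumed to act as the additive identity in the sum:

from dataclasses import dataclass
from typing import Callable


@dataclass
class Count:
    n: int

    def __add__(self, other: "Count") -> "Count":
        return Count(self.n + other.n)


@dataclass
class ListElement:
    "A toy DataElement whose load() just returns an in-memory list"

    values: list[int]

    def load(self) -> list[int]:
        return self.values


@dataclass
class CountWorkElement:
    "A toy WorkElement binding a function to a data element"

    func: Callable[[list[int]], Count]
    item: ListElement

    def __call__(self) -> Count:
        return self.func(self.item.load())


# any iterable of work elements satisfies the Computable protocol
work = [
    CountWorkElement(lambda data: Count(len(data)), ListElement([1, 2, 3])),
    CountWorkElement(lambda data: Count(len(data)), ListElement([4, 5])),
]
total = TrivialBackend().compute(work).result()  # Count(5), assuming EmptyResult() folds away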

To help prototype, a complete single-threaded backend is implemented in backends/threaded.py, and a generic backend test is implemented in test_compute_backends.py.

Error handling

Although not yet part of the protocol, the SingleThreadedBackend already handles errors and enables retries, using a wrapper around the WorkElement to keep track of exceptions. See errors.py for details. A TODO for the protocol is to define exactly what the user interface should be for fetching the exceptions from a failed task. A quick example of re-running a failed job is as follows:

with SingleThreadedBackend() as backend:
    computable = dataset.map_steps(BuggyProcessor())
    task = backend.compute(
        computable, error_policy=ErrorPolicy(continue_on=(ValueError,))
    )
    task.wait()
    part, resumable = task.partial_result()

    rest = backend.compute(resumable).result()
    total = part + rest

Data objects

To build a Computable, we need a user interface that binds some function to some data. Hence the func.py and data.py modules. So far, func.py is not very complex, but this is eventually the place where the details of preprocessing (preparing?) would go, as well as whatever NanoEvents adapters are needed. For the data, the structure is heavily inspired by #1453, though still far from feature parity with it. To apply a function, the user interface looks something like:

from coffea.compute.context import ContextInput
from coffea.compute.data import ContextDataset, Dataset, File, StepContextDataset
from coffea.compute.func import EventsArray


dataset = Dataset(
    files=[
        File(path="file1.root", steps=[(0, 100), (100, 200), (200, 300)]),
        File(path="file2.root", steps=[(0, 100), (100, 200), (200, 300)]),
    ],
    metadata=ContextDataset(dataset_name="singlemuon", cross_section=None),
)


def process(input: ContextInput[EventsArray, StepContextDataset]) -> int:
    return len(input.data)


computable = dataset.map_steps(process)

All this Context business is a way to get typed metadata into the user function. Details are in context.py. Of course, we wrap this so the classic interface still works:

class MyProcessor:
    def process(self, events: EventsArray) -> int:
        return len(events)

computable2 = dataset.map_steps(MyProcessor())

Abstractions

Power users may define new Dataset and File classes. Two ABCs exist to define the protocol for iterating over steps (chunks) or files:

class StepIterable(ABC):
    @abstractmethod
    def iter_steps(self): ...

    def map_steps(self, func): ...

class FileIterable(ABC):
    @abstractmethod
    def iter_files(self) -> Iterator[DataElement]: ...

    def map_files(self, func: Callable[[InputT], ResultT]): ...

    def map_files_by(
        self,
        func: Callable[[InputT], ResultT],
        grouper: Callable[[Context], str],
    ): ...
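
As a usage sketch, assuming some datagroup object spanning several datasets that implements FileIterable, and a context that exposes dataset_name (as ContextDataset does above), grouping a per-file function by dataset might look like:

def skim_file(input):
    "Per-file work; returns an addable result, e.g. a summary of what was written"
    ...


computable = datagroup.map_files_by(skim_file, grouper=lambda ctx: ctx.dataset_name)
# computing this presumably yields an addable output keyed per dataset (cf. GroupedResult in group.py)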

Preprocessing

The goal, much like in the executor case, is to re-use the same protocol also for preprocessing. A sketch of what this might look like is in test_compute_data.py::test_prepare. Some more work needs to be done to understand the UI for joining the prepared files with the metadata of the input specification. For now, to get the ball rolling, there is a generic GroupedResult (see group.py) that uses information in the Context to derive a key for the addable output dictionary.

This is another departure from Processor: we try to handle the hierarchy of DataGroup -> Dataset -> Result for the user, letting them define the process function without the need to build a Result object that splits out the different datasets into dictionaries. The process function can still respond to the dataset name via the context, with the nice bonus that it is indicated in the type signature: a process function like

def process(input: ContextInput[EventsArray, Any]) -> Hist: ...

will not fill any dataset-dependent axes, whereas

def process(input: ContextInput[EventsArray, StepContextDataset]) -> Hist: ...

could.

Typing

Clearly there is a lot of fun to be had here! For example, in my IDE I get hints like:
[Screenshot: IDE type hints showing the user function's return type propagated through the backend]

i.e. the return type of the user function makes it all the way through the backend.
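
In other words, something like the following sketch (reusing dataset and process from above) should type-check, with the user function's return type flowing through:

with SingleThreadedBackend() as backend:
    task = backend.compute(dataset.map_steps(process))
    out = task.result()  # a type checker infers int | EmptyResult here, since process returns int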

What's next

Status of this PR tracked at #1482

There are about 20 TODOs and several design questions left. Please feel free to comment on this draft PR.
@btovar @pfackeldey @ikrommyd @lgray @alexander-held @NJManganelli in particular I'd be interested in your thoughts.

@pfackeldey
Collaborator

pfackeldey commented Nov 10, 2025

This is great to see @nsmith-, I've worked with the current executor interface quite a bit now, and you've added many high-quality improvements to this new executor implementation.
In particular I like:

  • error handling (as far as I understand) becomes the job of the backend. That makes sense because backends may introduce new errors that need special handling, e.g. dask's KilledWorker exception.
  • typing makes it clearer how things work (helps both the analysts and the developers)
  • preprocessing becomes part of the job of the dataset, i.e.: dataset{group}.map_steps. Here my only question is: was/is there ever a scale of datasets where one wanted to use e.g. Dask to do this? I think @alexander-held wants to run custom functions during this step, and that may not scale well if done purely in python for loops over the files.
  • backend.compute returns a future and is non-blocking - great!
  • group.py: if I understand this correctly, this allows reducing/merging e.g. within datasets and returning a GroupedResult (without merging those further into a single output). That should be very handy for large-scale analyses, because the cross-dataset merge explodes the output-histogram size a lot. (group.map_files_by is cool!)

These are some comments about ideas for improvement:

  • For all scale-out backends it is trivial to submit Processor.process(events) jobs and return a future for each of them. The next step (reduction/merge) is where smart logic can be useful. I would propose to extend the backend interface to something like this:
class RunningBackend(Protocol):
    """A RunningBackend represents an active backend context.
    This is the type returned by Backend.__enter__.
    It may be distinct from Backend to separate resource management
    from computation submission.
    """

    def compute(self, item: Computable[InputT, ResultT], /) -> Task[InputT, ResultT]:
        return self.reduce(self.map(item))
        
    def map(self, item: Computable[InputT, ResultT], /) -> Iterable[Task[InputT, ResultT]]:
        ...  # e.g. `return (f() for f in item)`
       
    def reduce(self, tasks: Iterable[Task[InputT, ResultT]], /) -> Task[InputT, ResultT]:
        ...  # e.g. `return sum(tasks, EmptyResult())`

Then we can implement custom backends and just swap out the reduce step. It also clearly separates the map and reduce - people may want to use backend.map directly for things like skimming.
We could even think about a finalize method that runs e.g. after reduce and may do things like: write task results to disk as soon as they arrive. This could be handy for a GroupedResult where you want to write the result of dataset A immediately to disk to free memory (a rough sketch appears after this list).

  • The Context mechanism is (if I understand it correctly) a way to add metadata to a WorkItem. I had a hard time understanding how this is used; maybe it would be helpful to add more docstrings or concrete examples in the tests? We want to extend the metadata for things like preloading, such that you can specify per dataset a list of branches that we preload in a single read - it's not clear to me how to add this here. (Also I'm missing the json-serializability here: it's good if you can do this preprocessing once and then write it to disk.)
  • I'm not sure why we need the separation of result and partial_result. Can't we have only a non-blocking result() that gives back the current result, an iterable over unfinished work items (a computable), and the status, i.e.:
out, unfinished, status = task.result() # non-blocking (could e.g. filter the tasks by status without waiting for them)
if status == TaskStatus.COMPLETE:
  assert len(unfinished) == 0
  print("Success")

Some minor comments:


I would like to contribute to this PR (and with that deprecate the development of #1468 in favor of adding it here instead). I think I can add the futures and dask backend, and implement 2 strategies for both:

  1. a map-reduce that submits the process function and then merges them as they arrive/finish (more-or-less what the current executors do - they just submit all futures immediately instead of waiting for them to arrive and handle errors appropriately)
  2. a map-reduce that submits the process function and then does a hopefully more robust merging as described in feat: improved DaskExecutor scheduling logic #1468 where the reduction happens first within datasets and then at the end across them. This could optionally skip the last cross-dataset merge and return a GroupedResult to avoid a potentially large memory spike, and/or make use of the finalize method (that I described above for the RunningBackend protocol) to write merged dataset results directly to disk.

@btovar
Contributor

btovar commented Nov 10, 2025

Looks nice! How do you picture the backends interacting with the computable objects? For example, if I want a backend that dynamically modifies the chunksize, would it be ok to have an option where the backend gets a computable object rather than the iterator? That way, the backend can modify attributes of the computable object that may change subsequent calls of the iterator.

@nsmith-
Member Author

nsmith- commented Nov 10, 2025

@pfackeldey thanks for the quick turnaround!

preprocessing becomes part of the job of the dataset, i.e.: dataset{group}.map_steps. Here my only question is: was/is there ever a scale of datasets where one wanted to use e.g. Dask to do this? I think @alexander-held wants to run custom functions during this step, and that may not scale well if done purely in python for loops over the files.

So, the idea is that dataset.map_files(preprocessor) produces a computable, so the preprocessing can still be submitted to a backend of choice. Indeed it can be quite expensive to do locally, often just due to the 2 seconds per file it takes to open a NanoAOD with uproot.
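
As a minimal sketch (assuming a preprocessor callable suitable for map_files; the SingleThreadedBackend import path is a guess based on backends/threaded.py):

from coffea.compute.backends.threaded import SingleThreadedBackend

with SingleThreadedBackend() as backend:
    # preprocessing is just another Computable, so any backend can run it
    prepared = backend.compute(dataset.map_files(preprocessor)).result()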

Then we can implement custom backends and just swap out the reduce step. It also clearly separates the map and reduce - people may want to use backend.map directly for things like skimming.

I had originally thought we would handle skimming / writing in a function wrapper and simply return a list of what was written as the ResultT, e.g. to transform into an input dataset for use with a subsequent task. But there is probably some merit in making the map and reduce steps more customizable by the backend. Currently the backend basically only gets the opportunity to pre-load data.

To your general suggestion on splitting map/reduce, currently the Task is a high-level task representing all items. I'm not sure what that means in terms of Backend complexity if it is fine-grained. Building on your sketch, I could see something like:

from typing import Callable, Generic, Iterable, Iterator, Protocol, Self, TypeVar

from coffea.compute.protocol import (
    Addable,
    Computable,
    InputT,
    ResultT,
    WorkElement,
)


class Future(Generic[ResultT]):
    def __init__(self, item: WorkElement[InputT, ResultT]) -> None:
        "This should schedule the work element for execution"
        ...

    def result(self) -> ResultT: ...

    def add_done_callback(self, fn: Callable[[Self], None]) -> None: ...


R = TypeVar("R", bound=Addable)


class ResultFuture(Generic[R]):
    def add(self, item: Future[R]) -> None: ...

    def result(self) -> R: ...


class RunningBackend(Protocol):
    def compute(self, item: Computable[InputT, ResultT], /) -> ResultFuture[ResultT]:
        return self.reduce(self.map(item))

    def map(self, item: Computable[InputT, ResultT], /) -> Iterator[Future[ResultT]]:
        "Example implementation"
        return (Future(f) for f in item)

    def reduce(self, futures: Iterable[Future[ResultT]], /) -> ResultFuture[ResultT]:
        "Example implementation"
        out = ResultFuture[ResultT]()
        for f in futures:
            f.add_done_callback(out.add)
        return out

The tricky bit is for distributed backends: how will the data of the Future's result get shipped to the ResultFuture?

extend the metadata for things like preloading such that you can specify per dataset a list of branches that we preload in a single read

I would implement pre-loading by subclassing the StepElement with a list of preloaded columns, and then wrapping Dataset with a type that intercepts map_steps and adds the pre-loaded column list. This is because in the protocol, the idea is that DataElement.load should collect any IO-bound work that needs to happen before computation can start.
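
A rough sketch of that idea (PreloadedStepElement and PreloadingDataset are hypothetical names; the real StepElement and Dataset shapes live in data.py):

from dataclasses import dataclass, field


@dataclass
class PreloadedStepElement:
    step: tuple[int, int]
    file_path: str
    columns: list[str] = field(default_factory=list)

    def load(self):
        # the IO-bound part: open the file and read only the requested columns
        ...


@dataclass
class PreloadingDataset:
    dataset: object  # the wrapped Dataset
    columns: list[str]

    def map_steps(self, func):
        # intercept map_steps and attach the pre-loaded column list to each step element
        ...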

I'm missing the json-serializability here

For now, Context is a dataclass. What it should be is a pydantic data model. But that's a TODO. Same for the Dataset and File objects.

I'm not sure why we need the separation of result and partial_result.

The happy path should be simple: my_histos = task.result(). If something goes wrong, we make a nice exception explaining how to recover. Otherwise, I think many will do my_histos, *some_junk = task.result() and not understand that we have to wait() the task if there is to be any hope of having some result. The reason I didn't call it a Future (even though that is what it is) is that it is likely too much CS jargon for users.

I think it's good to introduce a Err wrapper

Agreed. That's the idea behind FailedTaskElement.

I would like to contribute to this PR

Yes! Feel free to make a PR to this PR. Though, maybe some more discussion on map/reduce is warranted before committing too much effort.

@NJManganelli
Collaborator

@pfackeldey The JSON-serializable metadata is handled via the pydantic filespecs: currently there's an InputFiles component to a dataset which gets replaced by differently typed PreprocessedFiles (still some differentiation and lfn/pfn to add); see #1398.

Also @ncsmith @lgray @ikrommyd would we all agree on Fileset -> DataGroup?

@nsmith-
Member Author

nsmith- commented Nov 10, 2025

@btovar thanks for bringing up the dynamic chunking, I now remember you had implemented that at some point in the TaskVine Executor. So then we re-conceptualize Computable not as an iterator but perhaps more as a Generator with a send channel for the Backend to request changes to the iterator in flight. Maybe something like:

from typing import Generator, Protocol, TypeAlias

from coffea.compute.protocol import (
    Computable,
    EmptyResult,
    InputT,
    ResultT,
    WorkElement,
)


class SizedWorkElement(WorkElement[InputT, ResultT], Protocol):
    def __len__(self) -> int:
        "Return the size of this work element in some unit (e.g., number of events)"
        ...


NewSizeRequest: TypeAlias = int


class ResizableComputable(Computable[InputT, ResultT], Protocol):
    def generate(
        self,
    ) -> Generator[SizedWorkElement[InputT, ResultT], NewSizeRequest, None]:
        "Generate work elements, possibly adapting their size based on external factors"
        ...


def compute_now(items: ResizableComputable[InputT, ResultT]) -> ResultT | EmptyResult:
    out = EmptyResult()
    work_gen = items.generate()
    # Let it tell us the initial size
    work_element = next(work_gen, None)
    if work_element is None:
        return out
    while True:
        result = work_element()
        out += result
        # Here we could adapt the size of future work elements based on performance metrics
        # For simplicity, we just request the same size
        try:
            work_element = work_gen.send(len(work_element))
        except StopIteration:
            break
    return out

(a real implementation would have to wrap this into a Task)

This would require a bit of re-imagining how FailedTaskElement is implemented: it could no longer just keep track of the index in the iterable, but would have to hold the whole materialized WorkElement for later re-computation.

@NJManganelli
Copy link
Collaborator

On preloading columns, one thing I have been thinking about is making the form required for preprocessed files. If you guarantee that, then at the file or task-element level (so long as you have the datasetspec, which stores the union form), you can store indices into the form for preloading, or just record which columns/branches are in the file (because the union form has the superset). The latter could be useful e.g. to filter on files where trigger branches in CMS have evolved; I think that's a potential activation-energy barrier for debugging we could smooth with such a mechanism. We'd have to pass the datasetspec down to the sub-element, but it's metadata, or maybe, in line with resizable compute task elements, one could retrieve updated metadata via messages.

@pfackeldey
Collaborator

Hi @nsmith-,
I'll just reply to a few comments in the following:

So, the idea is that dataset.map_files(preprocessor) produces a computable

I missed that you define this using map etc. (which only evaluates lazily). Thus, the Computable object is fast to build, but it only actually does something when iterating over it - got it 👍

I had originally thought we would handle skimming / writing in a function wrapper and simply return a list of what was written as the ResultT, e.g. to transform into an input dataset for use with a subsequent task. But there is probably some merit in making the map and reduce steps more customizable by the backend. Currently the backend basically only gets the opportunity to pre-load data.

My main motivation would be to reuse logic in map and implement different reduce logics through inheritance. In theory it would also then be possible to use e.g. Dask for the map step, but concurrent.futures for the reduce step (e.g. when accumulation isn't possible on the cluster due to memory constraints).

To your general suggestion on splitting map/reduce, currently the Task is a high-level task representing all items. I'm not sure what that means in terms of Backend complexity if it is fine-grained.

I don't think it is necessary to concretely define different Future types. We just need a single Future protocol and every backend implements that based on its own concept of futures, e.g. concurrent.futures.Future or dask.distributed.client.Future and fulfills that protocol.
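
For instance, concurrent.futures.Future already satisfies such a protocol structurally (purely as an illustration of the duck typing):

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor()
fut = pool.submit(lambda: 42)
fut.add_done_callback(lambda f: print(f.result()))  # the two methods the protocol needs
pool.shutdown(wait=True)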

The tricky bit is for distributed backends: how will the data of the Future's result get shipped to the ResultFuture?

Isn't every backend we have currently using futures (except for IterativeExecutor)?

I was just imagining something like this, which would separate a bit the Task/Future concept (that every backend already provides afaik) from additional info, e.g. failures/continuation, using some sort of State that's just a lightweight collection of a backend-future type and this additional info, i.e.:

import operator
import typing as tp
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, Iterator, Self

from dask.distributed import as_completed

from coffea.compute.protocol import Computable, EmptyResult, InputT, ResultT, RunningBackend


class Future(tp.Protocol[ResultT]):
    """this protocol supports the minimal subset of features we need from https://docs.python.org/3/library/concurrent.futures.html#future-objects (and other backend future types)"""
    def result(self) -> ResultT: ...
    def add_done_callback(self, fn: Callable[[Self], None]) -> None: ...


@dataclass(frozen=True, slots=True)
class State(Generic[InputT, ResultT]):
    out: Future[ResultT]
    failures: Computable[InputT, ResultT]


class SimpleDaskBackend(RunningBackend):
    # assumes self.client is a dask.distributed Client provided by the Backend context manager
    def compute(self, item: Computable[InputT, ResultT], /) -> State[InputT, ResultT]:
        return self.reduce(self.map(item))

    def map(self, item: Computable[InputT, ResultT], /) -> Iterator[Future[ResultT]]:
        return self.client.map(lambda f: f(), item)

    def reduce(self, futures: Iterable[Future[ResultT]], /) -> State[InputT, ResultT]:
        out_f = self.client.submit(lambda: EmptyResult())
        failures = []
        for f in as_completed(futures):
            # handle failures
            if issubclass(f.type, BaseException) and f.key.startswith("Processor-"):
                failures.append(f)  # <- translate them back to work elements, e.g. like `Continuation` does
            else:
                out_f = self.client.submit(operator.add, out_f, f)
        return State(out_f, failures)


class FancyDaskBackend(SimpleDaskBackend):
    def reduce(self, futures: Iterable[Future[ResultT]], /) -> State[InputT, ResultT]:
        """A fancier reduce implementation"""

@ikrommyd
Collaborator

Hello, I have skimmed through the discussion (not looked at actual code) and this is all looking very good!

I was wondering about two things for starters. When I originally heard the term coffea.compute, I thought it's going to be a function (and the namespace would have a different name).
I was imagining something like coffea.compute(some computable, executor=something) sort of like what dask.compute does where you tell it what to compute and some executor/scheduler.
Is this out of the question? Is there a strong reason why you preferred coffea.compute be a namespace and have backend.compute do the execution?
On a similar note, is there a reason to have an even higher level function that does "map across chunks + compute" in one go?

Regarding filesets, I haven't really been following the fileset spec/pydantic stuff but I saw this in your example above?

dataset = Dataset(
    files=[
        File(path="file1.root", steps=[(0, 100), (100, 200), (200, 300)]),
        File(path="file2.root", steps=[(0, 100), (100, 200), (200, 300)]),
    ],
    metadata=ContextDataset(dataset_name="singlemuon", cross_section=None),
)

Will the ability to pass good old fashioned json filesets remain? People tend to have such filesets already stored somewhere.

@NJManganelli
Collaborator

The original seed idea was indeed a function akin to dask.compute.

For jsons, no, we should not allow that. Part of this was all motivated by not having to handle 3-4 different levels of dictionary nesting that may or may not (accidentally) get passed into a function (which is the current level of inception-ness in coffea's Filesets, plus an additional layer in the column-joining paradigm). And we shouldn't handle it because it's a one-liner to convert to the pydantic input types:

[_starting_fileset, FilesetSpec(_starting_fileset)],

Literally just wrap your python dict in FilesetSpec(<dict>) or DatasetSpec(<dict>) etc. If anything fails, people should update, then save the pydantic version (i.e. that should be a one-time conversion).
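
For instance, the one-time conversion could look roughly like this (the import path for FilesetSpec is a guess; see #1398 for where the pydantic specs land):

import json

from coffea.dataset_tools import FilesetSpec  # assumed import path

with open("my_fileset.json") as f:
    fileset = FilesetSpec(json.load(f))

# save the validated pydantic version once and reuse it from then on
with open("my_fileset_spec.json", "w") as f:
    f.write(fileset.model_dump_json())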

@ikrommyd
Collaborator

ikrommyd commented Nov 10, 2025

The original seed idea was indeed a function akin to dask.compute.

For jsons, no, we should not allow that. Part of this was all motivated by not having to handle 3-4 different levels of dictionary nesting that may or may not (accidentally) get passed into a function (which is the current level of inception-ness in coffea's Filesets, plus an additional layer in the column-joining paradigm). And we shouldn't handle it because it's a one-liner to convert to the pydantic input types:

[_starting_fileset, FilesetSpec(_starting_fileset)],

Literally just wrap your python dict in FilesetSpec(<dict>) or DatasetSpec(<dict>) etc. If anything fails, people should update, then save the pydantic version (i.e. that should be a one-time conversion).

Oh yeah, that was a mistake on my part. I did not mean to literally allow jsons, but rather an easy way (a one-liner) for people to get a proper fileset structure from an existing json that they have. But you're saying that this will be supported.

@nsmith-
Member Author

nsmith- commented Nov 10, 2025

When I originally heard the term coffea.compute, I thought it's going to be a function (and the namespace would have a different name).
I was imagining something like coffea.compute(some computable, executor=something) sort of like what dask.compute does where you tell it what to compute and some executor/scheduler.
Is this out of the question? Is there a strong reason why you preferred coffea.compute be a namespace and have backend.compute do the execution?

I thought about that originally, but context managers are very nice things to have. I see bugs in clearing the progress bar, for example, that are probably a result of the current lack of these, and that would be nice to get past once and for all.

@nsmith-
Member Author

nsmith- commented Nov 10, 2025

different reduce logics through inheritance

My preference would be to capture any backend-agnostic reduce logic in ResultT.__add__ and any backend-specific reduce logic in backend-internal wrappers. The backend defines the Task (what you are calling State above) so it is free to key on the type of output and follow different paths, such as detecting GroupedResult and keeping the groups separate in some way as it builds the result, returning the full one only at the end. I'm imagining that a DaskBackend might have options such as reduce_local: bool to decide whether or not to keep the partial result in the cluster or locally, and partition_groups: bool to decide whether to hold separate futures for each group in a GroupedResult if it is returned.

What you have looks like most of an initial dask backend, at least once the reduce call is refactored to be non-blocking. You will have to make a background thread to handle the as_completed loop, but I think the SingleThreadedBackend gives all the needed recipes. Do you want to make a PR with it?

edit: note, the DaskBackend.map erases the input type. You might wonder why we even keep it around, which is fair. It basically is only to give the option described in

def __call__(self) -> ResultT:
    """Execute the work element by loading the data and applying the function.
    Concrete implementations should define this method rather than inheriting the protocol.
    They MUST implement the method exactly as specified here.
    Backends MAY call `item.load()` to preload data while working on another element,
    as DataElement.load is assumed to be IO-bound.
    """
    return self.func(self.item.load())

Assuming #1444 will be merged before this one
@pfackeldey
Collaborator

different reduce logics through inheritance

My preference would be to capture any backend-agnostic reduce logic in ResultT.__add__ and any backend-specific reduce logic in backend-internal wrappers. The backend defines the Task (what you are calling State above) so it is free to key on the type of output and follow different paths, such as detecting GroupedResult and keeping the groups separate in some way as it builds the result, returning the full one only at the end. I'm imagining that a DaskBackend might have options such as reduce_local: bool to decide whether or not to keep the partial result in the cluster or locally, and partition_groups: bool to decide whether to hold separate futures for each group in a GroupedResult if it is returned.

What you have looks like most of an initial dask backend, at least once the reduce call is refactored to be non-blocking. You will have to make a background thread to handle the as_completed loop, but I think the SingleThreadedBackend gives all the needed recipes. Do you want to make a PR with it?

edit: note, the DaskBackend.map erases the input type. You might wonder why we even keep it around, which is fair. It basically is only to give the option described in

def __call__(self) -> ResultT:
    """Execute the work element by loading the data and applying the function.
    Concrete implementations should define this method rather than inheriting the protocol.
    They MUST implement the method exactly as specified here.
    Backends MAY call `item.load()` to preload data while working on another element,
    as DataElement.load is assumed to be IO-bound.
    """
    return self.func(self.item.load())

sounds good, I can give it a try 👍

@alexander-held
Member

Lots of material here, this is great to see! There are a few API layers and I suspect the test_compute_integration.py example is closest to the high-level API users would be expected to default to? I don't follow how schemas would get attached here. I find myself now semi-regularly writing a Runner replacement to get access to the uproot.open handle instead of only the NanoEventsFactory.events() content. That makes me wonder if the information arriving in a process function could default to going through a NanoEventsFactory call but could also be modified to instead be an open file, or perhaps the file could be optional / customizable additional information that gets exposed. Perhaps it is also interesting to consider a use case where no schema for my data exists at all and I want to use the same infrastructure to run some uproot / awkward functions over my data?

@nsmith-
Member Author

nsmith- commented Nov 19, 2025

There are a few API layers and I suspect the test_compute_integration.py example is closest to the high-level API users would be expected to default to?

Yeah, maybe to riff on that example, it might look something like

dataset = InputDataset(
    files=["file1.root", "file2.root"],
    metadata=ContextDataset(dataset_name="my_dataset", cross_section=1.0),
)


def get_histo_metadata(file_element: OpenROOTFile) -> MyMetadata:
    # Extract some metadata from the file
    histo = file_element.root_dir["histo"].values()
    return MyMetadata(histo_data=list(histo))


def do_analysis(input: ContextInput[EventsArray, MyMetadata]) -> int:
    # Perform some analysis using the events and metadata
    return len(input.data) + sum(input.context.histo_data)


with Backend() as backend:
    preparer = PrepareFile(metadata_extractor=get_histo_metadata)
    prepared = backend.compute(dataset.map_files(preparer)).result()
    assert isinstance(prepared, PreparedDataset)

    prepared.to_json("processable_dataset.json")

    prepared = PreparedDataset.from_json("processable_dataset.json")

    task = backend.compute(prepared.map_steps(do_analysis))
    out = task.result()
    coffea.util.save(out, "output.coffea")

where the definitions of all the extra classes are as below:

from dataclasses import dataclass
from typing import Any, Callable

import fsspec
from pydantic import BaseModel

import coffea.util
from coffea.compute.context import Context, ContextDataElement, ContextInput
from coffea.compute.data.inputdata import InputDataset
from coffea.compute.data.processable_data import (
    ContextDataset,
    StepElement,
    StepIterable,
)
from coffea.compute.data.rootfile import OpenROOTFile
from coffea.compute.func import EventsArray
from coffea.compute.protocol import Backend


@dataclass(frozen=True)
class MyMetadata(Context):
    histo_data: list[int]


class FileInfo(BaseModel):
    file_path: str
    metadata: MyMetadata
    # lineage/provenance?

    def iter_steps(self):
        # For simplicity, assume a single step covering the whole file
        yield ContextDataElement(StepElement((0, -1), self.file_path), self.metadata)


class PreparedDataset(BaseModel, StepIterable[MyMetadata]):
    files: list[FileInfo]

    def iter_steps(self):
        for fileinfo in self.files:
            yield from fileinfo.iter_steps()

    def __add__(self, other: "PreparedDataset") -> "PreparedDataset":
        return PreparedDataset(files=self.files + other.files)

    def to_json(self, path: str) -> None:
        with fsspec.open(path, mode="w", compression="infer") as f:
            f.write(self.model_dump_json())  # type: ignore

    @classmethod
    def from_json(cls, path: str) -> "PreparedDataset":
        with fsspec.open(path, mode="r", compression="infer") as f:
            return cls.model_validate_json(f.read())  # type: ignore


@dataclass
class PrepareFile:
    metadata_extractor: Callable[[OpenROOTFile], MyMetadata]

    def __call__(self, item: ContextInput[OpenROOTFile, Any]) -> PreparedDataset:
        metadata = self.metadata_extractor(item.data)
        return PreparedDataset(
            files=[FileInfo(file_path=item.data.file_path, metadata=metadata)],
        )

I'm not sure how much of that would be user-driven, as it's a lot of classes to define, but it may be useful for framework developers that live on top of coffea.
