diff --git a/.gitignore b/.gitignore index 3c5855dbe..031461ff0 100644 --- a/.gitignore +++ b/.gitignore @@ -141,6 +141,11 @@ _version.py *.to_upload tempCodeRunnerFile.python Untitled-*.py +*.zip +*.json +*.db +*.tgz +_api/ trace.txt ?/ *.prof diff --git a/docs/building/code/using-python-1.py b/docs/building/code/using-python-1.py new file mode 100644 index 000000000..196a25f42 --- /dev/null +++ b/docs/building/code/using-python-1.py @@ -0,0 +1,3 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() diff --git a/docs/building/code/using-python-10.py b/docs/building/code/using-python-10.py new file mode 100644 index 000000000..1a9488927 --- /dev/null +++ b/docs/building/code/using-python-10.py @@ -0,0 +1,13 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = ( + (a := r.grib(path="dir1/*.grib")) + & r.grib(path="dir2/*.grib") + & r.forcings(param=["cos_latitude", "sin_latitude"], template=a) +) + +r.dump() diff --git a/docs/building/code/using-python-11.py b/docs/building/code/using-python-11.py new file mode 100644 index 000000000..735d3e8dc --- /dev/null +++ b/docs/building/code/using-python-11.py @@ -0,0 +1,14 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = r.concat( + { + ("2023-01-01T00:00:00", "2023-06-30T18:00:00", "12h"): r.grib(path="gribs/*.grib"), + ("2023-07-01T00:00:00", "2023-12-31T18:00:00", "12h"): r.netcdf(path="ncdfs/*.nc"), + } +) + +r.dump() diff --git a/docs/building/code/using-python-12.py b/docs/building/code/using-python-12.py new file mode 100644 index 000000000..8b65664ad --- /dev/null +++ b/docs/building/code/using-python-12.py @@ -0,0 +1,249 @@ +import datetime + +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.description = """ +This is a complex example of a dataset recipe written in Python. 
+It uses data from two different ECMWF research experiments for atmospheric and wave data, +from ECMWF's MARS archive. For the atmospheric data, it combines data from two +12-hourly data streams (oper and lwda) to create a dataset with a 6-hourly frequency. +""" + +r.name = "aifs-rd-an-oper-ioku-mars-n320-2024-2024-6h-v1" +r.licence = "CC-BY-4.0" +r.attribution = "ECMWF" + +start_date = datetime.datetime(2024, 5, 2, 0, 0) +end_date = datetime.datetime(2024, 9, 8, 18, 0) + +r.dates = { + "start": start_date, + "end": end_date, + "frequency": "6h", +} + +r.build = {"use_grib_paramid": True} +r.statistics = {"allow_nans": True} + + +grid = "n320" + +ioku = { + "class": "rd", + "grid": grid, + "expver": "ioku", +} + +ikdi = { + "class": "rd", + "grid": grid, + "expver": "ikdi", +} + +accumulations_stream = { + "oper": "lwda", + "lwda": "oper", +} + + +def accumulations(stream): + return r.accumulations( + levtype="sfc", + param=["cp", "tp", "sf", "strd", "ssrd"], + stream=accumulations_stream[stream], + **ioku, + ) + + +def pressure_levels(stream): + return r.mars( + stream=stream, + level=[ + 1, + 10, + 30, + 50, + 70, + 100, + 150, + 200, + 250, + 300, + 400, + 500, + 600, + 700, + 850, + 925, + 1000, + ], + levtype="pl", + param=["t", "u", "v", "w", "z"], + **ioku, + ) + + +def pressure_levels_q(stream): + return r.mars( + levtype="pl", + param=["q"], + level=[50, 100, 150, 200, 250, 300, 400, 500, 600, 700, 850, 925, 1000], + stream=stream, + **ioku, + ) + + +def sfc_fields(stream): + return r.mars( + levtype="sfc", + param=[ + "10u", + "10v", + "2d", + "2t", + "lsm", + "msl", + "sdor", + "skt", + "slor", + "tcw", + "z", + # Land parameters below + "stl1", + "stl2", + "tcc", + "mcc", + "hcc", + "lcc", + "100u", + "100v", + ], + stream=stream, + **ioku, + ) + + +def surface_pressure(stream): + return ( + r.mars( + levtype="ml", + levelist=1, + param="lnsp", + stream=stream, + **ioku, + ) + | r.lnsp_to_sp() + ) + + +def apply_mask(): + return r.apply_mask( + 
path="/data/climate.v015/319_3/lsm.grib", + mask_value=0, + ) + + +def land_params(stream): + soil_params = r.mars( + levtype="sfc", + param=["swvl1", "swvl2", "sd"], + stream=stream, + **ioku, + ) + + snow_cover = ( + r.mars( + levtype="sfc", + param=["sd", "rsn"], + stream=stream, + **ioku, + ) + | r.snow_cover() + ) + + run_off = r.accumulations( + levtype="sfc", + param=["ro"], + stream=accumulations_stream[stream], + **ioku, + ) + + return (soil_params & snow_cover & run_off) | apply_mask() + + +def constants(template): + return r.constants( + param=[ + "cos_latitude", + "cos_longitude", + "sin_latitude", + "sin_longitude", + "cos_julian_day", + "cos_local_time", + "sin_julian_day", + "sin_local_time", + "insolation", + ], + template=template, + ) + + +def wave_data(): + return ( + r.mars( + param=[ + "swh", + "cdww", + "mwp", + "mwd", + "wmb", + "h1012", + "h1214", + "h1417", + "h1721", + "h2125", + "h2530", + ], + stream="wave", + **ikdi, + ) + | r.cos_sin_mean_wave_direction() + ) + + +def atmos_data(stream): + return ( + (a := sfc_fields(stream)) + & surface_pressure(stream) + & pressure_levels(stream) + & pressure_levels_q(stream) + & accumulations(stream) + & land_params(stream) + & constants(template=a) + ) + + +def dates(hour): + s = start_date.replace(hour=hour) + e = end_date.replace(hour=hour + 12) + while s > start_date: + s -= datetime.timedelta(hours=24) + while e < end_date: + e += datetime.timedelta(hours=24) + return (s, e, "12h") + + +def input_data(): + return r.concat( + { + dates(0): atmos_data("oper"), + dates(6): atmos_data("lwda"), + } + ) + + +r.input = input_data() & wave_data() + +r.dump() diff --git a/docs/building/code/using-python-2.py b/docs/building/code/using-python-2.py new file mode 100644 index 000000000..717129592 --- /dev/null +++ b/docs/building/code/using-python-2.py @@ -0,0 +1,8 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe( + description="Example dataset recipe", + name="example-dataset", + 
licence="CC-BY-4.0", + attribution="my-organisation", +) diff --git a/docs/building/code/using-python-3.py b/docs/building/code/using-python-3.py new file mode 100644 index 000000000..173ce89ea --- /dev/null +++ b/docs/building/code/using-python-3.py @@ -0,0 +1,12 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.description = """ +Example dataset recipe using Python, with attributes set one by one +and a multi-line description. +""" + +r.name = "example-dataset" +r.licence = "CC-BY-4.0" +r.attribution = "my-organisation" diff --git a/docs/building/code/using-python-4.py b/docs/building/code/using-python-4.py new file mode 100644 index 000000000..169203146 --- /dev/null +++ b/docs/building/code/using-python-4.py @@ -0,0 +1,22 @@ +from datetime import datetime + +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +# As a tuple (start, end, frequency) +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +# As a dictionary +r.dates = { + "start": "2023-01-01T00:00:00", + "end": "2023-12-31T18:00:00", + "frequency": "12h", +} + +# You can also provide datetime objects +r.dates = { + "start": datetime(2023, 1, 1, 0, 0), + "end": datetime(2023, 12, 31, 18, 0), + "frequency": "12h", +} diff --git a/docs/building/code/using-python-5.py b/docs/building/code/using-python-5.py new file mode 100644 index 000000000..a6c5881c4 --- /dev/null +++ b/docs/building/code/using-python-5.py @@ -0,0 +1,7 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = r.grib(path="data/*.grib") | r.clip(minimum=0, maximum=100) diff --git a/docs/building/code/using-python-6.py b/docs/building/code/using-python-6.py new file mode 100644 index 000000000..02b18afdf --- /dev/null +++ b/docs/building/code/using-python-6.py @@ -0,0 +1,7 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = 
r.grib(path="dir1/*.grib") & r.grib(path="dir2/*.grib") diff --git a/docs/building/code/using-python-7.py b/docs/building/code/using-python-7.py new file mode 100644 index 000000000..2f5042f79 --- /dev/null +++ b/docs/building/code/using-python-7.py @@ -0,0 +1,7 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = (r.grib(path="dir1/*.grib") & r.grib(path="dir2/*.grib")) | r.clip(minimum=0, maximum=100) diff --git a/docs/building/code/using-python-8.py b/docs/building/code/using-python-8.py new file mode 100644 index 000000000..0c24a626c --- /dev/null +++ b/docs/building/code/using-python-8.py @@ -0,0 +1,9 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +r.input = (r.grib(path="dir1/*.grib") & r.grib(path="dir2/*.grib")) | r.clip(minimum=0, maximum=100) + +r.dump() diff --git a/docs/building/code/using-python-9.py b/docs/building/code/using-python-9.py new file mode 100644 index 000000000..6aa86608e --- /dev/null +++ b/docs/building/code/using-python-9.py @@ -0,0 +1,13 @@ +from anemoi.datasets.recipe import Recipe + +r = Recipe() + +r.dates = ("2023-01-01T00:00:00", "2023-12-31T18:00:00", "12h") + +a = r.grib(path="dir1/*.grib") +b = r.grib(path="dir2/*.grib") +c = r.forcings(param=["cos_latitude", "sin_latitude"], template=a) + +r.input = a & b & c + +r.dump() diff --git a/docs/building/using-python.rst b/docs/building/using-python.rst new file mode 100644 index 000000000..b84c1921f --- /dev/null +++ b/docs/building/using-python.rst @@ -0,0 +1,111 @@ +############################## + Using Python defined recipes +############################## + +You can use Python to define recipes for building datasets. This allows +for more complex logic and flexibility compared to using static +configuration files. 
+ +When executed, the Python code will generate a YAML configuration that +can be used by the dataset building tool. + +Here is an example of how to define a dataset recipe using Python. + +First create a ``Recipe`` object, which will hold the configuration: + +.. literalinclude:: code/using-python-1.py + :language: python + +You can pass parameters to the ``Recipe`` constructor: + +.. literalinclude:: code/using-python-2.py + :language: python + +Or set them later: + +.. literalinclude:: code/using-python-3.py + :language: python + +You need to select which dates to use for building the dataset: + +.. literalinclude:: code/using-python-4.py + :language: python + +All data sources and filters are defined as method calls on the +``Recipe`` (any hyphen is replaced by an underscore). + +So the :ref:`grib ` source is defined as +``Recipe.grib(...)`` and the :ref:`clip ` +filter as ``Recipe.clip(...)``. + +Source and filter methods can be combined together and assigned to +``Recipe.input``. + +Use the pipe operator ``|`` to chain sources and filters: + +.. literalinclude:: code/using-python-5.py + :language: python + +Use the ampersand operator ``&`` to combine multiple inputs: + +.. literalinclude:: code/using-python-6.py + :language: python + +And you can combine both operators: + +.. literalinclude:: code/using-python-7.py + :language: python + +To generate the YAML configuration, call the ``Recipe.dump()`` method: + +.. literalinclude:: code/using-python-8.py + :language: python + +Which will output: + +.. literalinclude:: yaml/using-python-1.yaml + :language: yaml + +Sometimes you need to refer to part of the input in a source or a +filter, such as when using the :ref:`forcing_variables` source. + +You can do this by assigning the result of a source or filter to a +variable, and use that variable later in the recipe. + +.. 
literalinclude:: code/using-python-9.py + :language: python + +Or you can assign the result of a source or filter to a variable +using the walrus operator ``:=`` to both assign and use the variable in +the same expression: + +.. literalinclude:: code/using-python-10.py + :language: python + +Finally, if you need different inputs for different dates, you can use +the ``Recipe.concat()`` method, which takes a dictionary mapping dates +to inputs: + +.. literalinclude:: code/using-python-11.py + :language: python + +Note that the dates can also be :class:`datetime.datetime` objects and +the frequency can be a :class:`datetime.timedelta` object. + +.. note:: + + To get you started quickly, you can use the :ref:`anemoi-datasets + recipe --python recipe.yaml ` to transform an + existing YAML recipe into a Python script. + +Below is the complete example. It uses the :ref:`mars-source` and +:ref:`accumulations-source` sources to get data from ECMWF's MARS +archive. In addition, it uses :ref:`lnsp-to-sp +` to convert the logarithm of the +surface pressure to the surface pressure, :ref:`snow-cover +` to compute the snow cover from the +snow depth and snow density and :ref:`apply-mask +` to replace zeros with `NaNs`. + +.. literalinclude:: code/using-python-12.py + :language: python diff --git a/docs/building/yaml/using-python-1.yaml b/docs/building/yaml/using-python-1.yaml new file mode 100644 index 000000000..cff307b90 --- /dev/null +++ b/docs/building/yaml/using-python-1.yaml @@ -0,0 +1,15 @@ +dates: + start: 2023-01-01T00:00:00Z + end: 2023-12-31T18:00:00Z + frequency: 12h + +input: + pipe: + - join: + - grib: + path: dir1/*.grib + - grib: + path: dir2/*.grib + - clip: + minimum: 0 + maximum: 100 diff --git a/docs/cli/recipe.rst b/docs/cli/recipe.rst new file mode 100644 index 000000000..0fb325ffa --- /dev/null +++ b/docs/cli/recipe.rst @@ -0,0 +1,20 @@ +.. 
_recipe_command: + +Recipe Command +============== + + +Anemoi datasets are built from recipe files that describe the data sources and filters to use. +The `recipe` command is used to work with such recipe files. +It can validate, format or migrate a recipe, or convert it to an equivalent Python script. + +.. code:: console + + $ anemoi-datasets recipe [options] recipe.yaml + + +.. argparse:: + :module: anemoi.datasets.__main__ + :func: create_parser + :prog: anemoi-datasets + :path: recipe diff --git a/src/anemoi/datasets/commands/copy.py b/src/anemoi/datasets/commands/copy.py index 5020a208d..406c13de7 100644 --- a/src/anemoi/datasets/commands/copy.py +++ b/src/anemoi/datasets/commands/copy.py @@ -504,6 +504,7 @@ def add_arguments(self, command_parser: Any) -> None: default=100, help="For optimisation purposes, data is transfered by blocks. Default is 100.", ) + command_parser.add_argument("--workdir", help="Working directory for the copy operation.", default=".") command_parser.add_argument("source", help="Source location.") command_parser.add_argument("target", help="Target location.") @@ -533,6 +534,7 @@ def run(self, args: Any) -> None: resume=args.resume, verbosity=args.verbosity, threads=args.transfers, + workdir=args.workdir, ) copier.run() return diff --git a/src/anemoi/datasets/commands/recipe/__init__.py b/src/anemoi/datasets/commands/recipe/__init__.py index 45400806c..773e9e4ab 100644 --- a/src/anemoi/datasets/commands/recipe/__init__.py +++ b/src/anemoi/datasets/commands/recipe/__init__.py @@ -37,6 +37,7 @@ def add_arguments(self, command_parser: Any) -> None: command_parser.add_argument("--validate", action="store_true", help="Validate recipe.") command_parser.add_argument("--format", action="store_true", help="Format the recipe.") command_parser.add_argument("--migrate", action="store_true", help="Migrate the recipe to the latest version.") + command_parser.add_argument("--python", action="store_true", 
help="Convert the recipe to a Python script.") group = command_parser.add_mutually_exclusive_group() group.add_argument("--inplace", action="store_true", help="Overwrite the recipe file in place.") @@ -49,7 +50,7 @@ def add_arguments(self, command_parser: Any) -> None: def run(self, args: Any) -> None: - if not args.validate and not args.format and not args.migrate: + if not args.validate and not args.format and not args.migrate and not args.python: args.validate = True with open(args.path) as file: @@ -58,10 +59,10 @@ def run(self, args: Any) -> None: assert isinstance(config, dict) if args.validate: - if args.inplace and (not args.format and not args.migrate): + if args.inplace and (not args.format and not args.migrate and not args.python): argparse.ArgumentError(None, "--inplace is not supported with --validate.") - if args.output and (not args.format and not args.migrate): + if args.output and (not args.format and not args.migrate and not args.python): argparse.ArgumentError(None, "--output is not supported with --validate.") validate_config(config) @@ -69,6 +70,8 @@ def run(self, args: Any) -> None: return if args.migrate: + from .migrate import migrate_recipe + config = migrate_recipe(args, config) if config is None: LOG.info(f"{args.path}: No changes needed.") @@ -77,6 +80,8 @@ def run(self, args: Any) -> None: args.format = True if args.format: + from .format import format_recipe + formatted = format_recipe(args, config) assert "dates" in formatted f = sys.stdout @@ -89,5 +94,20 @@ def run(self, args: Any) -> None: print(formatted, file=f) f.close() + if args.python: + from anemoi.datasets.create import config_to_python + + if args.inplace: + argparse.ArgumentError(None, "Inplace conversion to Python is not supported.") + + if args.format: + raise argparse.ArgumentError(None, "Formatting is not supported when converting to Python.") + + if args.output: + with open(args.output, "w") as file: + file.write(config_to_python(config)) + else: + 
print(config_to_python(config)) + command = Recipe diff --git a/src/anemoi/datasets/create/__init__.py b/src/anemoi/datasets/create/__init__.py index 5600cb254..b60258d02 100644 --- a/src/anemoi/datasets/create/__init__.py +++ b/src/anemoi/datasets/create/__init__.py @@ -1657,3 +1657,27 @@ def _tidy(d): LOG.error("❌ Config validation failed (jsonschema):") LOG.error(e.message) raise + + +def config_to_python(config: Any) -> Any: + + from ..create.python import PythonScript + + raw_config = config + + config = loader_config(config) + + input = InputBuilder(config.input, data_sources=config.get("data_sources", {})) + + code = PythonScript() + x = input.python_code(code) + code = code.source_code(x, raw_config) + + try: + import black + + return black.format_str(code, mode=black.Mode()) + # except ImportError: + except Exception: + LOG.warning("Black not installed, skipping formatting") + return code diff --git a/src/anemoi/datasets/create/config.py b/src/anemoi/datasets/create/config.py index bbeaee83a..b551c6f4d 100644 --- a/src/anemoi/datasets/create/config.py +++ b/src/anemoi/datasets/create/config.py @@ -10,6 +10,8 @@ import datetime import logging import os +import subprocess +import sys from copy import deepcopy from typing import Any @@ -403,6 +405,11 @@ def loader_config(config: dict, is_test: bool = False) -> LoadersConfig: LoadersConfig The validated configuration object. """ + + if isinstance(config, str) and config.endswith(".py"): + result = subprocess.run([sys.executable, config], capture_output=True, text=True, check=True) + config = yaml.safe_load(result.stdout) + config = Config(config) if is_test: set_to_test_mode(config) diff --git a/src/anemoi/datasets/create/input/action.py b/src/anemoi/datasets/create/input/action.py index 7808ae717..5f21503d5 100644 --- a/src/anemoi/datasets/create/input/action.py +++ b/src/anemoi/datasets/create/input/action.py @@ -8,13 +8,15 @@ # nor does it submit to any jurisdiction. 
import logging +from abc import ABC +from abc import abstractmethod from anemoi.datasets.dates import DatesProvider LOG = logging.getLogger(__name__) -class Action: +class Action(ABC): """An "Action" represents a single operation described in the yaml configuration, e.g. a source, a filter, pipe, join, etc. @@ -30,6 +32,17 @@ def __init__(self, config, *path): "data_sources", ), f"{self.__class__.__name__}: path must start with 'input' or 'data_sources': {path}" + @abstractmethod + def __call__(self, context, argument): + pass + + @abstractmethod + def python_code(self, code): + pass + + def __repr__(self): + return f"{self.__class__.__name__}({'.'.join(str(x) for x in self.path)}, {self.config})" + class Concat(Action): """The Concat contruct is used to concat different actions that are responsible @@ -85,6 +98,11 @@ def __call__(self, context, argument): return context.register(results, self.path) + def python_code(self, code): + return code.concat( + {filtering_dates.to_python(): action.python_code(code) for filtering_dates, action in self.choices} + ) + class Join(Action): """Implement the join operation to combine results from multiple actions. @@ -121,6 +139,9 @@ def __call__(self, context, argument): return context.register(results, self.path) + def python_code(self, code) -> None: + return code.sum(a.python_code(code) for a in self.actions) + class Pipe(Action): """Implement the pipe operation to chain results from a @@ -159,6 +180,9 @@ def __call__(self, context, argument): return context.register(result, self.path) + def python_code(self, code) -> None: + return code.pipe(a.python_code(code) for a in self.actions) + class Function(Action): """Base class for sources and filters.""" @@ -176,6 +200,13 @@ def __call__(self, context, argument): return context.register(self.call_object(context, source, argument), self.path) + def python_code(self, code) -> str: + # For now... 
+ if "source" in self.config: + source = action_factory(self.config["source"], *self.path, "source") + self.config["source"] = source.python_code(code) + return code.call(self.name, self.config) + class DatasetSourceMixin: """Mixin class for sources defined in anemoi-datasets""" @@ -250,6 +281,9 @@ def __init__(self, config, *path): else: self.sources = {i: action_factory(v, *path, str(i)) for i, v in enumerate(config)} + def python_code(self, code): + return code.sources({k: v.python_code(code) for k, v in self.sources.items()}) + def __call__(self, context, argument): for name, source in self.sources.items(): context.register(source(context, argument), self.path + (name,)) @@ -262,6 +296,12 @@ def __init__(self, input, data_sources): self.input = input self.data_sources = data_sources + def python_code(self, code): + return code.recipe( + self.input.python_code(code), + self.data_sources.python_code(code), + ) + def __call__(self, context, argument): # Load data_sources self.data_sources(context, argument) diff --git a/src/anemoi/datasets/create/input/data_sources.py b/src/anemoi/datasets/create/input/data_sources.py index 31bf3d8cc..9aa2429dd 100644 --- a/src/anemoi/datasets/create/input/data_sources.py +++ b/src/anemoi/datasets/create/input/data_sources.py @@ -84,6 +84,11 @@ def __repr__(self) -> str: content = "\n".join([str(i) for i in self.sources]) return self._repr(content) + def python_code(self, code) -> str: + for n, s in zip(self.names, self.sources): + code.source(n, s.python_code(code)) + return code + class DataSourcesResult(Result): """Class to represent the result of data sources actions in the dataset creation process.""" diff --git a/src/anemoi/datasets/create/python.py b/src/anemoi/datasets/create/python.py new file mode 100644 index 000000000..29b8c611d --- /dev/null +++ b/src/anemoi/datasets/create/python.py @@ -0,0 +1,578 @@ +# (C) Copyright 2025 Anemoi contributors. 
+# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + +import logging +import re +from collections import defaultdict +from functools import cached_property + +LOG = logging.getLogger(__name__) + +RESERVED_KEYWORDS = ( + "and", + "or", + "not", + "is", + "in", + "if", + "else", + "elif", + "for", + "while", + "return", + "class", + "def", + "with", + "as", + "import", + "from", + "try", + "except", + "finally", + "raise", + "assert", + "break", + "continue", + "pass", +) + + +def _sanitize_name(name): + name = name.replace("-", "_") + if name in RESERVED_KEYWORDS: + name = f"{name}_" + return name + + +def _un_dotdict(x): + if isinstance(x, dict): + return {k: _un_dotdict(v) for k, v in x.items()} + + if isinstance(x, (list, tuple, set)): + return [_un_dotdict(a) for a in x] + + return x + + +class PythonCode: + + def __init__(self, top): + self.top = top + self.top.register(self) + self.key = str(id(self)) + + def call(self, name, argument): + return PythonCall(self.top, name, argument) + + def sum(self, actions): + return PythonChain(self.top, "join", "&", actions) + + def pipe(self, actions): + return PythonChain(self.top, "pipe", "|", actions) + + def concat(self, argument): + return PythonConcat(self.top, argument) + + def source_code(self): + return self.top.source_code(self) + + def combine(self, nodes): + return None + + def recipe(self, input, data_sources): + return PythonRecipe(self.top, input, data_sources) + + def prelude(self): + return None + + def sources(self, sources): + return PythonSources(self.top, sources) + + def update_anchor(self): + pass + + +class Variable(PythonCode): + def __init__(self, name, node): + super().__init__(top=node.top) 
+ self.name = name + self.node = node + + def __repr__(self): + return "" + + def replace_node(self, old, new): + pass + + def prelude(self): + return [f"{self.name} = {repr(self.node)}", ""] + + +class InLine(PythonCode): + def __init__(self, node): + super().__init__(top=node.top) + self.node = node + + @cached_property + def name(self): + n = self.top.counter["_anchor"] + self.top.counter["_anchor"] += 1 + return f"_a{n}" + + def __repr__(self): + return f"({self.name} := {repr(self.node)})" + + def replace_node(self, old, new): + pass + + +class PythonRecipe(PythonCode): + def __init__(self, top, input, data_sources): + super().__init__(top) + self.input = input + self.data_sources = data_sources + + def apply_references(self, *path): + self.data_sources.apply_references(*path, "data_sources") + self.input.apply_references(*path, "input") + + def replace_node(self, old, new): + if self.input is old: + self.input = new + return + + if self.data_sources is old: + self.data_sources = new + return + + self.input.replace_node(old, new) + self.data_sources.replace_node(old, new) + + def __repr__(self): + return repr(self.input) + + def prelude(self): + return self.data_sources.prelude() + + +class Argument(PythonCode): + + def __init__(self, top, name): + super().__init__(top=top) + self.name = _sanitize_name(name) + + def __repr__(self): + return self.name + + def replace_node(self, old, new): + pass + + +class Anchor(PythonCode): + + def __init__(self, identifier): + super().__init__(top=identifier.node.top) + self.identifier = identifier + + @property + def name(self): + return self.identifier.name + + def __repr__(self): + # assert False + return repr(self.identifier) + + def replace_node(self, old, new): + pass + + +class Reference(PythonCode): + + def __init__(self, top, path): + super().__init__(top) + self.path = tuple(path) + self.anchor = None + + def update_anchor(self): + + node = self.top.by_reference.get(self.path, None) + if node is None: + 
LOG.warning(f"Reference {self.path} not found") + for p in sorted(self.top.by_reference): + LOG.warning(f" - {p}") + else: + self.anchor = Anchor(node) + self.top.replace_nodes([(node.node, self.anchor)]) + + def __repr__(self): + if self.anchor is not None: + return self.anchor.name + + return f"'${{{'.'.join(self.path)}}}'" + + def replace_node(self, old, new): + pass + + +class Function(PythonCode): + def __init__(self, name, node, counter): + super().__init__(top=node.top) + self._name = name + self.node = node + self.used = False + self.counter = counter + + def __repr__(self): + return self.name + + def prelude(self): + if self.used: + return None + + self.used = True + + node_prelude = self.node.prelude() + + arguments = self.node.free_arguments() + + return [ + *(node_prelude if node_prelude else []), + f"def {self.name}({','.join(repr(p) for p in arguments)}):", + f" return {self.node}", + ] + + def free_arguments(self): + return self.node.free_arguments() + + @cached_property + def name(self): + n = self.counter[self._name] + self.counter[self._name] += 1 + if n == 0: + return _sanitize_name(self._name) + return _sanitize_name(f"{self._name}_{n}") + + def replace_node(self, old, new): + if self.node is old: + self.node = new + + +class PythonSources(PythonCode): + def __init__(self, top, sources): + super().__init__(top) + self.sources = sources + + def __repr__(self): + return "" + + def prelude(self): + pass + + def replace_node(self, old, new): + for k, v in list(self.sources.items()): + if v is old: + self.sources[k] = new + else: + v.replace_node(old, new) + + def apply_references(self, *path): + for k, v in self.sources.items(): + self.top.by_reference[path + (k,)] = Variable(k, v) + + +class PythonConcat(PythonCode): + def __init__(self, top, argument): + super().__init__(top=top) + self.argument = _un_dotdict(argument) + + def __repr__(self): + return f"r.concat({self.argument})" + + def replace_node(self, old, new): + for k, v in 
list(self.argument.items()): + if v is old: + self.argument[k] = new + else: + v.replace_node(old, new) + + def apply_references(self, *path): + assert "concat" not in path, path + self.top.by_reference[path + ("concat",)] = InLine(self) + for i, node in enumerate(self.argument.values()): + node.apply_references(*path, "concat", str(i)) + + +class PythonChain(PythonCode): + def __init__(self, top, kind, op, actions): + super().__init__(top=top) + self.op = op + self.kind = kind + self.actions = list(actions) + self.key = op + + def __repr__(self): + return "(" + self.op.join(repr(x) for x in self.actions) + ")" + + def replace_node(self, old, new): + + for i, node in enumerate(self.actions): + + if node is old: + self.actions[i] = new + else: + node.replace_node(old, new) + + def apply_references(self, *path): + self.top.by_reference[path + (self.kind,)] = InLine(self) + for i, node in enumerate(self.actions): + node.apply_references(*path, self.kind, str(i)) + + +class PythonCall(PythonCode): + def __init__(self, top, name, argument): + super().__init__(top=top) + self.name = name + self.argument = _un_dotdict(argument) + self.key = name + + def free_arguments(self): + result = [] + for k, v in self.argument.items(): + if isinstance(v, Argument): + result.append(v) + return result + + def __repr__(self): + name = self.name.replace("-", "_") + config = dict(**self.argument) + + params = [] + + for k, v in config.items(): + k = _sanitize_name(k) + + if not k.isidentifier(): + return f"r.{name}({config})" + + params.append(f"{k}={repr(v)}") + + if params: + params.append("") # For a trailing comma + + params = ",".join(params) + return f"r.{name}({params})" + + def replace_node(self, old, new): + pass + + def combine(self, nodes): + + # Exact similarity + + changes = self._combine0(nodes) + if changes: + return changes + + # On key difference + changes = self._combine1(nodes) + if changes: + return changes + + def _combine0(self, nodes): + + x = defaultdict(list) + 
for node in nodes: + key = {k2: v2 for k2, v2 in sorted(node.argument.items())} + x[str(key)].append(node) + + for i in sorted(x.values(), key=len, reverse=True): + node = i[0] + if len(i) < 2: + return + + call = PythonCall(self.top, self.name, node.argument) + + func = self.top.function(call) + changes = [] + for node in i: + + new = PythonFunction(top=self.top, func=func) + + changes.append((node, new)) + + return changes + + def _combine1(self, nodes): + + x = defaultdict(list) + for node in nodes: + argument = node.argument + for k, v in argument.items(): + rest = {k2: v2 for k2, v2 in sorted(argument.items()) if k2 != k} + x[str(rest)].append((k, v, node)) + + for i in sorted(x.values(), key=len, reverse=True): + key, value, node = i[0] + if len(i) < 2: + return + + rest = {k: v for k, v in node.argument.items() if k != key} + rest[key] = Argument(self.top, key) + call = PythonCall(self.top, self.name, rest) + + func = self.top.function(call) + changes = [] + for key, value, node in i: + + new = PythonFunction( + top=self.top, + func=func, + **{key: value}, + ) + + changes.append((node, new)) + + return changes + + def apply_references(self, *path): + self.top.by_reference[path + (self.name,)] = InLine(self) + + for k, v in self.argument.items(): + if isinstance(v, str) and (m := re.match(r"^\${(\w+(?:\.\w+)+)}$", v)): + path = m.group(1).split(".") + self.argument[k] = Reference(self.top, path) + + +class PythonFunction(PythonCode): + def __init__(self, top, func, **kwargs): + super().__init__(top=top) + self.func = func + self.kwargs = kwargs + + def __repr__(self): + + params = [] + for a in self.func.free_arguments(): + name = _sanitize_name(a.name) + if a.name in self.kwargs: + v = self.kwargs[a.name] + params.append(f"{name}={repr(v)}") + else: + params.append(f"{name}={name}") + + return f"{self.func}({', '.join(params)})" + + def replace_node(self, old, new): + self.func.replace_node(old, new) + + def prelude(self): + return self.func.prelude() + + 
def free_arguments(self): + return [a for a in self.func.free_arguments() if a.name not in self.kwargs] + + def apply_references(self, *path): + pass + + +class PythonScript(PythonCode): + + def __init__(self): + self.nodes = [] + self.counter = defaultdict(int) + self.by_reference = {} + super().__init__(top=self) + + def register(self, child): + if child is not self: + self.nodes.append(child) + + def prelude(self, config): + + from anemoi.datasets.recipe import Recipe + + SKIP = ( + "input", + "data_sources", + "common", + "aliases", + ) + + result = [] + + for k, v in config.items(): + + if k in SKIP: + continue + + if not hasattr(Recipe, k): + LOG.warning(f"Unknown key in recipe: {k}") + assert False, f"Unknown key in recipe: {k}" + continue + + result.append(f"r.{k} = {repr(v)}") + + for node in self.nodes: + prelude = node.prelude() + if prelude: + if not isinstance(prelude, (list, tuple)): + prelude = list(prelude) + result.extend(prelude) + return "\n".join(result) + + def source_code(self, first, config): + + which = self.nodes.index(first) + first.apply_references() + for node in self.nodes: + node.update_anchor() + + more = True + while more: + more = False + + by_class = defaultdict(list) + for node in self.nodes: + by_class[(node.__class__, node.key)].append(node) + + for nodes in by_class.values(): + if len(nodes) > 1: + changes = nodes[0].combine(nodes) + if changes: + self.replace_nodes(changes) + more = True + + first = self.nodes[which] + + return "\n\n".join( + [ + "# Generated Python code for Anemoi dataset creation", + "import datetime", + "from anemoi.datasets.recipe import Recipe", + "r = Recipe()", + self.prelude(config), + f"r.input = {repr(first)}", + "r.dump()", + ] + ) + + def function(self, node): + return Function(node.name, node, self.counter) + + def replace_nodes(self, changes): + + for old, new in changes: + assert old in self.nodes, f"Node {old} not found in {self.nodes}" + for i, node in enumerate(self.nodes): + + if node is 
old: + self.nodes[i] = new + else: + node.replace_node(old, new) diff --git a/src/anemoi/datasets/recipe.py b/src/anemoi/datasets/recipe.py new file mode 100644 index 000000000..9acaadfdb --- /dev/null +++ b/src/anemoi/datasets/recipe.py @@ -0,0 +1,493 @@ +# (C) Copyright 2025 Anemoi contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import logging +import os +import sys +from collections import defaultdict +from tempfile import TemporaryDirectory + +from anemoi.transform.filters import filter_registry as transform_filter_registry +from anemoi.utils.config import DotDict +from anemoi.utils.dates import as_datetime +from anemoi.utils.dates import frequency_to_string +from anemoi.utils.dates import frequency_to_timedelta + +from anemoi.datasets.create.sources import source_registry + +LOG = logging.getLogger(__name__) + + +def _un_dotdict(x): + if isinstance(x, dict): + return {k: _un_dotdict(v) for k, v in x.items()} + + if isinstance(x, (list, tuple, set)): + return [_un_dotdict(a) for a in x] + + return x + + +class Index: + def __init__(self, index): + self.name = str(index) + + def __repr__(self): + return f"Index({self.name})" + + def same(self, other): + if not isinstance(other, Index): + return False + return self.name == other.name + + +class Step: + + def __or__(self, other): + return Pipe(self, other) + + def __and__(self, other): + return Join(self, other) + + def same(self, other): + return self is other + + +class Chain(Step): + def __init__(self, *args): + if len(args) > 0 and isinstance(args[0], self.__class__): + args = args[0].steps + args[1:] + + self.steps = args + self.index = [Index(i) for i in range(len(self.steps))] + + def 
as_dict(self, recipe): + if len(self.steps) == 1: + return self.steps[0].as_dict(recipe) + return {self.name: [s.as_dict(recipe) for s in self.steps]} + + def path(self, target, result, *path): + + if target is self: + result.append([*path, self]) + return + + for i, s in enumerate(self.steps): + s.path(target, result, *path, self, self.index[i]) + + def collocated(self, a, b): + return True + + +class Pipe(Chain): + name = "pipe" + + +class Join(Chain): + name = "join" + + +class Concat(Step): + name = "concat" + + def __init__(self, args): + assert isinstance(args, dict), f"Invalid argument {args}" + self.params = args + + def __setitem__(self, key, value): + self.params[key] = value + + def as_dict(self, recipe): + + result = [] + + for k, v in sorted(self.params.items()): + + key = dict(start=as_datetime(k[0]), end=as_datetime(k[1])) + if len(k) == 3: + key["frequency"] = k[2] + + result.append({"dates": key, **v.as_dict(recipe)}) + + return {"concat": result} + + def collocated(self, a, b): + return a[0].same(b[0]) + + def path(self, target, result, *path): + if target is self: + result.append([*path, self]) + return + for i, (k, v) in enumerate(sorted(self.params.items())): + v.path(target, result, *path, self, Index(i)) + + +class Base(Step): + def __init__(self, owner, *args, **kwargs): + self.owner = owner + self.name = owner.name + self.params = {} + for a in args: + assert isinstance(a, dict), f"Invalid argument {a}" + self.params.update(a) + self.params.update(kwargs) + + def as_dict(self, recipe): + + def resolve(params, recipe, name=None): + if isinstance(params, dict): + + def _(k): + if isinstance(k, str) and k.endswith("_"): + return k[:-1] + return k + + return {_(k): resolve(v, recipe, name=_(k)) for k, v in params.items()} + + if isinstance(params, (list, tuple)): + return [resolve(v, recipe) for v in params] + + if isinstance(params, list): + return [resolve(v, recipe) for v in params] + + if isinstance(params, Step): + return 
recipe.resolve(self, params, name=name) + + return params + + return {self.owner.name: resolve(self.params, recipe)} + + def path(self, target, result, *path): + if self is target: + result.append([*path, self]) + + +class Source(Base): + pass + + +class Filter(Base): + pass + + +class SourceMaker: + def __init__(self, name, factory): + self.name = name + self.factory = factory + + def __call__(self, *args, **kwargs): + return Source(self, *args, **kwargs) + + +class FilterMaker: + def __init__(self, name, factory): + self.name = name + self.factory = factory + + def __call__(self, *args, **kwargs): + if len(args) > 0 and isinstance(args[0], Step): + prev = args[0] + args = args[1:] + return Pipe(prev, Filter(self, *args, **kwargs)) + return Filter(self, *args, **kwargs) + + +class Recipe: + + def __init__(self, name=None, description=None, attribution=None, licence=None): + + self._description = description + self._attribution = attribution + self._licence = licence + self._name = name + self._dates = None + self._statistics = None + self._build = None + self._env = None + self._dataset_status = None + self._output = None + self._platform = None + + self.input = Join() + self.output = DotDict() + self.statistics = DotDict() + self.build = DotDict() + + self._data_sources = {} + self._counter = defaultdict(int) + + sources = source_registry.factories.copy() + filters = transform_filter_registry.factories.copy() + + for key, factory in sources.items(): + if key in filters: + LOG.warning( + f"Source `{key}` is registered in anemoi.datasets source registry and in anemoi.transform filter registry" + ) + del filters[key] + + for key, factory in sources.items(): + key = key.replace("-", "_") + assert not hasattr(self, key) + setattr(self, key, SourceMaker(key, factory)) + + for key, factory in filters.items(): + key = key.replace("-", "_") + assert not hasattr(self, key) + setattr(self, key, FilterMaker(key, factory)) + + self.repeated_dates = 
SourceMaker("repeated_dates", None) + + def as_dict(self): + result = { + "name": self.name, + "description": self.description, + "attribution": self.attribution, + "licence": self.licence, + "dates": self.dates, + "statistics": self.statistics, + "build": self.build, + } + + if self._data_sources: + result["data_sources"] = self._data_sources + + for k, v in list(result.items()): + if v is None: + del result[k] + + return result + + def concat(self, *args, **kwargs): + return Concat(*args, **kwargs) + + def make_data_source(self, name, target): + + target = target.as_dict(self) + + name = name or "source" + if name in self._data_sources: + if self._data_sources[name] == target: + return f"${{data_sources.{name}}}" + + n = self._counter[name] + self._counter[name] += 1 + + name = f"{name}_{n}" if n > 0 else name + + self._data_sources[name] = target.copy() + return f"${{data_sources.{name}}}" + + def resolve(self, source, target, name=None): + + top = Index("input") # So we have 'input' first in the path + + path_to_source = [] + self.input.path(source, path_to_source, top) + if len(path_to_source) == 0: + raise ValueError(f"Source {source} not found in recipe") + if len(path_to_source) > 1: + raise ValueError(f"Source {source} found in multiple locations {path_to_source}") + path_to_source = path_to_source[0] + + path_to_target = [] + self.input.path(target, path_to_target, top) + if len(path_to_target) > 1: + raise ValueError(f"Target {target} found in multiple locations {path_to_target}") + + if len(path_to_target) == 0: + # Add a `data_sources` entry + return self.make_data_source(name, target) + + path_to_target = path_to_target[0] + + a = [s for s in path_to_target] + b = [s for s in path_to_source] + common_ancestor = None + while a[0] is b[0]: + common_ancestor = a[0] + a = a[1:] + b = b[1:] + + assert common_ancestor is not None, f"Common ancestor not found between {source} and {target}" + + if not common_ancestor.collocated(a, b): + source = 
".".join(s.name for s in path_to_source) + target = ".".join(s.name for s in path_to_target) + raise ValueError( + f"Source ${{{source}}} and target ${{{target}}} are not collocated (i.e. they are not branch of a 'concat')" + ) + + target = ".".join(s.name for s in path_to_target) + return f"${{{target}}}" + + @property + def description(self): + return self._description + + @description.setter + def description(self, value): + self._description = value.strip() + + @property + def attribution(self): + return self._attribution + + @attribution.setter + def attribution(self, value): + self._attribution = value.strip() + + @property + def licence(self): + return self._licence + + @licence.setter + def licence(self, value): + self._licence = value.strip() + + @property + def name(self): + return self._name + + @name.setter + def name(self, value): + self._name = value.strip() + + @property + def dates(self): + return self._dates + + def _parse_dates(self, value): + + if isinstance(value, dict): + return value + + start = None + end = None + frequency = 1 + + if isinstance(value, (list, tuple)): + if len(value) in [2, 3]: + start = value[0] + end = value[1] + + if len(value) == 3: + frequency = frequency_to_string(frequency_to_timedelta(value[2])) + if isinstance(frequency, int): + frequency = f"{frequency}h" + + if start is None or end is None: + raise ValueError(f"Invalid dates {value}") + + if isinstance(frequency, int): + frequency = f"{frequency}h" + + return dict( + start=as_datetime(start), + end=as_datetime(end), + frequency=frequency, + ) + + @dates.setter + def dates(self, value): + self._dates = self._parse_dates(value) + + @property + def output(self): + return self._output + + @output.setter + def output(self, value): + self._output = value + + @property + def statistics(self): + return self._statistics + + @statistics.setter + def statistics(self, value): + self._statistics = value + + @property + def build(self): + return self._build + + @build.setter + 
def build(self, value): + self._build = value + + @property + def env(self): + return self._env + + @env.setter + def env(self, value): + self._env = value + + @property + def dataset_status(self): + return self._dataset_status + + @dataset_status.setter + def dataset_status(self, value): + self._dataset_status = value + + @property + def platform(self): + return self._platform + + @platform.setter + def platform(self, value): + self._platform = value + + def dump(self, file=sys.stdout): + input = self.input.as_dict(self) # First so we get the data_sources + + result = self.as_dict() + + result["input"] = input + + result["output"] = self.output + + result["statistics"] = self.statistics + + result["build"] = self.build + + result["env"] = self.env + + result["dataset_status"] = self.dataset_status + + result["platform"] = self.platform + + for k, v in list(result.items()): + if v is None or v == {} or v == []: + del result[k] + + from .dumper import yaml_dump + + yaml_dump(_un_dotdict(result), stream=file) + + def test(self, output="recipe.zarr"): + from argparse import ArgumentParser + + from anemoi.datasets.commands.create import command + + parser = ArgumentParser() + parser.add_argument("command", help="Command to run") + + cmd = command() + cmd.add_arguments(parser) + + with TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "recipe.yaml") + with open(path, "w") as file: + self.dump(file) + + args = parser.parse_args(["create", path, output, "--overwrite", "--test"]) + cmd.run(args)