Skip to content

🎉 Online saving with appropriate arborescence #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- add python 3.13 support
- (constants.py) locate additional constants to this file for clarity
- (dataset.py, sample.py) initiate get/set feature_identifiers mechanisms
- (dataset.py) add method `add_to_dir` to iteratively save `Sample` objects to a directory

### Changed

- Update repo configuration (actions)
- Update readme
- Update README
- Update documentation (including configuration and replacing data challenges page with PLAID benchmark one)
- (types/*) improve typing factorization
- (stats.py) improve OnlineStatistics and Stats classes
Expand Down Expand Up @@ -89,4 +90,3 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[0.1.3]: https://github.com/PLAID-lib/plaid/compare/0.1.2...0.1.3
[0.1.2]: https://github.com/PLAID-lib/plaid/compare/0.1.1...0.1.2
[0.1.1]: https://github.com/PLAID-lib/plaid/releases/tag/0.1.1

154 changes: 111 additions & 43 deletions src/plaid/containers/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
from typing import TypeVar

Self = TypeVar("Self")

import logging
import os
import shutil
import subprocess
from multiprocessing import Pool
from pathlib import Path
from typing import Iterator, Union
from typing import Iterator, Optional, Union

import numpy as np
import yaml
Expand Down Expand Up @@ -66,7 +65,7 @@ class Dataset(object):

def __init__(
self,
directory_path: Union[str, Path] = None,
directory_path: Optional[Union[str, Path]] = None,
verbose: bool = False,
processes_number: int = 0,
) -> None:
Expand Down Expand Up @@ -126,7 +125,7 @@ def __init__(

# -------------------------------------------------------------------------#
def get_samples(
self, ids: list[int] = None, as_list: bool = False
self, ids: Optional[list[int]] = None, as_list: bool = False
) -> dict[int, Sample]:
"""Return dictionnary of samples with ids corresponding to :code:`ids` if specified, else all samples.

Expand All @@ -144,7 +143,7 @@ def get_samples(
else:
return {id: self._samples[id] for id in ids}

def add_sample(self, sample: Sample, id: int = None) -> int:
def add_sample(self, sample: Sample, id: Optional[int] = None) -> int:
"""Add a new :class:`Sample <plaid.containers.sample.Sample>` to the :class:`Dataset <plaid.containers.dataset.Dataset>.`.

Args:
Expand Down Expand Up @@ -215,7 +214,9 @@ def del_sample(self, sample_id: int) -> None:

return deleted_sample

def add_samples(self, samples: list[Sample], ids: list[int] = None) -> list[int]:
def add_samples(
self, samples: list[Sample], ids: Optional[list[int]] = None
) -> list[int]:
"""Add new :class:`Samples <plaid.containers.sample.Sample>` to the :class:`Dataset <plaid.containers.dataset.Dataset>`.

Args:
Expand Down Expand Up @@ -336,7 +337,7 @@ def get_sample_ids(self) -> list[int]:
return list(self._samples.keys())

# -------------------------------------------------------------------------#
def get_scalar_names(self, ids: list[int] = None) -> list[str]:
def get_scalar_names(self, ids: Optional[list[int]] = None) -> list[str]:
"""Return union of scalars names in all samples with id in ids.

Args:
Expand All @@ -358,7 +359,7 @@ def get_scalar_names(self, ids: list[int] = None) -> list[str]:
return scalars_names

# -------------------------------------------------------------------------#
def get_time_series_names(self, ids: list[int] = None) -> list[str]:
def get_time_series_names(self, ids: Optional[list[int]] = None) -> list[str]:
"""Return union of time series names in all samples with id in ids.

Args:
Expand All @@ -381,7 +382,10 @@ def get_time_series_names(self, ids: list[int] = None) -> list[str]:

# -------------------------------------------------------------------------#
def get_field_names(
self, ids: list[int] = None, zone_name: str = None, base_name: str = None
self,
ids: Optional[list[int]] = None,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
) -> list[str]:
"""Return union of fields names in all samples with id in ids.

Expand Down Expand Up @@ -410,7 +414,9 @@ def get_field_names(
return fields_names

# -------------------------------------------------------------------------#
def add_tabular_scalars(self, tabular: np.ndarray, names: list[str] = None) -> None:
def add_tabular_scalars(
self, tabular: np.ndarray, names: Optional[list[str]] = None
) -> None:
"""Add tabular scalar data to the summary.

Args:
Expand Down Expand Up @@ -449,8 +455,8 @@ def add_tabular_scalars(self, tabular: np.ndarray, names: list[str] = None) -> N

def get_scalars_to_tabular(
self,
scalar_names: list[str] = None,
sample_ids: list[int] = None,
scalar_names: Optional[list[str]] = None,
sample_ids: Optional[list[int]] = None,
as_nparray=False,
) -> Union[dict[str, np.ndarray], np.ndarray]:
"""Return a dict containing scalar values as tabulars/arrays.
Expand Down Expand Up @@ -716,24 +722,24 @@ def save(self, fname: Union[str, Path]) -> None:
"""
fname = Path(fname)

# First : creates a directory <savedir> to save everything in an
# First : creates a directory <save_dir> to save everything in an
# arborescence on disk
savedir = fname.parent / f"tmpsavedir_{generate_random_ASCII()}"
if savedir.is_dir(): # pragma: no cover
save_dir = fname.parent / f"tmpsavedir_{generate_random_ASCII()}"
if save_dir.is_dir(): # pragma: no cover
raise ValueError(
f"temporary intermediate directory <{savedir}> already exits"
f"temporary intermediate directory <{save_dir}> already exits"
)
savedir.mkdir(parents=True)
save_dir.mkdir(parents=True)

self._save_to_dir_(savedir)
self._save_to_dir_(save_dir)

# Then : tar dir in file <fname>
# TODO: avoid using subprocess by using lib tarfile
ARGUMENTS = ["tar", "-cf", fname, "-C", savedir, "."]
ARGUMENTS = ["tar", "-cf", fname, "-C", save_dir, "."]
subprocess.call(ARGUMENTS)

# Finally : removes directory <savedir>
shutil.rmtree(savedir)
# Finally : removes directory <save_dir>
shutil.rmtree(save_dir)

@classmethod
def load_from_file(
Expand All @@ -758,7 +764,7 @@ def load_from_file(
def load_from_dir(
cls,
dname: Union[str, Path],
ids: list[int] = None,
ids: Optional[list[int]] = None,
verbose: bool = False,
processes_number: int = 0,
) -> Self:
Expand Down Expand Up @@ -819,21 +825,81 @@ def load(
shutil.rmtree(inputdir)

# -------------------------------------------------------------------------#
def _save_to_dir_(self, savedir: Union[str, Path], verbose: bool = False) -> None:
"""Saves the dataset into a created sample directory and creates an 'infos.yaml' file to store additional information about the dataset.
def add_to_dir(
    self,
    sample: "Sample",
    save_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False,
) -> None:
    """Save a single sample into the dataset directory arborescence on disk.

    The sample is written to ``<save_dir>/samples/sample_XXXXXXXXX`` where the
    numeric suffix is the next id not already used on disk nor by samples held
    in this instance.

    Notes:
        If `save_dir` is None, will look for `self.save_dir` which will be retrieved from last previous call to load or save.
        `save_dir` given in argument will take precedence over `self.save_dir` and overwrite it.
        The sample is only written to disk here; it is NOT added to the
        in-memory dataset (doing so would require re-saving every sample).

    Args:
        sample (Sample): The sample to add.
        save_dir (Union[str,Path], optional): The directory in which to save the sample. Defaults to None.
        verbose (bool, optional): If True, will print additional information. Defaults to False.

    Raises:
        ValueError: If both self.save_dir and save_dir are None.
    """
    if save_dir is not None:
        # An explicit argument takes precedence and becomes the new default.
        self.save_dir = Path(save_dir)
    elif getattr(self, "save_dir", None) is None:
        raise ValueError(
            "self.save_dir and save_dir are None, we don't know where to save, specify one of them before"
        )

    # exist_ok avoids the check-then-create race of `is_dir()` + `mkdir()`
    self.save_dir.mkdir(parents=True, exist_ok=True)

    if verbose:
        print(f"Saving database to: {self.save_dir}")

    samples_dir = self.save_dir / "samples"
    samples_dir.mkdir(parents=True, exist_ok=True)

    # Choose the next free id: never reuse an id already present on disk
    # (directories named sample_<id>), nor one that could collide with
    # samples currently held by this instance.
    ids_on_disk = [
        int(d.name.split("_")[-1])
        for d in samples_dir.glob("sample_*")
        if d.is_dir()
    ]
    next_id = max(ids_on_disk) + 1 if ids_on_disk else 0
    next_id = max(len(self), next_id)

    sample.save(samples_dir / f"sample_{next_id:09d}")

def _save_to_dir_(self, save_dir: Union[str, Path], verbose: bool = False) -> None:
"""Saves the dataset into a sub-directory `samples` and creates an 'infos.yaml' file to store additional information about the dataset.

Args:
savedir (Union[str,Path]): The path in which to save the files.
save_dir (Union[str,Path]): The path in which to save the files.
verbose (bool, optional): Explicitly displays the operations performed. Defaults to False.
"""
savedir = Path(savedir)
if not (savedir.is_dir()):
savedir.mkdir(parents=True)
save_dir = Path(save_dir)
if not (save_dir.is_dir()):
save_dir.mkdir(parents=True)

self.save_dir = save_dir

if verbose: # pragma: no cover
print(f"Saving database to: {savedir}")
print(f"Saving database to: {save_dir}")

samples_dir = savedir / "samples"
samples_dir = save_dir / "samples"
if not (samples_dir.is_dir()):
samples_dir.mkdir(parents=True)

Expand All @@ -844,29 +910,29 @@ def _save_to_dir_(self, savedir: Union[str, Path], verbose: bool = False) -> Non

# ---# save infos
if len(self._infos) > 0:
infos_fname = savedir / "infos.yaml"
infos_fname = save_dir / "infos.yaml"
with open(infos_fname, "w") as file:
yaml.dump(self._infos, file, default_flow_style=False, sort_keys=False)

# #---# save stats
# stats_fname = savedir / 'stats.yaml'
# stats_fname = save_dir / 'stats.yaml'
# self._stats.save(stats_fname)

# #---# save flags
# flags_fname = savedir / 'flags.yaml'
# flags_fname = save_dir / 'flags.yaml'
# self._flags.save(flags_fname)

def _load_from_dir_(
self,
savedir: Union[str, Path],
ids: list[int] = None,
save_dir: Union[str, Path],
ids: Optional[list[int]] = None,
verbose: bool = False,
processes_number: int = 0,
) -> None:
"""Loads a dataset from a sample directory and retrieves additional information about the dataset from an 'infos.yaml' file, if available.

Args:
savedir (Union[str,Path]): The path from which to load files.
save_dir (Union[str,Path]): The path from which to load files.
ids (list, optional): The specific sample IDs to load from the dataset. Defaults to None.
verbose (bool, optional): Explicitly displays the operations performed. Defaults to False.
processes_number (int, optional): Number of processes used to load files (-1 to use all available resources, 0 to disable multiprocessing). Defaults to 0.
Expand All @@ -876,20 +942,22 @@ def _load_from_dir_(
FileExistsError: Triggered if the provided path is a file instead of a directory.
ValueError: Triggered if the number of processes is < -1.
"""
savedir = Path(savedir)
if not savedir.is_dir():
save_dir = Path(save_dir)
if not save_dir.is_dir():
raise FileNotFoundError(
f'"{savedir}" is not a directory or does not exist. Abort'
f'"{save_dir}" is not a directory or does not exist. Abort'
)

if processes_number < -1:
raise ValueError("Number of processes cannot be < -1")

self.save_dir = save_dir

if verbose: # pragma: no cover
print(f"Reading database located at: {savedir}")
print(f"Reading database located at: {save_dir}")

sample_paths = sorted(
[path for path in (savedir / "samples").glob("sample_*") if path.is_dir()]
[path for path in (save_dir / "samples").glob("sample_*") if path.is_dir()]
)

if ids is not None:
Expand Down Expand Up @@ -951,7 +1019,7 @@ def update(self, *a):
self.set_sample(id, sample)
"""

infos_fname = savedir / "infos.yaml"
infos_fname = save_dir / "infos.yaml"
if infos_fname.is_file():
with open(infos_fname, "r") as file:
self._infos = yaml.safe_load(file)
Expand All @@ -967,7 +1035,7 @@ def _load_number_of_samples_(_savedir: Union[str, Path]) -> int: # pragma: no c
useful for determining the total number of samples in a dataset.

Args:
savedir (Union[str,Path]): The path to the directory where sample files are stored.
save_dir (Union[str,Path]): The path to the directory where sample files are stored.

Returns:
int: The number of sample files found in the specified directory.
Expand Down
34 changes: 34 additions & 0 deletions tests/containers/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,40 @@ def test_load_from_dir(self, dataset_with_samples, tmp_path):
loaded_dataset = Dataset.load_from_dir(dname)
assert len(loaded_dataset) == len(dataset_with_samples)

# -------------------------------------------------------------------------#
def test_add_to_dir_creates_and_saves(self, empty_dataset, sample, tmp_path):
    """add_to_dir must create the directory tree and write exactly one sample."""
    target_dir = tmp_path / "my_dataset_dir"
    empty_dataset.add_to_dir(sample, target_dir)
    samples_dir = target_dir / "samples"
    # The samples sub-directory and a single sample folder must exist
    assert samples_dir.is_dir()
    saved = list(samples_dir.glob("sample_*"))
    assert len(saved) == 1
    assert saved[0].is_dir()

def test_add_to_dir_uses_self_save_dir(self, empty_dataset, sample, tmp_path):
    """A second call without save_dir reuses the directory set by the first call."""
    target_dir = tmp_path / "dataset_dir2"
    empty_dataset.add_to_dir(sample, target_dir)
    # No save_dir argument this time: self.save_dir must be used
    empty_dataset.add_to_dir(Sample())
    saved = list((target_dir / "samples").glob("sample_*"))
    assert len(saved) == 2

def test_add_to_dir_raises_if_no_save_dir(self, empty_dataset, sample):
    """Without any save_dir (argument or attribute), add_to_dir must fail."""
    with pytest.raises(ValueError):
        empty_dataset.add_to_dir(sample)

def test_add_to_dir_verbose(self, empty_dataset, sample, tmp_path, capsys):
    """verbose=True must report the target directory on stdout."""
    empty_dataset.add_to_dir(sample, tmp_path / "dataset_verbose", verbose=True)
    out = capsys.readouterr().out
    assert "Saving database to" in out

# -------------------------------------------------------------------------#
def test__save_to_dir_(self, dataset_with_samples, tmp_path):
savedir = tmp_path / "testdir"
Expand Down
Loading