87 changes: 87 additions & 0 deletions cads_adaptors/adaptors/cadsobs/char_utils.py
@@ -0,0 +1,87 @@
from typing import Tuple

import h5netcdf
import numpy
from fsspec.implementations.http import HTTPFileSystem

from cads_adaptors.adaptors.cadsobs.codes import get_code_mapping
from cads_adaptors.adaptors.cadsobs.utils import get_url_ncobj


def handle_string_dims(
    char_sizes: dict[str, int],
    chunksize: Tuple[int, ...],
    dimensions: Tuple[str, ...],
    ivar: str,
    oncobj: h5netcdf.File,
) -> Tuple[Tuple[int, ...], Tuple[str, ...]]:
    """Add dimensions for character variables."""
    ivar_str_dim = ivar + "_stringdim"
    ivar_str_dim_size = char_sizes[ivar]
    if ivar_str_dim not in oncobj.dimensions:
        oncobj.dimensions[ivar_str_dim] = ivar_str_dim_size
    dimensions += (ivar_str_dim,)
    chunksize += (ivar_str_dim_size,)
    return chunksize, dimensions


def get_char_sizes(fs: HTTPFileSystem, object_urls: list[str]) -> dict[str, int]:
    """
    Iterate over the input files to get the size of the string variables.

    We need to know this beforehand so we can stream to the output file.
    """
    char_sizes = {}
    for url in object_urls:
        with get_url_ncobj(fs, url) as incobj:
            for var, varobj in incobj.items():
                if varobj.dtype.kind != "S":
                    continue
                char_size = varobj.shape[1]
                if var not in char_sizes:
                    char_sizes[var] = char_size
                else:
                    char_sizes[var] = max(char_sizes[var], char_size)

    return char_sizes


def concat_str_array(iarray: numpy.ndarray) -> numpy.ndarray:
    """Concatenate a 2D array of single characters into a 1D array of strings."""
    field_len, strlen = iarray.shape
    return iarray.view(f"S{strlen}").reshape(field_len)


def dump_char_variable(
    current_size: int,
    incobj: h5netcdf.File,
    ivar: str,
    ivarobj: h5netcdf.Variable,
    mask: numpy.typing.NDArray,
    new_size: int,
    ovar: h5netcdf.Variable,
    download_all_chunk: bool,
) -> None:
    """Write the masked slice of a character variable to the output variable."""
    if ivar != "observed_variable":
        actual_str_dim_size = ivarobj.shape[-1]
        if download_all_chunk:
            data = ivarobj[:, 0:actual_str_dim_size][mask, :]
        else:
            data = ivarobj[mask, 0:actual_str_dim_size]
        ovar[current_size:new_size, 0:actual_str_dim_size] = data
    else:
        # For observed_variable, use the attributes to decode the integer codes.
        if download_all_chunk:
            data = ivarobj[:][mask]
        else:
            data = ivarobj[mask]
        code2var = get_code_mapping(incobj, inverse=True)
        codes_in_data, inverse = numpy.unique(data, return_inverse=True)
        variables_in_data = numpy.array(
            [code2var[c].encode("utf-8") for c in codes_in_data]
        )
        data_decoded = variables_in_data[inverse]
        data_decoded = data_decoded.view("S1").reshape(data.size, -1)
        actual_str_dim_size = data_decoded.shape[-1]
        ovar[current_size:new_size, 0:actual_str_dim_size] = data_decoded
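These helpers hinge on numpy's zero-copy "view" trick: a (rows, strlen) array of single bytes ("S1") can be reinterpreted as one fixed-width string per row, and back again. A minimal self-contained sketch of the round trip, with invented values, following the same pattern as concat_str_array and the decoding branch of dump_char_variable:

import numpy

# Two 3-character strings laid out the way the netCDF character variables are:
# one single-byte ("S1") element per character.
chars = numpy.array([[b"f", b"o", b"o"], [b"b", b"a", b"r"]], dtype="S1")

# Pack each row into one fixed-width string, as concat_str_array does.
flat = chars.view("S3").reshape(2)
assert flat.tolist() == [b"foo", b"bar"]

# And back: split fixed-width strings into single bytes, as dump_char_variable
# does after decoding the observed_variable codes.
unpacked = flat.view("S1").reshape(flat.size, -1)
assert unpacked.shape == (2, 3)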
37 changes: 37 additions & 0 deletions cads_adaptors/adaptors/cadsobs/codes.py
@@ -0,0 +1,37 @@
import h5netcdf
import numpy
import xarray

from cads_adaptors.exceptions import CadsObsRuntimeError


def get_code_mapping(
    incobj: h5netcdf.File | xarray.Dataset, inverse: bool = False
) -> dict:
    if isinstance(incobj, h5netcdf.File):
        attrs = incobj.variables["observed_variable"].attrs
    elif isinstance(incobj, xarray.Dataset):
        attrs = incobj["observed_variable"].attrs
    else:
        raise CadsObsRuntimeError("Unsupported input type")
    # If there is only one value, these attrs are scalars rather than arrays,
    # so wrap them in one-element arrays before zipping.
    if isinstance(attrs["codes"], numpy.ndarray):
        labels, codes = attrs["labels"], attrs["codes"]
    else:
        labels = numpy.array([attrs["labels"]])
        codes = numpy.array([attrs["codes"]])
    if inverse:
        mapping = {c: v for v, c in zip(labels, codes)}
    else:
        mapping = {v: c for v, c in zip(labels, codes)}
    return mapping
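A usage sketch for get_code_mapping against a toy xarray dataset. Only the attribute names ("labels", "codes") come from the code above; the values are invented for illustration:

import numpy
import xarray

ds = xarray.Dataset({"observed_variable": ("index", numpy.array([85, 85, 36]))})
ds["observed_variable"].attrs["labels"] = numpy.array(["air_temperature", "total_ozone"])
ds["observed_variable"].attrs["codes"] = numpy.array([85, 36])

var2code = get_code_mapping(ds)  # {'air_temperature': 85, 'total_ozone': 36}
code2var = get_code_mapping(ds, inverse=True)  # {85: 'air_temperature', 36: 'total_ozone'}

The inverse mapping is what dump_char_variable uses to turn the stored integer codes back into variable names.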
3 changes: 3 additions & 0 deletions cads_adaptors/adaptors/cadsobs/constants.py
@@ -0,0 +1,3 @@
MAX_NUMBER_OF_GROUPS = 10
TIME_UNITS_REFERENCE_DATE = "1900-01-01 00:00:00"
SPATIAL_COORDINATES = ["latitude", "longitude"]
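These constants are consumed elsewhere in the adaptor. As one hedged illustration (an assumption, since the consuming code is outside this diff), TIME_UNITS_REFERENCE_DATE reads like the anchor of a CF-style time units string:

# Assumed usage, not shown in this diff: build CF-style time units from the constant.
time_units = f"seconds since {TIME_UNITS_REFERENCE_DATE}"  # "seconds since 1900-01-01 00:00:00"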
9 changes: 2 additions & 7 deletions cads_adaptors/adaptors/cadsobs/csv.py
@@ -5,17 +5,15 @@
 import xarray

 from cads_adaptors.adaptors.cadsobs.models import RetrieveArgs
-from cads_adaptors.adaptors.cadsobs.utils import _get_output_path
 from cads_adaptors.tools.general import ensure_list

 logger = logging.getLogger(__name__)


 def to_csv(
-    output_dir: Path, output_path_netcdf: Path, retrieve_args: RetrieveArgs
+    output_path: Path, output_path_netcdf: Path, retrieve_args: RetrieveArgs
 ) -> Path:
     """Transform the output netCDF to CSV format."""
-    output_path = _get_output_path(output_dir, retrieve_args.dataset, "csv")
     # Beware xarray will silently ignore the chunk size if the dimension does not exist
     cdm_lite_dataset = xarray.open_dataset(
         output_path_netcdf, chunks=dict(index=50000), decode_times=True
@@ -101,11 +99,8 @@ def get_csv_header(
     return header


-def to_zip(input_file_path: Path) -> Path:
+def to_zip(input_file_path: Path, output_zip_path: Path) -> Path:
     """Zips the given file into a .zip archive."""
-    # Determine output zip path
-    output_zip_path = input_file_path.with_suffix(".zip")
-
     # Create zip archive
     with zipfile.ZipFile(output_zip_path, "w") as zipf:
         zipf.write(input_file_path, arcname=input_file_path.name)
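With this change the caller chooses the archive location instead of to_zip deriving it from the input name. A hypothetical call site (paths invented; assumes to_zip still returns the archive path, as its annotation indicates):

from pathlib import Path

csv_path = Path("/tmp/observations.csv")  # hypothetical input file
zip_path = to_zip(csv_path, csv_path.with_suffix(".zip"))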