Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions servicex_analysis_utils/file_peeking.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import awkward as ak
import json
import logging
from servicex.dataset_identifier import DataSetIdentifier


def run_query(input_filenames):
Expand Down Expand Up @@ -85,8 +86,8 @@ def build_deliver_spec(datasets):
Supports multiple inputs for multiple sample queries.

Parameters:
datasets (str, [str], or dict): Rucio DIDs to be checked by the servicex workers.
If dict, custom names can be inputed
datasets (str, [str], dict, DataSetIdentifier): Rucio DIDs (str) or DataSetIdentifier object.
If dict, custom names can be inputed for each dataset

Returns:
spec_python (dict): The specification for the python function query containing Name, Query, Dataset, NFiles
Expand All @@ -98,23 +99,29 @@ def build_deliver_spec(datasets):
dataset_dict = {}
user_in = type(datasets)

# Rucio DID as str
if user_in == str:
dataset_dict.update({datasets: datasets})
dataset_dict.update({datasets: datasets}) # Use dataset ID as sample name
elif user_in == list and type(datasets[0]) is str:
for ds in datasets:
dataset_dict.update({ds: ds})
elif user_in == dict:
elif user_in == dict: # Custom sample names
dataset_dict = datasets
# Single DataSetIdentifier object
elif isinstance(datasets, DataSetIdentifier):
dataset_dict.update({"Dataset": datasets})
else:
raise ValueError(
f"Unsupported dataset input type: {user_in}.\nInput must be dict ('sample_name':'dataset_id'), str or list of str"
f"Unsupported dataset input type: {user_in}.\n"
"Input must be str or list of str of Rucio DIDs, "
"a DataSetIdentifier object or a dict ('sample_name':'dataset_id')"
)

sample_list = [
{
"NFiles": 1,
"Name": name,
"Dataset": dataset.Rucio(did),
"Dataset": dataset.Rucio(did) if isinstance(did, str) else did,
"Query": query_PythonFunction,
}
for name, did in dataset_dict.items()
Expand Down
68 changes: 61 additions & 7 deletions tests/test_file_peeking.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@
import re
import filecmp
from servicex_analysis_utils import file_peeking
from servicex import dataset
from servicex.python_dataset import PythonFunction
from servicex.dataset_identifier import (
RucioDatasetIdentifier,
FileListDataset,
CERNOpenDataDatasetIdentifier,
XRootDDatasetIdentifier,
)


@pytest.fixture
Expand Down Expand Up @@ -146,14 +154,10 @@ def test_spec_builder():
assert isinstance(first_entry["NFiles"], int), "'NFiles' should be an integer"
assert isinstance(first_entry["Name"], str), "'Name' should be a string"

from servicex.dataset_identifier import RucioDatasetIdentifier

assert isinstance(
first_entry["Dataset"], RucioDatasetIdentifier
), "'Dataset' should be a RucioDatasetIdentifier"

from servicex.python_dataset import PythonFunction

assert isinstance(
first_entry["Query"], PythonFunction
), "'Query' should be a PythonFunction"
Expand All @@ -173,15 +177,65 @@ def test_spec_builder():

# wrong input type
wrong_did = 1234
expected_msg = (
f"Unsupported dataset input type: {type(wrong_did)}.\n"
"Input must be str or list of str of Rucio DIDs, "
"a DataSetIdentifier object or a dict "
"('sample_name':'dataset_id')"
)

with pytest.raises(
ValueError,
match=re.escape(
f"Unsupported dataset input type: {type(wrong_did)}.\nInput must be dict ('sample_name':'dataset_id'), str or list of str"
),
match=re.escape(expected_msg),
):
file_peeking.build_deliver_spec(wrong_did)


def test_spec_builder_with_dataset_identifier():
# Build multiple types of dataset identifiers
ds1 = dataset.Rucio("random_space:did")
ds2 = dataset.XRootD("root://server/file.root")
ds3 = dataset.CERNOpenData("cernopendata:12345")
ds4 = dataset.FileList(["file1.root", "file2.root"])

ds_list = [ds1, ds2, ds3, ds4]
ds_types = [
RucioDatasetIdentifier,
XRootDDatasetIdentifier,
CERNOpenDataDatasetIdentifier,
FileListDataset,
]
for did, did_type in zip(ds_list, ds_types):
spec = file_peeking.build_deliver_spec(did)

# Check return type
assert isinstance(spec, dict), "build_deliver_spec does not return a dict"
assert "Sample" in spec, "Key 'Sample' is missing in the returned dict"
assert isinstance(spec["Sample"], list), "'Sample' should be a list"

# Get return size
size = len(spec["Sample"])
assert (
size == 1
), f"Only one did given but sample item of spec is not len 1: {size}"

# Check first sample
first_entry = spec["Sample"][0]
assert isinstance(first_entry, dict), "Each entry in 'Sample' should be a dict"

# Check each key type
assert isinstance(first_entry["NFiles"], int), "'NFiles' should be an integer"
assert isinstance(first_entry["Name"], str), "'Name' should be a string"

assert isinstance(
first_entry["Query"], PythonFunction
), "'Query' should be a PythonFunction"

assert isinstance(
first_entry["Dataset"], did_type
), f"Input Dataset identifier {did} should be a {did_type} but is {type(first_entry['Dataset'])}"


def test_decoding_to_array(build_test_samples, array_out=True):
path = build_test_samples
query_output = file_peeking.run_query(path)
Expand Down