16 changes: 16 additions & 0 deletions servicex/dataset_identifier.py
@@ -56,6 +56,11 @@ def hash(self):
        sha = hashlib.sha256(str([self.dataset]).encode("utf-8"))
        return sha.hexdigest()

    def describe(self) -> str:
        """Return a human-readable description of this dataset."""

        return self.did


class RucioDatasetIdentifier(DataSetIdentifier):
    def __init__(self, dataset: str, num_files: Optional[int] = None):
@@ -107,6 +112,17 @@ def populate_transform_request(self, transform_request: TransformRequest) -> Non
    def did(self):
        return None

    def describe(self) -> str:
        """Return a human-readable description of the configured file list."""

        file_count: int = len(self.files)
        file_word: str = "file" if file_count == 1 else "files"
        preview_count: int = min(3, file_count)
        preview: str = ", ".join(self.files[:preview_count])
        suffix: str = ", ..." if file_count > preview_count else ""

        return f"{file_count} {file_word}: {preview}{suffix}"

    yaml_tag = "!FileList"

    @classmethod
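For orientation, a minimal sketch of what the new describe() methods return for the two identifier types touched above; the expected values mirror the assertions in the tests added later in this PR, and the snippet assumes a servicex checkout with these changes applied.

from servicex.dataset_identifier import FileListDataset, RucioDatasetIdentifier

# Rucio identifiers simply echo the fully qualified DID.
rucio = RucioDatasetIdentifier("abc:123-456")
print(rucio.describe())  # rucio://abc:123-456

# File lists report a count plus a preview capped at three entries.
files = FileListDataset(["file1.root", "file2.root", "file3.root", "file4.root"])
print(files.describe())  # 4 files: file1.root, file2.root, file3.root, ...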
16 changes: 16 additions & 0 deletions servicex/query_core.py
@@ -131,6 +131,11 @@ def generate_selection_string(self) -> str:
            raise RuntimeError("query string generator not set")
        return self.query_string_generator.generate_selection_string()

    def _describe_dataset(self) -> str:
        """Return a human-readable description of the dataset for error messages."""

        return self.dataset_identifier.describe()

    @property
    def transform_request(self):
        if not self.result_format:
@@ -355,6 +360,17 @@ def transform_complete(task: Task):
        downloaded_files = []

        download_result = await download_files_task
        if (
            not download_result
            and self.current_status
            and self.current_status.status == Status.complete
            and self.current_status.files_completed == 0
            and self.current_status.files_failed == 0
        ):
            dataset_description = self._describe_dataset()
            raise ServiceXException(
                f"Dataset {dataset_description} does not exist or has zero files."
            )
        if signed_urls_only:
            signed_urls = download_result
            if cached_record:
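To make the intent of the new guard explicit, here is a minimal standalone sketch of the condition it encodes; the helper name is illustrative and the Status import path is an assumption, not part of the PR.

from servicex.models import Status  # assumed location of the Status enum

def looks_like_empty_dataset(download_result, current_status) -> bool:
    # A transform that reports itself complete with zero completed and zero
    # failed files, and yielded no downloadable output, means the dataset
    # resolved to nothing; the code above raises ServiceXException in that case.
    return (
        not download_result
        and current_status is not None
        and current_status.status == Status.complete
        and current_status.files_completed == 0
        and current_status.files_failed == 0
    )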
56 changes: 49 additions & 7 deletions tests/test_dataset.py
@@ -31,7 +31,7 @@
import datetime

from unittest.mock import AsyncMock, Mock, patch
from servicex.dataset_identifier import FileListDataset
from servicex.dataset_identifier import FileListDataset, RucioDatasetIdentifier
from servicex.configuration import Configuration
from servicex.minio_adapter import MinioAdapter
from servicex.query_core import Query
@@ -320,7 +320,7 @@ async def test_submit_and_download_cache_miss(python_dataset, completed_status):
        python_dataset.servicex.get_transform_status.return_value = completed_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["1.parquet"]
        python_dataset.cache.update_transform_request_id = Mock()

        signed_urls_only = False
@@ -353,7 +353,7 @@ async def test_submit_and_download_cache_miss_overall_progress(
        python_dataset.servicex.get_transform_status.return_value = completed_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["1.parquet"]
        python_dataset.cache.update_transform_request_id = Mock()

        signed_urls_only = False
@@ -390,7 +390,7 @@ async def test_submit_and_download_no_result_format(python_dataset, completed_st
        python_dataset.servicex.get_transform_status.return_value = completed_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["1.parquet"]
        signed_urls_only = False
        expandable_progress = ExpandableProgress()
        await python_dataset.submit_and_download(
@@ -422,7 +422,7 @@ async def test_submit_and_download_cache_miss_signed_urls_only(
        python_dataset.servicex.get_transform_status.return_value = completed_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["signed://url"]
        python_dataset.cache.update_transform_request_id = Mock()

        signed_urls_only = True
@@ -436,6 +436,48 @@ async def test_submit_and_download_cache_miss_signed_urls_only(
        cache.close()


@pytest.mark.asyncio
async def test_submit_and_download_raises_for_empty_results(
    python_dataset, completed_status
):
    with tempfile.TemporaryDirectory() as temp_dir:
        python_dataset.dataset_identifier = RucioDatasetIdentifier(
            "scope:missing_dataset"
        )
        python_dataset.current_status = None
        python_dataset.servicex = AsyncMock()
        config = Configuration(cache_path=temp_dir, api_endpoints=[])
        cache = QueryCache(config)
        python_dataset.cache = cache
        python_dataset.configuration = config
        python_dataset.cache.get_transform_by_hash = Mock(return_value=None)
        python_dataset.servicex.get_transform_status = AsyncMock(id="12345")
        complete_status = completed_status.model_copy(deep=True)
        complete_status.status = Status.complete
        complete_status.files = 0
        complete_status.files_completed = 0
        complete_status.files_failed = 0
        python_dataset.servicex.get_transform_status.return_value = complete_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock(return_value=[])
        python_dataset.cache.update_transform_request_id = Mock()

        signed_urls_only = False
        expandable_progress = ExpandableProgress()

        with pytest.raises(
            ServiceXException,
            match=(
                r"Dataset rucio://scope:missing_dataset "
                r"does not exist or has zero files."
            ),
        ):
            await python_dataset.submit_and_download(
                signed_urls_only, expandable_progress
            )
        cache.close()


@pytest.mark.asyncio
async def test_submit_and_download_cache_files_request_urls(
    python_dataset, transformed_result
@@ -525,7 +567,7 @@ async def test_network_loss(python_dataset, transformed_result):
        python_dataset.servicex.get_transform_status.return_value = status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["1.parquet"]

        signed_urls_only = False
        expandable_progress = ExpandableProgress()
@@ -556,7 +598,7 @@ async def test_submit_and_download_get_request_id_from_previous_submitted_reques
        python_dataset.servicex.get_transform_status.return_value = completed_status
        python_dataset.servicex.submit_transform = AsyncMock()
        python_dataset.download_files = AsyncMock()
        python_dataset.download_files.return_value = []
        python_dataset.download_files.return_value = ["signed://url"]
        python_dataset.cache.is_transform_request_submitted = Mock(return_value=True)
        python_dataset.cache.get_transform_request_id = Mock(
            return_value="b8c508d0-ccf2-4deb-a1f7-65c839eebabf"
45 changes: 44 additions & 1 deletion tests/test_dataset_identifier.py
@@ -26,9 +26,11 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from servicex.dataset_identifier import (
    CERNOpenDataDatasetIdentifier,
    DataSetIdentifier,
    RucioDatasetIdentifier,
    FileListDataset,
    RucioDatasetIdentifier,
    XRootDDatasetIdentifier,
)
import pytest

@@ -41,6 +43,12 @@ def test_did():
def test_rucio():
    did = RucioDatasetIdentifier("abc:123-456")
    assert did.did == "rucio://abc:123-456"
    assert did.describe() == "rucio://abc:123-456"


def test_rucio_with_file_limit():
    did = RucioDatasetIdentifier("abc:123-456", num_files=10)
    assert did.describe() == "rucio://abc:123-456?files=10"


def test_rucio_no_namespace():
@@ -51,11 +59,26 @@ def test_rucio_no_namespace():
def test_file_list():
    did = FileListDataset(["c:/foo.bar"])
    assert did.files == ["c:/foo.bar"]
    assert did.describe() == "1 file: c:/foo.bar"


def test_single_file():
    did = FileListDataset("c:/foo.bar")
    assert did.files == ["c:/foo.bar"]
    assert did.describe() == "1 file: c:/foo.bar"


def test_file_list_multiple_preview():
    did = FileListDataset(["file1.root", "file2.root", "file3.root"])

    assert did.describe() == "3 files: file1.root, file2.root, file3.root"


def test_file_list_ellipsis_for_many_files():
    files = ["file1.root", "file2.root", "file3.root", "file4.root"]
    did = FileListDataset(files)

    assert did.describe() == "4 files: file1.root, file2.root, file3.root, ..."


def test_populate_transform_request(transform_request):
@@ -66,3 +89,23 @@ def test_populate_transform_request(transform_request):
    did2 = RucioDatasetIdentifier("abc:123-456")
    did2.populate_transform_request(transform_request)
    assert transform_request.did == "rucio://abc:123-456"


def test_cern_open_data_description():
    did = CERNOpenDataDatasetIdentifier(12345)
    assert did.describe() == "cernopendata://12345"


def test_cern_open_data_description_with_file_limit():
    did = CERNOpenDataDatasetIdentifier(12345, num_files=2)
    assert did.describe() == "cernopendata://12345?files=2"


def test_xrootd_description():
    did = XRootDDatasetIdentifier("root://server//data/*.root")
    assert did.describe() == "xrootd://root://server//data/*.root"


def test_xrootd_description_with_file_limit():
    did = XRootDDatasetIdentifier("root://server//data/*.root", num_files=1)
    assert did.describe() == "xrootd://root://server//data/*.root?files=1"