Skip to content

Commit dbc4c81

Browse files
Add models and routes for reporting dataset lookup errors (#1134)
* Add models and routes for reporting dataset lookup errors --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3dd7020 commit dbc4c81

File tree

4 files changed

+269
-0
lines changed

4 files changed

+269
-0
lines changed

servicex_app/servicex_app/models.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ class TransformStatus(Enum):
157157
complete = ("Complete", True)
158158
fatal = ("Fatal", True)
159159
canceled = ("Canceled", True)
160+
bad_dataset = ("Bad Dataset", True)
160161

161162
def __init__(self, string_name, is_complete):
162163
self.string_name = string_name
@@ -404,6 +405,9 @@ class DatasetStatus(str, Enum):
404405
created = "created"
405406
looking = "looking"
406407
complete = "complete"
408+
does_not_exist = "does_not_exist"
409+
bad_name = "bad_name"
410+
internal_failure = "internal_failure"
407411

408412

409413
class Dataset(db.Model):
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Copyright (c) 2025, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
from flask import request, current_app
29+
30+
from servicex_app.models import (
31+
Dataset,
32+
db,
33+
TransformRequest,
34+
TransformStatus,
35+
DatasetStatus,
36+
)
37+
from servicex_app.resources.servicex_resource import ServiceXResource
38+
39+
from datetime import datetime, timezone
40+
41+
42+
class FilesetError(ServiceXResource):
43+
@classmethod
44+
def make_api(cls, lookup_result_processor, transformer_manager):
45+
cls.lookup_result_processor = lookup_result_processor
46+
cls.transformer_manager = transformer_manager
47+
return cls
48+
49+
def put(self, dataset_id):
50+
summary = request.get_json()
51+
dataset = Dataset.find_by_id(int(dataset_id))
52+
53+
if dataset is None:
54+
current_app.logger.info(
55+
"Dataset lookup error received for unknown dataset",
56+
extra={
57+
"dataset_id": dataset_id,
58+
"elapsed-time": summary["elapsed-time"],
59+
"error-type": summary["error-type"],
60+
"_message": summary["message"],
61+
},
62+
)
63+
return "", 422
64+
65+
current_app.logger.info(
66+
"Error in file lookup",
67+
extra={
68+
"dataset_id": dataset_id,
69+
"elapsed-time": summary["elapsed-time"],
70+
"error-type": summary["error-type"],
71+
"_message": summary["message"],
72+
},
73+
)
74+
75+
dataset.lookup_status = DatasetStatus(summary["error-type"])
76+
dataset.stale = True # Repeat lookup if we try again
77+
db.session.commit()
78+
79+
# shut down related transformations. Nothing good can come of letting them
80+
# continue to run
81+
namespace = current_app.config["TRANSFORMER_NAMESPACE"]
82+
for running_request in TransformRequest.lookup_running_by_dataset_id(
83+
int(dataset_id)
84+
):
85+
running_request.status = TransformStatus.bad_dataset
86+
running_request.finish_time = datetime.now(tz=timezone.utc)
87+
self.transformer_manager.shutdown_transformer_job(
88+
running_request.request_id, namespace
89+
)
90+
current_app.logger.info(
91+
"Shutting down transformer because of dataset lookup problem",
92+
extra={
93+
"dataset_id": dataset_id,
94+
"elapsed-time": summary["elapsed-time"],
95+
"error-type": summary["error-type"],
96+
"_message": summary["message"],
97+
"requestId": running_request.request_id,
98+
},
99+
)
100+
101+
# Tell any other transform that was waiting for the lookup to complete
102+
# not to expect to run
103+
for pending_transform in TransformRequest.lookup_pending_on_dataset(
104+
int(dataset_id)
105+
):
106+
pending_transform.status = TransformStatus.bad_dataset
107+
pending_transform.finish_time = datetime.now(tz=timezone.utc)
108+
current_app.logger.info(
109+
"Shutting down transformer because of dataset lookup problem",
110+
extra={
111+
"dataset_id": dataset_id,
112+
"elapsed-time": summary["elapsed-time"],
113+
"error-type": summary["error-type"],
114+
"_message": summary["message"],
115+
"requestId": pending_transform.request_id,
116+
},
117+
)
118+
119+
db.session.commit()

servicex_app/servicex_app/routes.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def add_routes(
4949

5050
from servicex_app.resources.internal.add_file_to_dataset import AddFileToDataset
5151
from servicex_app.resources.internal.fileset_complete import FilesetComplete
52+
from servicex_app.resources.internal.fileset_error import FilesetError
5253
from servicex_app.resources.internal.transform_status import (
5354
TransformationStatusInternal,
5455
)
@@ -183,6 +184,12 @@ def add_routes(
183184
"/servicex/internal/transformation/<string:dataset_id>/complete",
184185
)
185186

187+
FilesetError.make_api(lookup_result_processor, transformer_manager)
188+
api.add_resource(
189+
FilesetError,
190+
"/servicex/internal/transformation/<string:dataset_id>/error",
191+
)
192+
186193
TransformerFileComplete.make_api(transformer_manager)
187194
api.add_resource(
188195
TransformerFileComplete,
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# Copyright (c) 2019, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
from datetime import timezone, datetime
29+
30+
from servicex_app import LookupResultProcessor, TransformerManager
31+
from servicex_app_test.resource_test_base import ResourceTestBase
32+
33+
from servicex_app.models import (
34+
DatasetStatus,
35+
Dataset,
36+
TransformRequest,
37+
TransformStatus,
38+
)
39+
from pytest import fixture, mark
40+
41+
42+
class TestFilesetError(ResourceTestBase):
43+
@fixture
44+
def mock_find_dataset_by_id(self, mocker):
45+
dm = mocker.Mock()
46+
dm.dataset = Dataset(
47+
name="rucio://my-did?files=1",
48+
did_finder="rucio",
49+
lookup_status=DatasetStatus.looking,
50+
last_used=datetime.now(tz=timezone.utc),
51+
last_updated=datetime.fromtimestamp(0),
52+
)
53+
54+
dm.name = "rucio://my-did?files=1"
55+
dm.id = 42
56+
57+
mock_find_by_id = mocker.patch.object(Dataset, "find_by_id", return_value=dm)
58+
return mock_find_by_id
59+
60+
@mark.parametrize("error", ["does_not_exist", "bad_name", "internal_failure"])
61+
def test_put_fileset_error(self, mocker, mock_find_dataset_by_id, error):
62+
dataset = mock_find_dataset_by_id.return_value
63+
64+
pending_request = TransformRequest()
65+
pending_request.status = TransformStatus.pending_lookup
66+
mock_lookup_pending = mocker.patch.object(
67+
TransformRequest,
68+
"lookup_pending_on_dataset",
69+
return_value=[pending_request],
70+
)
71+
72+
lookup_request = TransformRequest()
73+
lookup_request.status = TransformStatus.lookup
74+
mock_lookup_running = mocker.patch.object(
75+
TransformRequest,
76+
"lookup_running_by_dataset_id",
77+
return_value=[lookup_request],
78+
)
79+
mock_processor = mocker.MagicMock(LookupResultProcessor)
80+
mock_transformer_manager = mocker.MagicMock(TransformerManager)
81+
mock_transformer_manager.shutdown_transformer_job = mocker.Mock()
82+
83+
client = self._test_client(
84+
lookup_result_processor=mock_processor,
85+
transformation_manager=mock_transformer_manager,
86+
)
87+
88+
response = client.put(
89+
"/servicex/internal/transformation/1234/error",
90+
json={"elapsed-time": 0, "error-type": error, "message": "honk"},
91+
)
92+
assert response.status_code == 200
93+
mock_find_dataset_by_id.assert_called_once_with(1234)
94+
assert dataset.lookup_status == DatasetStatus(error)
95+
assert dataset.stale
96+
97+
mock_lookup_pending.assert_called_once_with(1234)
98+
mock_lookup_running.assert_called_once_with(1234)
99+
assert pending_request.status == TransformStatus.bad_dataset
100+
assert lookup_request.status == TransformStatus.bad_dataset
101+
102+
def test_put_fileset_error_invalid_did(self, mocker):
103+
pending_request = TransformRequest()
104+
pending_request.status = TransformStatus.pending_lookup
105+
mock_lookup_pending = mocker.patch.object(
106+
TransformRequest,
107+
"lookup_pending_on_dataset",
108+
return_value=[pending_request],
109+
)
110+
111+
lookup_request = TransformRequest()
112+
lookup_request.status = TransformStatus.lookup
113+
mock_lookup_running = mocker.patch.object(
114+
TransformRequest,
115+
"lookup_running_by_dataset_id",
116+
return_value=[lookup_request],
117+
)
118+
119+
mock_find_dataset_by_id = mocker.patch.object(
120+
Dataset, "find_by_id", return_value=None
121+
)
122+
mock_processor = mocker.MagicMock(LookupResultProcessor)
123+
mock_transformer_manager = mocker.MagicMock(TransformerManager)
124+
mock_transformer_manager.shutdown_transformer_job = mocker.Mock()
125+
126+
client = self._test_client(
127+
lookup_result_processor=mock_processor,
128+
transformation_manager=mock_transformer_manager,
129+
)
130+
131+
response = client.put(
132+
"/servicex/internal/transformation/1234/error",
133+
json={"elapsed-time": 0, "error-type": "bad_dataset", "message": "honk"},
134+
)
135+
assert response.status_code == 422
136+
mock_find_dataset_by_id.assert_called_once_with(1234)
137+
138+
mock_lookup_pending.assert_not_called()
139+
mock_lookup_running.assert_not_called()

0 commit comments

Comments
 (0)