Skip to content

Commit d1f69d3

Browse files
committed
Merge branch 'tickets/DM-49204'
2 parents 76dc557 + 2a0cd19 commit d1f69d3

File tree

6 files changed

+50
-49
lines changed

6 files changed

+50
-49
lines changed

python/activator/activator.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def _config_from_yaml(yaml_string):
111111
112112
Returns
113113
-------
114-
config : `activator.config.PipelineConfig`
114+
config : `shared.config.PipelineConfig`
115115
The corresponding config object.
116116
"""
117117
return PipelinesConfig(yaml.safe_load(yaml_string))
@@ -186,47 +186,42 @@ def _get_local_cache():
186186
return make_local_cache()
187187

188188

189-
def _get_redis_streams_client():
190-
"""Setup of Redis Client.
189+
def _make_redis_streams_client():
190+
"""Create a new Redis client.
191191
192192
Returns
193193
-------
194-
redis_client : `Redis`
195-
Initialized redis client.
194+
redis_client : `redis.Redis`
195+
Initialized Redis client.
196196
"""
197197
redis_host = redis_stream_host
198198
redis_client = redis.Redis(host=redis_host)
199199
return redis_client
200200

201201

202-
def _calculate_time_since_last_message(fan_out_listen_start_time):
203-
"""Calculates time since last redis streams message poll
204-
received by this pod.
202+
def _time_diff(start_time):
203+
"""Calculates time since a reference timestamp.
205204
206205
Parameters
207206
----------
208-
fan_out_listen_start_time : `float`
209-
Time when listening for redis stream message started. The time is
210-
since Unix epoch.
207+
start_time : `float`
208+
Time since a reference point, in seconds since Unix epoch.
211209
212210
Returns
213211
-------
214-
fan_out_listen_time : `float`
215-
Time in seconds since last fan out message received by this pod.
212+
duration : `float`
213+
Time in seconds since ``start_time``.
216214
"""
217-
fan_out_listen_finish_time = time.time()
218-
fan_out_listen_time = fan_out_listen_finish_time - fan_out_listen_start_time
219-
return fan_out_listen_time
215+
return time.time() - start_time
220216

221217

222218
def _decode_redis_streams_message(fan_out_message):
223-
"""Decoded redis streams message from binary. Redis Streams
224-
returns a list of dicts.
219+
"""Decode redis streams message from binary.
225220
226221
Parameters
227222
----------
228-
fan_out_message : `dict`
229-
Fan out message.
223+
fan_out_message
224+
Fan out message, as a list of dicts.
230225
231226
Returns
232227
-------
@@ -246,13 +241,13 @@ def _decode_redis_streams_message(fan_out_message):
246241

247242
def _calculate_time_since_fan_out_message_delivered(redis_streams_message_id):
248243
"""Calculates time from fan out message to when message is unpacked
249-
in prompt processing. The redis stream message ID includes
250-
the timestamp in UTC suffixed with the message number.
244+
in prompt processing.
251245
252246
Parameters
253247
----------
254-
redis_streams_message_id : `string`
255-
Fan out message.
248+
redis_streams_message_id : `str`
249+
Fan out message ID. It includes the timestamp in milliseconds since
250+
Unix epoch suffixed with the message number.
256251
257252
Returns
258253
-------
@@ -261,8 +256,7 @@ def _calculate_time_since_fan_out_message_delivered(redis_streams_message_id):
261256
in prompt processing.
262257
"""
263258
message_timestamp = float(redis_streams_message_id.split('-', 1)[0].strip())
264-
fan_out_to_prompt_time = time.time() - message_timestamp/1000
265-
return fan_out_to_prompt_time
259+
return _time_diff(message_timestamp/1000.0)
266260

267261

268262
def create_app():
@@ -326,7 +320,7 @@ def keda_start():
326320

327321
# Setup redis client connection. Setup before while loop to avoid performance
328322
# issues of constantly resetting up client connection
329-
redis_client = _get_redis_streams_client()
323+
redis_client = _make_redis_streams_client()
330324
try:
331325
redis_client.ping()
332326
except redis.exceptions.RedisError:
@@ -355,6 +349,8 @@ def keda_start():
355349
groupname=redis_stream_consumer_group,
356350
count=1 # Read one message at a time
357351
)
352+
processing_start = time.time()
353+
processing_result = "Unknown"
358354

359355
if not fan_out_message:
360356
continue
@@ -389,8 +385,7 @@ def keda_start():
389385

390386
consumer_polls_with_message += 1
391387
if consumer_polls_with_message >= 1:
392-
fan_out_listen_time = _calculate_time_since_last_message(
393-
fan_out_listen_start_time)
388+
fan_out_listen_time = _time_diff(fan_out_listen_start_time)
394389
_log.debug(
395390
"Seconds since last redis streams message received %r for consumer poll %r",
396391
fan_out_listen_time, consumer_polls_with_message)
@@ -402,23 +397,28 @@ def keda_start():
402397

403398
# Process fan out visit
404399
process_visit(expected_visit)
400+
processing_result = "Success"
405401
except GracefulShutdownInterrupt:
406402
_log.error(
407403
"Service interrupted.Shutting down *without* syncing to the central repo.")
404+
processing_result = "Interrupted"
408405
sys.exit(1)
409406
except IgnorableVisit as e:
410407
_log.info("Skipping visit: %s", e)
408+
processing_result = "Ignore"
411409
except Exception:
412410
_log.exception("Processing failed:")
411+
processing_result = "Error"
413412
finally:
414-
_log.info(
415-
"Processing completed for %s. Starting next fan out event consumer poll",
416-
socket.gethostname())
413+
_log.debug("Request took %.3f s. Result: %s",
414+
_time_diff(processing_start), processing_result)
415+
_log.info("Processing completed for %s.", socket.gethostname())
417416

418417
# Reset timer for fan out message polling and start redis client for next poll
418+
_log.info("Starting next visit fan out event consumer poll")
419419
fan_out_listen_start_time = time.time()
420420
try:
421-
redis_client = _get_redis_streams_client()
421+
redis_client = _make_redis_streams_client()
422422
redis_client.ping()
423423
_log.info("Redis Streams client setup for continued polling")
424424
except Exception as e:
@@ -492,7 +492,7 @@ def parse_next_visit(http_request):
492492
493493
Returns
494494
-------
495-
next_visit : `activator.visit.FannedOutVisit`
495+
next_visit : `shared.visit.FannedOutVisit`
496496
The next_visit message contained in the request.
497497
498498
Raises
@@ -633,7 +633,7 @@ def process_visit(expected_visit: FannedOutVisit):
633633
634634
Parameters
635635
----------
636-
expected_visit : `activator.visit.FannedOutVisit`
636+
expected_visit : `shared.visit.FannedOutVisit`
637637
The visit to process.
638638
639639
Raises

python/activator/middleware_interface.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def make_local_cache():
208208
"uw_stars_20240524": refcat_factor * base_keep_limit,
209209
"uw_stars_20240228": refcat_factor * base_keep_limit,
210210
"uw_stars_20240130": refcat_factor * base_keep_limit,
211+
"cal_ref_cat_2_2": refcat_factor * base_keep_limit,
211212
"ps1_pv3_3pi_20170110": refcat_factor * base_keep_limit,
212213
"gaia_dr3_20230707": refcat_factor * base_keep_limit,
213214
"gaia_dr2_20200414": refcat_factor * base_keep_limit,
@@ -262,11 +263,11 @@ class MiddlewareInterface:
262263
image_bucket : `str`
263264
Storage bucket where images will be written to as they arrive.
264265
See also ``prefix``.
265-
visit : `activator.visit.FannedOutVisit`
266+
visit : `shared.visit.FannedOutVisit`
266267
The visit-detector combination to be processed by this object.
267-
pre_pipelines : `activator.config.PipelinesConfig`
268+
pre_pipelines : `shared.config.PipelinesConfig`
268269
Information about which pipelines to run before a visit arrives.
269-
main_pipelines : `activator.config.PipelinesConfig`
270+
main_pipelines : `shared.config.PipelinesConfig`
270271
Information about which pipelines to run on ``visit``'s raws.
271272
skymap : `str`
272273
Name of the skymap in the central repo for querying templates.

python/shared/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ def matches(self, visit: FannedOutVisit) -> bool:
286286
287287
Parameters
288288
----------
289-
visit : `activator.visit.FannedOutVisit`
289+
visit : `shared.visit.FannedOutVisit`
290290
The visit to test against this spec.
291291
292292
Returns
@@ -365,7 +365,7 @@ def get_pipeline_files(self, visit: FannedOutVisit) -> list[str]:
365365
366366
Parameters
367367
----------
368-
visit : `activator.visit.FannedOutVisit`
368+
visit : `shared.visit.FannedOutVisit`
369369
The visit for which a pipeline must be selected.
370370
371371
Returns

python/shared/raw.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def is_path_consistent(oid: str, visit: FannedOutVisit) -> bool:
147147
----------
148148
oid : `str`
149149
The object store path to the snap image.
150-
visit : `activator.visit.FannedOutVisit`
150+
visit : `shared.visit.FannedOutVisit`
151151
The visit from which snaps were expected.
152152
153153
Returns

python/tester/upload.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ def process_group(kafka_url, visit_infos, uploader):
7474
----------
7575
kafka_url : `str`
7676
The URL of the Kafka REST Proxy to send ``next_visit`` messages to.
77-
visit_infos : `set` [`activator.FannedOutVisit`]
77+
visit_infos : `set` [`shared.visit.FannedOutVisit`]
7878
The visit-detector combinations to be observed; each object may
7979
represent multiple snaps. Assumed to represent a single group, and to
8080
share instrument, nimages, filters, and survey.
81-
uploader : callable [`activator.FannedOutVisit`, int]
81+
uploader : callable [`shared.visit.FannedOutVisit`, int]
8282
A callable that takes an exposure spec and a snap ID, and uploads the
8383
visit's data.
8484
"""
@@ -158,12 +158,12 @@ def _add_to_raw_pool(raw_pool, snap_num, visit, blob):
158158
159159
Parameters
160160
----------
161-
raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]]
161+
raw_pool : mapping [`str`, mapping [`int`, mapping [`shared.visit.FannedOutVisit`, `s3.ObjectSummary`]]]
162162
Available raws as a mapping from group IDs to a mapping of snap ID.
163163
The value of the innermost mapping is the observation metadata for
164164
each detector, and a Blob representing the image taken in that
165165
detector-snap.
166-
visit : `activator.visit.FannedOutVisit`
166+
visit : `shared.visit.FannedOutVisit`
167167
The visit-detector combination to be added with this raw.
168168
snap_num : `int`
169169
The snap number for this raw.
@@ -200,7 +200,7 @@ def get_samples_non_lsst(bucket, instrument):
200200
201201
Returns
202202
-------
203-
raws : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]]
203+
raws : mapping [`str`, mapping [`int`, mapping [`shared.visit.FannedOutVisit`, `s3.ObjectSummary`]]]
204204
A mapping from group IDs to a mapping of snap ID. The value of the
205205
innermost mapping is the observation metadata for each detector,
206206
and a Blob representing the image taken in that detector-snap.
@@ -273,7 +273,7 @@ def get_samples_lsst(bucket, instrument):
273273
274274
Returns
275275
-------
276-
raws : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]]
276+
raws : mapping [`str`, mapping [`int`, mapping [`shared.visit.FannedOutVisit`, `s3.ObjectSummary`]]]
277277
A mapping from group IDs to a mapping of snap ID. The value of the
278278
innermost mapping is the observation metadata for each detector,
279279
and a Blob representing the image taken in that detector-snap.
@@ -343,7 +343,7 @@ def upload_from_raws(kafka_url, instrument, raw_pool, src_bucket, dest_bucket, n
343343
The URL of the Kafka REST Proxy to send ``next_visit`` messages to.
344344
instrument : `str`
345345
The short name of the instrument carrying out the observation.
346-
raw_pool : mapping [`str`, mapping [`int`, mapping [`activator.FannedOutVisit`, `s3.ObjectSummary`]]]
346+
raw_pool : mapping [`str`, mapping [`int`, mapping [`shared.visit.FannedOutVisit`, `s3.ObjectSummary`]]]
347347
Available raws as a mapping from group IDs to a mapping of snap ID.
348348
The value of the innermost mapping is the observation metadata for
349349
each detector, and a Blob representing the image taken in that

python/tester/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def send_next_visit(url, group, visit_infos):
319319
The URL of the Kafka REST Proxy to send ``next_visit`` messages to.
320320
group : `str`
321321
The group ID for the message to send.
322-
visit_infos : `set` [`activator.SummitVisit`]
322+
visit_infos : `set` [`shared.visit.SummitVisit`]
323323
The ``next_visit`` message to be sent; each object may
324324
represent multiple snaps.
325325
"""

0 commit comments

Comments
 (0)