Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = []
__all__ = ["time_since", "is_processable", "process_visit"]

import collections.abc
import contextlib
Expand Down Expand Up @@ -47,7 +47,7 @@
get_group_id_from_oid,
)
from shared.visit import FannedOutVisit
from .exception import GracefulShutdownInterrupt, IgnorableVisit, \
from .exception import GracefulShutdownInterrupt, TimeoutInterrupt, IgnorableVisit, \
NonRetriableError, RetriableError
from .middleware_interface import get_central_butler, \
make_local_repo, make_local_cache, MiddlewareInterface, ButlerWriter, DirectButlerWriter
Expand All @@ -59,6 +59,8 @@
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# The skymap to use in the central repo
skymap = os.environ["SKYMAP"]
# The maximum total time a worker can spend processing one visit.
global_timeout = int(os.environ["WORKER_TIMEOUT"])
# URI to the main repository to contain processing results
write_repo = os.environ["CENTRAL_REPO"]
# URI to the main repository containing calibs and templates
Expand Down Expand Up @@ -279,6 +281,29 @@ def _graceful_shutdown(signum: int, stack_frame):
raise GracefulShutdownInterrupt(f"Received signal {signame}.")


def _handle_timeout(signum: int, stack_frame):
"""Signal handler for externally enforced timeouts.

Parameters
----------
signum : `int`
The signal received.
stack_frame : `frame` or `None`
The "current" stack frame.

Raises
------
activator.exception.TimeoutInterrupt
Raised unconditionally.
"""
signame = signal.Signals(signum).name
_log.info("Signal %s detected, aborting processing.", signame)
# Raising in signal handlers is dangerous, but since one of the cases for a
# timeout is the program getting stuck in I/O code, we can't rely on any
# sort of polling.
raise TimeoutInterrupt("Processing timed out and had to be aborted.")


def with_signal(signum: int,
handler: collections.abc.Callable | signal.Handlers,
) -> collections.abc.Callable:
Expand Down Expand Up @@ -490,15 +515,10 @@ def _try_export(mwi: MiddlewareInterface, exposures: set[int], log: logging.Logg
return False


@with_signal(signal.SIGHUP, _graceful_shutdown)
@with_signal(signal.SIGTERM, _graceful_shutdown)
@with_signal(signal.SIGALRM, _handle_timeout)
def process_visit(expected_visit: FannedOutVisit):
"""Prepare and run a pipeline on a nextVisit message.

This function should not make any assumptions about the execution framework
for the Prompt Processing system; in particular, it should not assume it is
running on a web server.

Parameters
----------
expected_visit : `shared.visit.FannedOutVisit`
Expand All @@ -512,6 +532,12 @@ def process_visit(expected_visit: FannedOutVisit):
`~activator.exception.NonRetriableError` or
`~activator.exception.RetriableError`, depending on the program state
at the time.
activator.exception.TimeoutInterrupt
Raised if the process timed out at an unexpected point. Terminations
during preprocessing or processing are chained by
`~activator.exception.NonRetriableError` or
`~activator.exception.RetriableError`, depending on the program state
at the time.
activator.exception.IgnorableVisit
Raised if the service is configured to not process ``expected_visit``.
activator.exception.InvalidVisitError
Expand All @@ -527,6 +553,35 @@ def process_visit(expected_visit: FannedOutVisit):
exception is always chained to another exception representing the
original error.
"""
prev = signal.alarm(global_timeout)
if prev:
_log.warning("Previously scheduled alarm for %s seconds overwritten.", prev)
try:
_process_visit_or_cancel(expected_visit)
finally:
# Don't let the alarm trip while waiting for visits
signal.alarm(0)


@with_signal(signal.SIGHUP, _graceful_shutdown)
@with_signal(signal.SIGTERM, _graceful_shutdown)
def _process_visit_or_cancel(expected_visit: FannedOutVisit):
"""Implementation of preparing and running a pipeline on a nextVisit
message.

This function should not make any assumptions about the execution framework
for the Prompt Processing system; in particular, it should not assume it is
running on a web server.

Parameters
----------
expected_visit : `shared.visit.FannedOutVisit`
The visit to process.

Raises
------
As for `process_visit`
"""
with contextlib.ExitStack() as cleanups:
consumer = _get_notification_consumer()
consumer.subscribe([raw_bucket_topic])
Expand Down
5 changes: 2 additions & 3 deletions python/activator/driver_gunicorn.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ def create_app():
app.register_error_handler(500, server_error)
_log.info("Worker ready to handle requests.")
return app
except Exception as e:
_log.critical("Failed to start worker; aborting.")
_log.exception(e)
except Exception:
_log.critical("Failed to start worker; aborting.", exc_info=True)
# gunicorn assumes exit code 3 means "Worker failed to boot", though this is not documented
sys.exit(3)

Expand Down
15 changes: 7 additions & 8 deletions python/activator/driver_keda.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from shared.run_utils import get_day_obs
from shared.visit import FannedOutVisit
from .activator import time_since, is_processable, process_visit
from .exception import GracefulShutdownInterrupt, IgnorableVisit, \
from .exception import GracefulShutdownInterrupt, TimeoutInterrupt, IgnorableVisit, \
NonRetriableError, RetriableError
from .repo_tracker import LocalRepoTracker
from .setup import ServiceSetup
Expand Down Expand Up @@ -297,9 +297,8 @@ def keda_start():

_log.info("Worker ready to handle requests.")

except Exception as e:
_log.critical("Failed to start worker; aborting.")
_log.exception(e)
except Exception:
_log.critical("Failed to start worker; aborting.", exc_info=True)
sys.exit(1)

fan_out_listen_start_time = time.time()
Expand All @@ -311,6 +310,7 @@ def keda_start():
and (time_since(fan_out_listen_start_time) < fanned_out_msg_listen_timeout):

# Ensure consistent day_obs for whole run, even if it crosses the boundary
# This is needed for Grafana dashboards that compile statistics by day_obs
with logging_context(day_obs=get_day_obs(astropy.time.Time.now())):
try:
redis_streams_message_id, fan_out_visit_decoded = redis_session.read_message()
Expand All @@ -335,9 +335,8 @@ def keda_start():
_log.debug("Seconds since fan out message delivered %r", fan_out_to_prompt_time)

# TODO Review Redis Errors and determine what should be retriable.
except redis.exceptions.RedisError as e:
_log.critical("Redis Streams error; aborting.")
_log.exception(e)
except redis.exceptions.RedisError:
_log.critical("Redis Streams error; aborting.", exc_info=True)
sys.exit(1)
except ValueError as e:
_log.error("Invalid redis stream message %s", e)
Expand Down Expand Up @@ -408,7 +407,7 @@ def handle_keda_visit(visit):
else:
_log.exception("Processing failed:")
processing_result = "Error"
except Exception:
except (Exception, TimeoutInterrupt):
_log.exception("Processing failed:")
processing_result = "Error"
finally:
Expand Down
12 changes: 11 additions & 1 deletion python/activator/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt",
__all__ = ["NonRetriableError", "RetriableError", "GracefulShutdownInterrupt", "TimeoutInterrupt",
"InvalidVisitError", "IgnorableVisit",
"InvalidPipelineError", "NoGoodPipelinesError",
"PipelinePreExecutionError", "PipelineExecutionError",
Expand Down Expand Up @@ -89,6 +89,16 @@ class GracefulShutdownInterrupt(BaseException):
"""


class TimeoutInterrupt(BaseException):
"""An interrupt indicating that an operation took too long and should be
aborted.

Like all interrupts, ``TimeoutInterrupt`` can be raised between *any* two
bytecode instructions, and handling it requires special care. See
`the Python docs <https://docs.python.org/3.11/library/signal.html#handlers-and-exceptions>`__.
"""


class InvalidVisitError(ValueError):
"""An exception raised if a visit object has invalid or inappropriate
fields.
Expand Down
22 changes: 7 additions & 15 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import shared.run_utils as runs
from shared.visit import FannedOutVisit
from .caching import DatasetCache
from .exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError, \
from .exception import GracefulShutdownInterrupt, TimeoutInterrupt, NonRetriableError, RetriableError, \
InvalidPipelineError, NoGoodPipelinesError, PipelinePreExecutionError, PipelineExecutionError
from .timer import enforce_schema, time_this_to_bundle

Expand Down Expand Up @@ -1540,30 +1540,22 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
data_ids=where,
label="main",
)
except GracefulShutdownInterrupt as e:
try:
state_changed = self._check_permanent_changes(where)
except Exception:
# Failure in registry or APDB queries
_log.exception("Could not determine APDB state, assuming modified.")
raise NonRetriableError("APDB potentially modified") from e
else:
if state_changed:
raise NonRetriableError("APDB modified") from e
else:
raise RetriableError("External interrupt") from e
# Catch Exception just in case there's a surprise -- raising
# NonRetriableError on *all* irrevocable changes is important.
except Exception as e:
except (Exception, GracefulShutdownInterrupt, TimeoutInterrupt) as e:
try:
state_changed = self._check_permanent_changes(where)
except (Exception, GracefulShutdownInterrupt):
except (Exception, GracefulShutdownInterrupt, TimeoutInterrupt):
# Failure in registry or APDB queries
_log.exception("Could not determine APDB state, assuming modified.")
raise NonRetriableError("APDB potentially modified") from e
else:
if state_changed:
raise NonRetriableError("APDB modified") from e
elif isinstance(e, GracefulShutdownInterrupt):
raise RetriableError("External interrupt") from e
elif isinstance(e, TimeoutInterrupt):
raise RetriableError("Processing timed out, assuming transient problem.") from e
else:
raise

Expand Down
1 change: 1 addition & 0 deletions tests/test_activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# Mandatory envvars are loaded at import time
os.environ["RUBIN_INSTRUMENT"] = "LSSTCam"
os.environ["SKYMAP"] = "lsst_cells_v1"
os.environ["WORKER_TIMEOUT"] = "60"
os.environ["CENTRAL_REPO"] = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data", "central_repo")
os.environ["S3_ENDPOINT_URL"] = "https://this.is.a.test"
os.environ["IMAGE_BUCKET"] = "test-bucket-test"
Expand Down
20 changes: 19 additions & 1 deletion tests/test_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

import unittest

from activator.exception import GracefulShutdownInterrupt, NonRetriableError, RetriableError
from activator.exception import GracefulShutdownInterrupt, TimeoutInterrupt, \
NonRetriableError, RetriableError


class NonRetriableErrorTest(unittest.TestCase):
Expand Down Expand Up @@ -122,3 +123,20 @@ def test_uncatchable(self):
raise GracefulShutdownInterrupt("Last call!")
except Exception:
pass # assertRaises should fail


class TimeoutInterruptTest(unittest.TestCase):
def test_catchable(self):
try:
raise TimeoutInterrupt("What's the holdup?")
except TimeoutInterrupt:
pass
else:
self.fail("Did not catch TimeoutInterrupt.")

def test_uncatchable(self):
with self.assertRaises(TimeoutInterrupt):
try:
raise TimeoutInterrupt("What's the holdup?")
except Exception:
pass # assertRaises should fail