-
Notifications
You must be signed in to change notification settings - Fork 55
DATAUP-729 job ts implementation #2960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
784009e
1a03f4e
02ae2de
96b748e
678cc6f
0ad40e5
a995eb2
fcf4be8
9020680
6bbd36d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ | |
| from biokbase.narrative.common import kblogging | ||
| from biokbase.narrative.exception_util import JobRequestException, NarrativeException | ||
| from biokbase.narrative.jobs.jobmanager import JobManager | ||
| from biokbase.narrative.jobs.util import load_job_constants | ||
| from biokbase.narrative.jobs.util import load_job_constants, time_ns | ||
|
|
||
| (PARAM, MESSAGE_TYPE) = load_job_constants() | ||
|
|
||
|
|
@@ -116,7 +116,11 @@ def cell_id_list(self): | |
|
|
||
| @property | ||
| def ts(self): | ||
| """This param is completely optional""" | ||
| """ | ||
| Optional field sent with STATUS requests indicating to filter out | ||
| job states in the STATUS response that have not been updated since | ||
| this epoch time (in ns) | ||
n1mus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| return self.rq_data.get(PARAM["TS"]) | ||
|
|
||
|
|
||
|
|
@@ -199,10 +203,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]: | |
| if req.has_batch_id(): | ||
| return self._jm.update_batch_job(req.batch_id) | ||
|
|
||
| try: | ||
| return req.job_id_list | ||
| except Exception as ex: | ||
| raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex | ||
| return req.job_id_list | ||
n1mus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def start_job_status_loop( | ||
| self, | ||
|
|
@@ -514,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: | |
| Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type | ||
| and content. These just get encoded into the message itself. | ||
| """ | ||
| # For STATUS responses, add a last_checked field | ||
| # to each output_state. Note: error states will have | ||
| # the last_checked field too | ||
| if msg_type == MESSAGE_TYPE["STATUS"]: | ||
| now = time_ns() | ||
| for output_state in content.values(): | ||
| output_state["last_checked"] = now | ||
|
Comment on lines
+521
to
+524
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not add this timestamp when the job manager is putting together the list of jobs, instead of adding an extra iteration through the job state data here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wasn't sure since the CANCEL_JOBS request also responds with a STATUS message.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I decided not to filter the STATUS response for CANCEL_JOBS though because I figured in theory they should all get updated, whether successfully or just coming back with an error
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because everything is asynchronous, the FE doesn't have any way of knowing what triggered a job status message -- whether it was a cancel request, a status request, or the BE job loop. That's why I say it's better to put the timestamp on in the job manager, so that all job state objects that the FE receives have a timestamp on them.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think one allure of putting everything into JobComm is less tests surgery ... But putting it deep into the stack, at the origin of the STATUS response ds, seems less googly-eyed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I tried putting all the filtering/last_checked logic at the source |
||
|
|
||
| msg = {"msg_type": msg_type, "content": content} | ||
| self._comm.send(msg) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.