diff --git a/lib/galaxy/config/sample/job_conf.sample.yml b/lib/galaxy/config/sample/job_conf.sample.yml index 21387e6dda21..1cfbf132b62a 100644 --- a/lib/galaxy/config/sample/job_conf.sample.yml +++ b/lib/galaxy/config/sample/job_conf.sample.yml @@ -338,6 +338,27 @@ runners: #app: {} #pulsar_config: path/to/pulsar/app.yml + # Using Pulsar and co-execution to run jobs against a TES service. + # - https://www.ga4gh.org/product/task-execution-service-tes/ + # - https://pulsar.readthedocs.io/en/latest/containers.html#ga4gh-tes + pulsar_tes: + load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner + # RabbitMQ URL from Galaxy server (include credentials). + amqp_url: + # If Pulsar needs to talk to Galaxy at a particular host and port, set that here. + #galaxy_url: + + # Using Pulsar and co-execution to run jobs against Google Cloud Platform's Batch service. + # - https://cloud.google.com/batch/docs/get-started + # - https://pulsar.readthedocs.io/en/latest/containers.html#google-cloud-platform-batch + pulsar_gcp: + load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner + # RabbitMQ URL from Galaxy server (include credentials). + amqp_url: + # If Pulsar needs to talk to Galaxy at a particular host and port, set that here. + #galaxy_url: + + # Job handler configuration - for a full discussion of job handlers, see the documentation at: # https://docs.galaxyproject.org/en/latest/admin/scaling.html handling: @@ -916,6 +937,55 @@ execution: # Path to Kubernetes configuration fil (see Kubernetes runner description.) #k8s_config_path: /path/to/kubeconfig + pulsar_tes_environment: + runner: pulsar_tes + # A tes_url is required + tes_url: "" + #basic_auth: + # username: + # password: + #cpu_cores: 1 + # Define if the task is allowed to run on preemptible compute instances,\nfor example, AWS Spot. This option may have no effect when utilized\non some backends that don't have the concept of preemptible jobs. 
+ #preemptible: false + #ram_gb: 8 + #disk_gb: 40 + # Request that the task be run in these compute zones. How this string is utilized will be dependent on the backend system. For example, a system based on a cluster queueing system may use this string to define priority queue to which the job is assigned. + #zones: us-west-1 + #backend_parameters: {} + #backend_parameters_strict: false + # Configure the embedded Pulsar app, only message_queue_url is required but + # other options may be useful (unsure). + pulsar_app_config: + # This needs to be the RabbitMQ server, but this should be the host + # and port that your TES nodes would connect to the server via. + message_queue_url: "" + + pulsar_gcp_environment: + runner: pulsar_gcp + # required + project_id: + # Path to GCP service account credentials file. (not sure if ~ would be implicitly respected in this example) + #credentials_file: ~/.config/gcloud/application_default_credentials.json + # GCP region or zone to use (optional) + #region: us-central1 + # Max walltime to use in seconds (defaults to 60 * 60 * 24) + #walltime_limit: 216000 + # Maximum number of retries for the job. Maps to TaskSpec.max_retry_count. + #retry_count: 2 + # Name of the SSD volume to be mounted in the task. Shared among all containers in job. + #ssd_name: pulsar_staging + # Size of the shared local SSD disk in GB (must be a multiple of 375). + #disk_size: 375 + # Machine type for the job's VM. + #machine_type: n1-standard-1 + #labels: {} + # Configure the embedded Pulsar app, only message_queue_url is required but + # other options may be useful (unsure). + pulsar_app_config: + # This needs to be the RabbitMQ server, but this should be the host + # and port that your GCP compute would connect to the server via. + message_queue_url: "" + # Example CLI runners. 
ssh_torque: runner: cli diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index e141808bca6b..d21b43a316d8 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -818,6 +818,13 @@ def register_cleanup_file_attribute(self, attribute): if attribute not in self.cleanup_file_attributes: self.cleanup_file_attributes.append(attribute) + def init_job_stream_files(self): + """For runners that don't create explicit job scripts - create job stream files.""" + with open(self.output_file, "w"): + pass + with open(self.error_file, "w"): + pass + class AsynchronousJobRunner(BaseJobRunner, Monitors): """Parent class for any job runner that runs jobs asynchronously (e.g. via @@ -907,17 +914,7 @@ def check_watched_items(self): def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]: raise NotImplementedError() - def finish_job(self, job_state: AsynchronousJobState): - """ - Get the output/error for a finished job, pass to `job_wrapper.finish` - and cleanup all the job's temporary files. 
- """ - galaxy_id_tag = job_state.job_wrapper.get_id_tag() - external_job_id = job_state.job_id - - # To ensure that files below are readable, ownership must be reclaimed first - job_state.job_wrapper.reclaim_ownership() - + def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState): # wait for the files to appear which_try = 0 collect_output_success = True @@ -931,11 +928,25 @@ def finish_job(self, job_state: AsynchronousJobState): if which_try == self.app.config.retry_job_output_collection: stdout = "" stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER - log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e)) + log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e)) collect_output_success = False else: time.sleep(1) which_try += 1 + return collect_output_success, stdout, stderr + + def finish_job(self, job_state: AsynchronousJobState): + """ + Get the output/error for a finished job, pass to `job_wrapper.finish` + and cleanup all the job's temporary files. 
+ """ + galaxy_id_tag = job_state.job_wrapper.get_id_tag() + external_job_id = job_state.job_id + + # To ensure that files below are readable, ownership must be reclaimed first + job_state.job_wrapper.reclaim_ownership() + + collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state) if not collect_output_success: job_state.fail_message = stderr diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index 5f0fbe5d4969..36fef4665b5a 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -7,7 +7,6 @@ import math import os import re -import time from dataclasses import dataclass from datetime import datetime from typing import Union @@ -50,7 +49,6 @@ Service, service_object_dict, ) -from galaxy.util import unicodify from galaxy.util.bytesize import ByteSize log = logging.getLogger(__name__) @@ -175,12 +173,8 @@ def queue_job(self, job_wrapper): job_wrapper=job_wrapper, job_destination=job_wrapper.job_destination, ) - # Kubernetes doesn't really produce meaningful "job stdout", but file needs to be present - with open(ajs.output_file, "w"): - pass - with open(ajs.error_file, "w"): - pass - + # Kubernetes doesn't really produce a "job script", but job stream files needs to be present + ajs.init_job_stream_files() if not self.prepare_job( job_wrapper, include_metadata=False, @@ -680,26 +674,6 @@ def __transform_memory_value(self, mem_value): """ return ByteSize(mem_value).value - def __assemble_k8s_container_image_name(self, job_wrapper): - """Assembles the container image name as repo/owner/image:tag, where repo, owner and tag are optional""" - job_destination = job_wrapper.job_destination - - # Determine the job's Kubernetes destination (context, namespace) and options from the job destination - # definition - repo = "" - owner = "" - if "repo" in job_destination.params: - repo = f"{job_destination.params['repo']}/" - if "owner" in 
job_destination.params: - owner = f"{job_destination.params['owner']}/" - - k8s_cont_image = repo + owner + job_destination.params["image"] - - if "tag" in job_destination.params: - k8s_cont_image += f":{job_destination.params['tag']}" - - return k8s_cont_image - def __get_k8s_container_name(self, job_wrapper): # These must follow a specific regex for Kubernetes. raw_id = job_wrapper.job_destination.id @@ -1115,22 +1089,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed", # To ensure that files below are readable, ownership must be reclaimed first job_state.job_wrapper.reclaim_ownership() - # wait for the files to appear - which_try = 0 - while which_try < self.app.config.retry_job_output_collection + 1: - try: - with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file: - job_stdout = self._job_io_for_db(stdout_file) - job_stderr = self._job_io_for_db(stderr_file) - break - except Exception as e: - if which_try == self.app.config.retry_job_output_collection: - job_stdout = "" - job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER - log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}") - else: - time.sleep(1) - which_try += 1 + _, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state) # get stderr and stdout to database outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs") diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index 314f6a953d2c..73133bcf5fdc 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -210,6 +210,7 @@ class PulsarJobRunner(AsynchronousJobRunner): default_build_pulsar_app = False use_mq = False poll = True + client_manager_kwargs: dict[str, Any] = {} def __init__(self, app, nworkers, **kwds): """Start the job runner.""" @@ -240,19 +241,27 @@ def 
_init_client_manager(self): pulsar_conf_file = self.runner_params.get("pulsar_config", None) self.__init_pulsar_app(pulsar_conf, pulsar_conf_file) - client_manager_kwargs = {} - for kwd in "manager", "cache", "transport", "persistence_directory": - client_manager_kwargs[kwd] = self.runner_params[kwd] + client_manager_kwargs = self._pulsar_client_manager_args() if self.pulsar_app is not None: client_manager_kwargs["pulsar_app"] = self.pulsar_app - # TODO: Hack remove this following line pulsar lib update - # that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232 - client_manager_kwargs["file_cache"] = None + self.client_manager = build_client_manager(**client_manager_kwargs) + + def _pulsar_client_manager_args(self): + """Most connection parameters can be specified as environment parameters, but... + + ... global parameters about message queues, what Pulsar client to use, etc... must + be specified as runner parameters. This method returns a configuration based + on the runner parameters that is ready for Pulsar's build_client_manager. 
+ """ + client_manager_kwargs = self.client_manager_kwargs.copy() + for kwd in "manager", "cache", "transport", "persistence_directory": + client_manager_kwargs[kwd] = self.runner_params[kwd] for kwd in self.runner_params.keys(): if kwd.startswith("amqp_") or kwd.startswith("transport_"): client_manager_kwargs[kwd] = self.runner_params[kwd] - self.client_manager = build_client_manager(**client_manager_kwargs) + + return client_manager_kwargs def __init_pulsar_app(self, conf, pulsar_conf_path): if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app: @@ -1050,7 +1059,6 @@ class PulsarMQJobRunner(PulsarJobRunner): COEXECUTION_DESTINATION_DEFAULTS = { "default_file_action": "remote_transfer", "rewrite_parameters": "true", - "jobs_directory": "/pulsar_staging", "pulsar_container_image": DEFAULT_PULSAR_CONTAINER, "remote_container_handling": True, "url": PARAMETER_SPECIFICATION_IGNORED, @@ -1058,27 +1066,20 @@ class PulsarMQJobRunner(PulsarJobRunner): } -class PulsarCoexecutionJobRunner(PulsarMQJobRunner): +class PulsarCoexecutionJobRunner(PulsarJobRunner): destination_defaults = COEXECUTION_DESTINATION_DEFAULTS - def _populate_parameter_defaults(self, job_destination): - super()._populate_parameter_defaults(job_destination) - params = job_destination.params - # Set some sensible defaults for Pulsar application that runs in staging container. 
- if "pulsar_app_config" not in params: - params["pulsar_app_config"] = {} - pulsar_app_config = params["pulsar_app_config"] - if "staging_directory" not in pulsar_app_config: - # coexecution always uses a fixed path for staging directory - pulsar_app_config["staging_directory"] = params.get("jobs_directory") - KUBERNETES_DESTINATION_DEFAULTS: dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS} +KUBERNETES_CLIENT_MANAGER_KWARGS = {"k8s_enabled": True} + class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner): destination_defaults = KUBERNETES_DESTINATION_DEFAULTS + use_mq = True poll = True # Poll so we can check API for pod IP for ITs. + client_manager_kwargs = KUBERNETES_CLIENT_MANAGER_KWARGS TES_DESTINATION_DEFAULTS: dict[str, Any] = { @@ -1086,9 +1087,29 @@ class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner): **COEXECUTION_DESTINATION_DEFAULTS, } +TES_CLIENT_MANAGER_KWARGS = {"tes_enabled": True} + class PulsarTesJobRunner(PulsarCoexecutionJobRunner): destination_defaults = TES_DESTINATION_DEFAULTS + client_manager_kwargs = TES_CLIENT_MANAGER_KWARGS + use_mq = True + poll = False + + +GCP_DESTINATION_DEFAULTS: dict[str, Any] = { + "project_id": PARAMETER_SPECIFICATION_REQUIRED, + **COEXECUTION_DESTINATION_DEFAULTS, +} +GCP_BATCH_CLIENT_MANAGER_KWARGS = {"gcp_batch_enabled": True} + + +class PulsarGcpBatchJobRunner(PulsarCoexecutionJobRunner): + use_mq = True + poll = False + + client_manager_kwargs = GCP_BATCH_CLIENT_MANAGER_KWARGS + destination_defaults = GCP_DESTINATION_DEFAULTS class PulsarRESTJobRunner(PulsarJobRunner): diff --git a/pyproject.toml b/pyproject.toml index b1d5f3fd27c2..baf95280978a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,7 +70,7 @@ dependencies = [ "pebble", "pillow", "psutil", - "pulsar-galaxy-lib>=0.15.0.dev0", + "pulsar-galaxy-lib>=0.15.10", "pycryptodome", "pydantic[email]>=2.7.4", # https://github.com/pydantic/pydantic/pull/9639 "PyJWT",