70 changes: 70 additions & 0 deletions lib/galaxy/config/sample/job_conf.sample.yml
@@ -338,6 +338,27 @@ runners:
#app: {}
#pulsar_config: path/to/pulsar/app.yml

# Using Pulsar and co-execution to run jobs against a TES service.
# - https://www.ga4gh.org/product/task-execution-service-tes/
# - https://pulsar.readthedocs.io/en/latest/containers.html#ga4gh-tes
pulsar_tes:
load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
# RabbitMQ connection URL used by the Galaxy server (include credentials).
amqp_url: <amqp_url>
# If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
#galaxy_url: <galaxy_url>

# Using Pulsar and co-execution to run jobs against Google Cloud Platform's Batch service.
# - https://cloud.google.com/batch/docs/get-started
# - https://pulsar.readthedocs.io/en/latest/containers.html#google-cloud-platform-batch
pulsar_gcp:
load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
# RabbitMQ connection URL used by the Galaxy server (include credentials).
amqp_url: <amqp_url>
# If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
#galaxy_url: <galaxy_url>
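
The load: values above follow the "module.path:ClassName" plugin convention. A minimal sketch of how such a spec resolves to a runner class (illustrative only, not Galaxy's actual plugin loader, which layers validation and configuration on top):

```python
# Illustrative only: resolve a "module.path:ClassName" load spec.
import importlib


def resolve_load_spec(load_spec: str):
    """Split a load spec, import the module, and return the named attribute."""
    module_name, attr_name = load_spec.split(":", 1)
    return getattr(importlib.import_module(module_name), attr_name)


# With Galaxy on sys.path, this would return the TES runner class:
# runner_cls = resolve_load_spec("galaxy.jobs.runners.pulsar:PulsarTesJobRunner")
```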


# Job handler configuration - for a full discussion of job handlers, see the documentation at:
# https://docs.galaxyproject.org/en/latest/admin/scaling.html
handling:
@@ -916,6 +937,55 @@ execution:
# Path to Kubernetes configuration file (see the Kubernetes runner description).
#k8s_config_path: /path/to/kubeconfig

pulsar_tes_environment:
runner: pulsar_tes
# A tes_url is required
tes_url: "<tes_url>"
#basic_auth:
# username: <username>
# password: <password>
#cpu_cores: 1
# Define whether the task is allowed to run on preemptible compute
# instances, for example, AWS Spot. This option may have no effect on
# backends that lack the concept of preemptible jobs.
#preemptible: false
#ram_gb: 8
#disk_gb: 40
# Request that the task be run in these compute zones. How this string is
# interpreted depends on the backend system; for example, a backend built
# on a cluster queueing system may use it to choose the priority queue to
# which the job is assigned.
#zones: us-west-1
#backend_parameters: {}
#backend_parameters_strict: false
# Configure the embedded Pulsar app; only message_queue_url is required,
# but other options may be useful.
pulsar_app_config:
# This must point at the same RabbitMQ server as amqp_url above, using the
# host and port your TES nodes use to reach it.
message_queue_url: "<amqp_url>"

pulsar_gcp_environment:
runner: pulsar_gcp
# required
project_id: <gcp_project_id>
# Path to a GCP service account credentials file (~ may not be expanded;
# prefer an absolute path).
#credentials_file: ~/.config/gcloud/application_default_credentials.json
# GCP region or zone to use (optional)
#region: us-central1
# Max walltime in seconds (defaults to 60 * 60 * 24, i.e. 86400).
#walltime_limit: 216000
# Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.
#retry_count: 2
# Name of the SSD volume to be mounted in the task. Shared among all containers in job.
#ssd_name: pulsar_staging
# Size of the shared local SSD disk in GB (must be a multiple of 375).
#disk_size: 375
# Machine type for the job's VM.
#machine_type: n1-standard-1
#labels: {}
# Configure the embedded Pulsar app; only message_queue_url is required,
# but other options may be useful.
pulsar_app_config:
# This must point at the same RabbitMQ server as amqp_url above, using the
# host and port your GCP compute nodes use to reach it.
message_queue_url: "<amqp_url>"
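
In both environments above, amqp_url and message_queue_url must reference the same RabbitMQ broker; what differs is the vantage point (Galaxy's network versus the compute side). A hedged sanity-check sketch using kombu, the AMQP library Galaxy and Pulsar build on; both URLs are placeholders:

```python
# Sketch: verify both sides can reach the shared broker. URLs are placeholders.
from kombu import Connection

galaxy_side = "amqp://user:pass@rabbitmq.internal:5672//"  # runner's amqp_url
compute_side = "amqp://user:pass@mq.example.org:5672//"    # message_queue_url

for url in (galaxy_side, compute_side):
    # ensure_connection raises if the broker is unreachable
    with Connection(url) as conn:
        conn.ensure_connection(max_retries=1)
```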

# Example CLI runners.
ssh_torque:
runner: cli
35 changes: 23 additions & 12 deletions lib/galaxy/jobs/runners/__init__.py
@@ -818,6 +818,13 @@ def register_cleanup_file_attribute(self, attribute):
if attribute not in self.cleanup_file_attributes:
self.cleanup_file_attributes.append(attribute)

def init_job_stream_files(self):
"""For runners that don't create explicit job scripts - create job stream files."""
with open(self.output_file, "w"):
pass
with open(self.error_file, "w"):
pass


class AsynchronousJobRunner(BaseJobRunner, Monitors):
"""Parent class for any job runner that runs jobs asynchronously (e.g. via
@@ -907,17 +914,7 @@ def check_watched_items(self):
def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]:
raise NotImplementedError()

def finish_job(self, job_state: AsynchronousJobState):
"""
Get the output/error for a finished job, pass to `job_wrapper.finish`
and cleanup all the job's temporary files.
"""
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
external_job_id = job_state.job_id

# To ensure that files below are readable, ownership must be reclaimed first
job_state.job_wrapper.reclaim_ownership()

def _collect_job_output(self, job_id: Union[int, str], external_job_id: Optional[str], job_state: JobState):
# wait for the files to appear
which_try = 0
collect_output_success = True
@@ -931,11 +928,25 @@ def finish_job(self, job_state: AsynchronousJobState):
if which_try == self.app.config.retry_job_output_collection:
stdout = ""
stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
collect_output_success = False
else:
time.sleep(1)
which_try += 1
return collect_output_success, stdout, stderr

def finish_job(self, job_state: AsynchronousJobState):
"""
Get the output/error for a finished job, pass to `job_wrapper.finish`
and clean up all the job's temporary files.
"""
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
external_job_id = job_state.job_id

# To ensure that files below are readable, ownership must be reclaimed first
job_state.job_wrapper.reclaim_ownership()

collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)

if not collect_output_success:
job_state.fail_message = stderr
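The net effect of this refactor is that the wait-and-retry logic for collecting job stdout/stderr now lives in one place, _collect_job_output, shared by finish_job here and by the Kubernetes runner's fail_job below. A self-contained sketch of the retry pattern it implements (simplified; the real code reads the retry count from app.config.retry_job_output_collection):

```python
# Simplified, self-contained sketch of the shared retry pattern: poll for the
# job's stream files, sleeping 1s between attempts, and give up with a marker
# message after the configured number of retries.
import time


def collect_with_retry(stdout_path: str, stderr_path: str, retries: int = 2):
    for attempt in range(retries + 1):
        try:
            with open(stdout_path) as out, open(stderr_path) as err:
                return True, out.read(), err.read()
        except OSError:
            if attempt == retries:
                # mirrors JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER in the real code
                return False, "", "Job output not returned from cluster"
            time.sleep(1)


ok, stdout, stderr = collect_with_retry("job.out", "job.err", retries=0)
```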
47 changes: 3 additions & 44 deletions lib/galaxy/jobs/runners/kubernetes.py
@@ -7,7 +7,6 @@
import math
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Union
@@ -50,7 +49,6 @@
Service,
service_object_dict,
)
from galaxy.util import unicodify
from galaxy.util.bytesize import ByteSize

log = logging.getLogger(__name__)
@@ -175,12 +173,8 @@ def queue_job(self, job_wrapper):
job_wrapper=job_wrapper,
job_destination=job_wrapper.job_destination,
)
# Kubernetes doesn't really produce meaningful "job stdout", but file needs to be present
with open(ajs.output_file, "w"):
pass
with open(ajs.error_file, "w"):
pass

# Kubernetes doesn't really produce a "job script", but job stream files need to be present
ajs.init_job_stream_files()
if not self.prepare_job(
job_wrapper,
include_metadata=False,
@@ -680,26 +674,6 @@ def __transform_memory_value(self, mem_value):
"""
return ByteSize(mem_value).value

def __assemble_k8s_container_image_name(self, job_wrapper):
"""Assembles the container image name as repo/owner/image:tag, where repo, owner and tag are optional"""
job_destination = job_wrapper.job_destination

# Determine the job's Kubernetes destination (context, namespace) and options from the job destination
# definition
repo = ""
owner = ""
if "repo" in job_destination.params:
repo = f"{job_destination.params['repo']}/"
if "owner" in job_destination.params:
owner = f"{job_destination.params['owner']}/"

k8s_cont_image = repo + owner + job_destination.params["image"]

if "tag" in job_destination.params:
k8s_cont_image += f":{job_destination.params['tag']}"

return k8s_cont_image

def __get_k8s_container_name(self, job_wrapper):
# These must follow a specific regex for Kubernetes.
raw_id = job_wrapper.job_destination.id
@@ -1115,22 +1089,7 @@ def fail_job(self, job_state: "JobState", exception=False, message="Job failed",
# To ensure that files below are readable, ownership must be reclaimed first
job_state.job_wrapper.reclaim_ownership()

# wait for the files to appear
which_try = 0
while which_try < self.app.config.retry_job_output_collection + 1:
try:
with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
job_stdout = self._job_io_for_db(stdout_file)
job_stderr = self._job_io_for_db(stderr_file)
break
except Exception as e:
if which_try == self.app.config.retry_job_output_collection:
job_stdout = ""
job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
else:
time.sleep(1)
which_try += 1
_, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)

# get stderr and stdout to database
outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")
61 changes: 41 additions & 20 deletions lib/galaxy/jobs/runners/pulsar.py
@@ -210,6 +210,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
default_build_pulsar_app = False
use_mq = False
poll = True
client_manager_kwargs: dict[str, Any] = {}

def __init__(self, app, nworkers, **kwds):
"""Start the job runner."""
@@ -240,19 +241,27 @@ def _init_client_manager(self):
pulsar_conf_file = self.runner_params.get("pulsar_config", None)
self.__init_pulsar_app(pulsar_conf, pulsar_conf_file)

client_manager_kwargs = {}
for kwd in "manager", "cache", "transport", "persistence_directory":
client_manager_kwargs[kwd] = self.runner_params[kwd]
client_manager_kwargs = self._pulsar_client_manager_args()
if self.pulsar_app is not None:
client_manager_kwargs["pulsar_app"] = self.pulsar_app
# TODO: Hack - remove the following line after a Pulsar lib update
# that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232
client_manager_kwargs["file_cache"] = None
self.client_manager = build_client_manager(**client_manager_kwargs)

def _pulsar_client_manager_args(self):
"""Most connection parameters can be specified as environment parameters, but...

... global parameters about message queues, what Pulsar client to use, etc... must
be specified as runner parameters. This method returns a configuration based
on the runner parameters that is ready for Pulsar's build_client_manager.
"""
client_manager_kwargs = self.client_manager_kwargs.copy()
for kwd in "manager", "cache", "transport", "persistence_directory":
client_manager_kwargs[kwd] = self.runner_params[kwd]

for kwd in self.runner_params.keys():
if kwd.startswith("amqp_") or kwd.startswith("transport_"):
client_manager_kwargs[kwd] = self.runner_params[kwd]
self.client_manager = build_client_manager(**client_manager_kwargs)

return client_manager_kwargs

def __init_pulsar_app(self, conf, pulsar_conf_path):
if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
@@ -1050,45 +1059,57 @@ class PulsarMQJobRunner(PulsarJobRunner):
COEXECUTION_DESTINATION_DEFAULTS = {
"default_file_action": "remote_transfer",
"rewrite_parameters": "true",
"jobs_directory": "/pulsar_staging",
"pulsar_container_image": DEFAULT_PULSAR_CONTAINER,
"remote_container_handling": True,
"url": PARAMETER_SPECIFICATION_IGNORED,
"private_token": PARAMETER_SPECIFICATION_IGNORED,
}


class PulsarCoexecutionJobRunner(PulsarMQJobRunner):
class PulsarCoexecutionJobRunner(PulsarJobRunner):
destination_defaults = COEXECUTION_DESTINATION_DEFAULTS

def _populate_parameter_defaults(self, job_destination):
super()._populate_parameter_defaults(job_destination)
params = job_destination.params
# Set some sensible defaults for the Pulsar application that runs in the staging container.
if "pulsar_app_config" not in params:
params["pulsar_app_config"] = {}
pulsar_app_config = params["pulsar_app_config"]
if "staging_directory" not in pulsar_app_config:
# coexecution always uses a fixed path for staging directory
pulsar_app_config["staging_directory"] = params.get("jobs_directory")


KUBERNETES_DESTINATION_DEFAULTS: dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}

KUBERNETES_CLIENT_MANAGER_KWARGS = {"k8s_enabled": True}


class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner):
destination_defaults = KUBERNETES_DESTINATION_DEFAULTS
use_mq = True
poll = True # Poll so we can check API for pod IP for ITs.
client_manager_kwargs = KUBERNETES_CLIENT_MANAGER_KWARGS


TES_DESTINATION_DEFAULTS: dict[str, Any] = {
"tes_url": PARAMETER_SPECIFICATION_REQUIRED,
**COEXECUTION_DESTINATION_DEFAULTS,
}

TES_CLIENT_MANAGER_KWARGS = {"tes_enabled": True}


class PulsarTesJobRunner(PulsarCoexecutionJobRunner):
destination_defaults = TES_DESTINATION_DEFAULTS
client_manager_kwargs = TES_CLIENT_MANAGER_KWARGS
use_mq = True
poll = False


GCP_DESTINATION_DEFAULTS: dict[str, Any] = {
"project_id": PARAMETER_SPECIFICATION_REQUIRED,
**COEXECUTION_DESTINATION_DEFAULTS,
}
GCP_BATCH_CLIENT_MANAGER_KWARGS = {"gcp_batch_enabled": True}


class PulsarGcpBatchJobRunner(PulsarCoexecutionJobRunner):
use_mq = True
poll = False

client_manager_kwargs = GCP_BATCH_CLIENT_MANAGER_KWARGS
destination_defaults = GCP_DESTINATION_DEFAULTS


class PulsarRESTJobRunner(PulsarJobRunner):
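With destination_defaults and the new client_manager_kwargs both plain class attributes, adding another co-execution backend becomes mostly declarative. A hypothetical sketch (the backend flag and parameter below are made up; Pulsar's build_client_manager would need to understand the flag for this to actually work):

```python
# Hypothetical example, not part of this PR: a new co-execution runner only
# declares its defaults and backend flag; _pulsar_client_manager_args() merges
# client_manager_kwargs into the kwargs passed to build_client_manager().
from typing import Any

from galaxy.jobs.runners.pulsar import (
    COEXECUTION_DESTINATION_DEFAULTS,
    PARAMETER_SPECIFICATION_REQUIRED,
    PulsarCoexecutionJobRunner,
)

EXAMPLE_DESTINATION_DEFAULTS: dict[str, Any] = {
    "example_service_url": PARAMETER_SPECIFICATION_REQUIRED,  # made-up parameter
    **COEXECUTION_DESTINATION_DEFAULTS,
}


class PulsarExampleJobRunner(PulsarCoexecutionJobRunner):
    destination_defaults = EXAMPLE_DESTINATION_DEFAULTS
    client_manager_kwargs = {"example_backend_enabled": True}  # made-up flag
    use_mq = True
    poll = False
```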
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -70,7 +70,7 @@ dependencies = [
"pebble",
"pillow",
"psutil",
"pulsar-galaxy-lib>=0.15.0.dev0",
"pulsar-galaxy-lib>=0.15.10",
"pycryptodome",
"pydantic[email]>=2.7.4", # https://github.com/pydantic/pydantic/pull/9639
"PyJWT",