
Fix that all slurm jobs were canceled if a single job failed #1347


Open · wants to merge 4 commits into base: master
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -79,7 +79,7 @@ jobs:
      - name: Restart Slurm Cluster
        run: cd ./dockered-slurm && docker compose restart slurmctld c1 c2

-      - name: "Run Tests (test_all, test_slurm) with modified slurn.conf"
+      - name: "Run Tests (test_all, test_slurm) with modified slurm.conf"
        run: |
          # Run tests requiring a modified slurm config
          docker exec \

1 change: 1 addition & 0 deletions cluster_tools/Changelog.md
@@ -16,6 +16,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
### Changed

### Fixed
+- Fixed that all slurm jobs were canceled if a single job failed. [#1347](https://github.com/scalableminds/webknossos-libs/pull/1347)


## [2.4.7](https://github.com/scalableminds/webknossos-libs/releases/tag/v2.4.7) - 2025-07-30

6 changes: 3 additions & 3 deletions cluster_tools/cluster_tools/schedulers/cluster_executor.py
@@ -630,15 +630,15 @@ def register_jobs(

    # Overwrite the context manager __exit as it doesn't forward the information whether an exception was thrown or not otherwise
    # which may lead to a deadlock if an exception is thrown within a cluster executor with statement, because self.jobs_empty_cond.wait()
-    # never succeeds.
+    # never succeeds. However, don't fail hard if a RemoteException occurred as we want the other scheduled jobs to keep running.
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> Literal[False]:
-        # Don't wait if an exception was thrown
-        self.shutdown(wait=exc_type is None)
+        # Don't wait if an exception other than a RemoteException was thrown
+        self.shutdown(wait=exc_type is None or issubclass(exc_type, RemoteException))
        return False

    def shutdown(self, wait: bool = True, cancel_futures: bool = True) -> None:

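The new comment above is the heart of the fix: `__exit__` should still wait for the remaining jobs when the exception that escaped the `with` block merely reports a single failed remote job. A minimal sketch of that decision, with a local stub standing in for `cluster_tools.schedulers.cluster_executor.RemoteException` and a hypothetical `should_wait_on_shutdown` helper (neither is part of the PR):

```python
class RemoteException(Exception):
    """Stand-in for cluster_tools.schedulers.cluster_executor.RemoteException."""


def should_wait_on_shutdown(exc_type: type[BaseException] | None) -> bool:
    # Mirror of the changed condition in __exit__: keep waiting if the block
    # finished cleanly or if only a remote job failed; otherwise shut down fast.
    return exc_type is None or issubclass(exc_type, RemoteException)


assert should_wait_on_shutdown(None)
assert should_wait_on_shutdown(RemoteException)
assert not should_wait_on_shutdown(KeyboardInterrupt)
```
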
50 changes: 36 additions & 14 deletions cluster_tools/tests/test_slurm.py
@@ -374,28 +374,50 @@ def test_slurm_time_limit() -> None:


@pytest.mark.requires_modified_slurm_config
-def test_slurm_memory_limit() -> None:
+def test_slurm_memory_limit(
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.setenv("SLURM_MAX_RUNNING_SIZE", "2")
+
    # Request 30 MB
    executor = cluster_tools.get_executor(
        "slurm",
        debug=True,
        job_resources={"mem": "30M"}, # 30M is the smallest limit enforced by Cgroups
    )

-    with executor:
-        # Schedule a job that allocates more than 30 MB and let it run for more than 1 second
-        # because the frequency of the memory polling is 1 second
-        duration = 3
-        futures = executor.map_to_futures(
-            partial(allocate, duration), [1024 * 1024 * 50]
-        )
-        concurrent.futures.wait(futures)
-
-        # Job should have been killed with a RemoteOutOfMemoryException
-        assert all(
-            isinstance(fut.exception(), cluster_tools.RemoteOutOfMemoryException)
-            for fut in futures
-        )
+    job_ids = []
+    with pytest.raises(
+        (
+            cluster_tools.RemoteOutOfMemoryException,
+            cluster_tools.schedulers.cluster_executor.RemoteException,
+        )
+    ):
+        with executor:
+            # Schedule a job that allocates more than 30 MB and let it run for more than 1 second
+            # because the frequency of the memory polling is 1 second
+            duration = 3
+            futures = executor.map_to_futures(
+                partial(allocate, duration), [1024 * 1024 * 50, 10, 20, 30]
+            )
+            for future in futures:
+                future.add_done_callback(
+                    lambda f: job_ids.append(f"{f.cluster_jobid}_{f.cluster_jobindex}") # type: ignore[attr-defined]
+                )
+            # Wait for jobs to finish
+            [fut.result() for fut in futures]
+
+    # Check that all jobs but one ran successfully
+    job_states = []
+    for job_id in job_ids:
+        # Show the overall job state (-X), without a header (-n) and without extra whitespace (-P)
+        stdout, _, exit_code = call(f"sacct -j {job_id} -o State -P -n -X")
+        assert exit_code == 0
+        job_states.append(stdout.strip())
+
+    # Although one job failed, the other jobs should have continued running and succeeded
+    assert job_states.count("FAILED") == 1
+    assert job_states.count("COMPLETED") == 3


def test_slurm_max_array_size_env() -> None:

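A note on the `sacct` check above: the test goes through the repository's `call` helper, but the same state query can be sketched with just the standard library. The `slurm_job_state` helper below is hypothetical (not part of the PR) and assumes `sacct` is available on the PATH:

```python
import subprocess


def slurm_job_state(job_id: str) -> str:
    # -X: only the overall job line, -n: suppress the header, -P: parsable output without padding
    result = subprocess.run(
        ["sacct", "-j", job_id, "-o", "State", "-P", "-n", "-X"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

With the fix in place, the one over-allocating array task should end up as FAILED while its siblings finish as COMPLETED, which is exactly what the new assertions verify.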