From 7cdac00718d296933ac88a5fc6d4d05679aa0a93 Mon Sep 17 00:00:00 2001 From: Avni Gandhi Date: Tue, 5 May 2026 12:10:10 -0700 Subject: [PATCH 1/4] Harden CPU overuse warning checks and add unit tests --- src/toil/job.py | 48 +++++++++++++++++++++++++----------- src/toil/test/src/jobTest.py | 47 +++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/src/toil/job.py b/src/toil/job.py index fa42eb8fdc..6075d8692e 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -3304,20 +3304,8 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[ ) job_time = time.time() - startTime job_cpu_time = totalCpuTime - startClock - allocated_cpu_time = job_time * self.cores - - if job_cpu_time > allocated_cpu_time and allocated_cpu_time > 0: - # Too much CPU was used by this job! Maybe we're using a batch - # system that doesn't/can't sandbox us and we started too many - # threads. Complain to the user! - excess_factor = job_cpu_time / allocated_cpu_time - fileStore.log_to_leader( - f"Job {self.description} used {excess_factor:.2f}x more " - f"CPU than the requested {self.cores} cores. Consider " - f"increasing the job's required CPU cores or limiting the " - f"number of processes/threads launched.", - level=logging.WARNING, - ) + + self._check_cpu_usage(job_time, job_cpu_time, fileStore) # Finish up the stats if stats is not None: @@ -3337,6 +3325,38 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[ succeeded=str(succeeded), ) ) + + + def _check_cpu_usage( + self, + job_time: float, + job_cpu_time: float, + fileStore: AbstractFileStore, + threshold_factor: float = 1.05, + ) -> None: + """Log a warning when a job consumes more CPU time than its allocation.""" + if job_time <= 0: + # The job may have run too quickly for wall-clock resolution. + return + if self.cores <= 0: + # Jobs should not have non-positive core requests, but avoid divide-by-zero. + fileStore.log_to_leader( + f"Job {self.description} has invalid requested cores value " + f"{self.cores}; cannot evaluate CPU overuse.", + level=logging.WARNING, + ) + return + allocated_cpu_time = job_time * self.cores + + excess_factor = job_cpu_time / allocated_cpu_time + if excess_factor > threshold_factor: + fileStore.log_to_leader( + f"Job {self.description} used {excess_factor:.2f}x more " + f"CPU than the requested {self.cores} cores. Consider " + f"increasing the job's required CPU cores or limiting the " + f"number of processes/threads launched.", + level=logging.WARNING, + ) def _runner( self, diff --git a/src/toil/test/src/jobTest.py b/src/toil/test/src/jobTest.py index ec6a7f9bdd..92b155147a 100644 --- a/src/toil/test/src/jobTest.py +++ b/src/toil/test/src/jobTest.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import collections +import logging import os import random import time +from unittest.mock import Mock from collections.abc import Callable from pathlib import Path from typing import Any, Callable, NoReturn, cast @@ -35,6 +37,7 @@ class TestJob: + """Tests the job class.""" @slow @@ -815,7 +818,51 @@ def cyclic(fNode: int, visited: set[int], stack: list[int]) -> bool | int: if cyclic(i, visited, []): return False return True + + def test_cpu_usage(self, subtests: pytest.Subtests) -> None: + with subtests.test("no warnings for non-positive runtime"): + job = Job() + file_store = Mock() + job._check_cpu_usage( + job_time=0.0, job_cpu_time=100.0, fileStore=file_store) + job._check_cpu_usage( + job_time=-1.0, job_cpu_time=100.0, fileStore=file_store + ) + file_store.log_to_leader.assert_not_called() + + with subtests.test("warn for invalid cores value"): + job = Job() + file_store = Mock() + job.cores = 0 + job._check_cpu_usage( + job_time=10.0, job_cpu_time=10.0, fileStore=file_store + ) + file_store.log_to_leader.assert_called_once() + invalid_cores_message = file_store.log_to_leader.call_args.args[0] + invalid_cores_level = file_store.log_to_leader.call_args.kwargs["level"] + assert "invalid requested cores value 0" in invalid_cores_message + assert invalid_cores_level == logging.WARNING + + with subtests.test("threshold boundary does not warn"): + threshold_job = Job(cores=2) + threshold_store = Mock() + threshold_job._check_cpu_usage( + job_time=10.0, job_cpu_time=21.0, fileStore=threshold_store + ) + threshold_store.log_to_leader.assert_not_called() + with subtests.test("just above threshold warns"): + threshold_job = Job(cores=2) + threshold_store = Mock() + threshold_job._check_cpu_usage( + job_time=10.0, job_cpu_time=21.1, fileStore=threshold_store + ) + threshold_store.log_to_leader.assert_called_once() + cpu_message = threshold_store.log_to_leader.call_args.args[0] + cpu_level = threshold_store.log_to_leader.call_args.kwargs["level"] + assert "used " in cpu_message + assert "more CPU than the requested 2 cores" in cpu_message + assert cpu_level == logging.WARNING def simpleJobFn(job: ServiceHostJob, value: str) -> None: job.fileStore.log_to_leader(value) From f37f102c1992e2f64153ea47ed00892a43c648df Mon Sep 17 00:00:00 2001 From: Avni Gandhi Date: Mon, 11 May 2026 14:53:43 -0700 Subject: [PATCH 2/4] address CPU usage warning test review feedback --- src/toil/test/src/jobTest.py | 40 ++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/toil/test/src/jobTest.py b/src/toil/test/src/jobTest.py index 92b155147a..37f600fa48 100644 --- a/src/toil/test/src/jobTest.py +++ b/src/toil/test/src/jobTest.py @@ -824,44 +824,54 @@ def test_cpu_usage(self, subtests: pytest.Subtests) -> None: job = Job() file_store = Mock() job._check_cpu_usage( - job_time=0.0, job_cpu_time=100.0, fileStore=file_store) + job_time=0.0, + job_cpu_time=100.0, + fileStore=file_store + ) job._check_cpu_usage( - job_time=-1.0, job_cpu_time=100.0, fileStore=file_store + job_time=-1.0, + job_cpu_time=100.0, + fileStore=file_store ) file_store.log_to_leader.assert_not_called() - with subtests.test("warn for invalid cores value"): + with subtests.test("accepts 0 cores"): job = Job() file_store = Mock() job.cores = 0 job._check_cpu_usage( - job_time=10.0, job_cpu_time=10.0, fileStore=file_store + job_time=10.0, + job_cpu_time=10.0, + fileStore=file_store ) - file_store.log_to_leader.assert_called_once() - invalid_cores_message = file_store.log_to_leader.call_args.args[0] - invalid_cores_level = file_store.log_to_leader.call_args.kwargs["level"] - assert "invalid requested cores value 0" in invalid_cores_message - assert invalid_cores_level == logging.WARNING + TIME = 10.0 + THRESHOLD = 1.05 + CORES = 2 with subtests.test("threshold boundary does not warn"): - threshold_job = Job(cores=2) + threshold_job = Job(cores=CORES) threshold_store = Mock() threshold_job._check_cpu_usage( - job_time=10.0, job_cpu_time=21.0, fileStore=threshold_store + job_time=TIME, + job_cpu_time=TIME * CORES * THRESHOLD, + fileStore=threshold_store, + threshold_factor=THRESHOLD, ) threshold_store.log_to_leader.assert_not_called() with subtests.test("just above threshold warns"): - threshold_job = Job(cores=2) + threshold_job = Job(cores=CORES) threshold_store = Mock() threshold_job._check_cpu_usage( - job_time=10.0, job_cpu_time=21.1, fileStore=threshold_store + job_time=TIME, + job_cpu_time=TIME * CORES * THRESHOLD + 0.1, + fileStore=threshold_store, + threshold_factor=THRESHOLD, ) threshold_store.log_to_leader.assert_called_once() cpu_message = threshold_store.log_to_leader.call_args.args[0] cpu_level = threshold_store.log_to_leader.call_args.kwargs["level"] - assert "used " in cpu_message - assert "more CPU than the requested 2 cores" in cpu_message + assert str(threshold_job.description) in cpu_message assert cpu_level == logging.WARNING def simpleJobFn(job: ServiceHostJob, value: str) -> None: From e5fda4c31a6cbe5c9fdfd7f3c3c58c5122aa36ee Mon Sep 17 00:00:00 2001 From: Avni Gandhi Date: Mon, 11 May 2026 15:38:06 -0700 Subject: [PATCH 3/4] address CPU usage warning test review feedback --- src/toil/job.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/toil/job.py b/src/toil/job.py index 6075d8692e..ca39f92500 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -3334,7 +3334,15 @@ def _check_cpu_usage( fileStore: AbstractFileStore, threshold_factor: float = 1.05, ) -> None: - """Log a warning when a job consumes more CPU time than its allocation.""" + """ + Log a warning when a job consumes more CPU time than its allocation. + + If *job_time* is non-positive, returns immediately (wall-clock time may + lack resolution for very short jobs). If *cores* is non-positive, logs + a warning that overuse cannot be assessed and returns, avoiding + division by zero. Otherwise, compares *job_cpu_time* to + ``job_time * cores * threshold_factor``. + """ if job_time <= 0: # The job may have run too quickly for wall-clock resolution. return From c4c69a4234175a2f5e2447b3ce2d68762c8f69bb Mon Sep 17 00:00:00 2001 From: Avni Gandhi Date: Thu, 28 May 2026 12:43:36 -0700 Subject: [PATCH 4/4] add injected container cpu/memory accounting for cwl and wdl --- src/toil/cwl/cwltoil.py | 103 ++++++++++++++++++- src/toil/lib/interpreter.py | 194 +++++++++++++++++++++++++++++++++++ src/toil/test/cwl/cwlTest.py | 111 ++++++++++++++++++++ src/toil/wdl/wdltoil.py | 157 +++------------------------- 4 files changed, 418 insertions(+), 147 deletions(-) create mode 100644 src/toil/lib/interpreter.py diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index 726dda7098..bf6a2ae778 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -31,13 +31,21 @@ import pprint import shutil import stat +import subprocess import sys import textwrap import uuid # This is also in configargparse but MyPy doesn't know it from argparse import RawDescriptionHelpFormatter -from collections.abc import Callable, Iterator, Mapping, MutableMapping, MutableSequence +from collections.abc import ( + Callable, + Generator, + Iterator, + Mapping, + MutableMapping, + MutableSequence, +) from tempfile import NamedTemporaryFile, TemporaryFile, gettempdir from threading import Thread from typing import IO, Any, Literal, Optional, Protocol, TextIO, TypeVar, Union, cast @@ -67,6 +75,8 @@ fill_in_defaults, shortname, ) +from cwltool.docker import DockerCommandLineJob, PodmanCommandLineJob +from cwltool.job import ContainerCommandLineJob from cwltool.secrets import SecretStore from cwltool.singularity import SingularityCommandLineJob from cwltool.software_requirements import ( @@ -102,6 +112,12 @@ from toil.common import Config, Toil, addOptions from toil.cwl import check_cwltool_version from toil.lib.directory import DirectoryContents, decode_directory, encode_directory +from toil.lib.interpreter import ( + add_injections, + command_line_to_shell_script, + handle_injection_messages_from_outdir, + shell_script_to_command_line, +) from toil.lib.misc import call_command from toil.lib.trs import resolve_workflow from toil.provisioners.clusterScaler import JobTooBigError @@ -1086,6 +1102,32 @@ def run_jobs( return super().run_jobs(process, job_order_object, logger, runtime_context) +class ToilContainerCommandLineJob(ContainerCommandLineJob): + """Container job that collects resource stats from injected in-container code.""" + + def _execute( + self, + runtime: list[str], + env: MutableMapping[str, str], + runtimeContext: cwltool.context.RuntimeContext, + monitor_function: Callable[["subprocess.Popen[str]"], None] | None = None, + ) -> None: + super()._execute(runtime, env, runtimeContext, monitor_function) + handle_injection_messages_from_outdir(self.outdir) + + +class ToilDockerCommandLineJob(ToilContainerCommandLineJob, DockerCommandLineJob): + """Docker container job with Toil runtime injection support.""" + + +class ToilPodmanCommandLineJob(ToilContainerCommandLineJob, PodmanCommandLineJob): + """Podman container job with Toil runtime injection support.""" + + +class ToilSingularityCommandLineJob(ToilContainerCommandLineJob, SingularityCommandLineJob): + """Singularity container job with Toil runtime injection support.""" + + class ToilTool: """Mixin to hook Toil into a cwltool tool type.""" @@ -1141,7 +1183,64 @@ def __str__(self) -> str: class ToilCommandLineTool(ToilTool, cwltool.command_line_tool.CommandLineTool): - """Subclass the cwltool command line tool to provide the custom ToilPathMapper.""" + """Subclass the cwltool command line tool to provide the custom ToilPathMapper + and add the monitoring code to the job's container command line.""" + + def make_job_runner( + self, runtimeContext: cwltool.context.RuntimeContext + ) -> type[cwltool.job.JobBase]: + """Use Toil container job classes that collect injected runtime messages.""" + parent_class = super().make_job_runner(runtimeContext) + if parent_class is DockerCommandLineJob: + return ToilDockerCommandLineJob + if parent_class is PodmanCommandLineJob: + return ToilPodmanCommandLineJob + if parent_class is SingularityCommandLineJob: + return ToilSingularityCommandLineJob + return parent_class + + def _uses_container(self, runtimeContext: cwltool.context.RuntimeContext) -> bool: + if not runtimeContext.use_container: + return False + docker_req, _ = self.get_requirement("DockerRequirement") + if docker_req is not None: + return True + if runtimeContext.find_default_container is not None: + return runtimeContext.find_default_container(self) is not None + return runtimeContext.default_container is not None + + @staticmethod + def _file_mounts_from_pathmapper( + job: ContainerCommandLineJob, + ) -> list[tuple[str, str]]: + file_mounts: list[tuple[str, str]] = [] + for location in job.pathmapper.files(): + ent = job.pathmapper.mapper(location) + if ent.type == "File" and not ent.resolved.startswith("_:"): + file_mounts.append((ent.resolved, ent.target)) + return file_mounts + + def job( + self, + job_order: CWLObjectType, + output_callbacks: Callable[[CWLObjectType | None, str], None], + runtimeContext: cwltool.context.RuntimeContext, + ) -> Generator[ + cwltool.job.JobBase | cwltool.command_line_tool.CallbackJob, None, None + ]: + """ + Override the job method to inject resource (cpu & memory) monitoring code into + the job's container command line. + """ + for job in super().job(job_order, output_callbacks, runtimeContext): + if isinstance(job, ContainerCommandLineJob) and self._uses_container( + runtimeContext + ): + file_mounts = self._file_mounts_from_pathmapper(job) + script = command_line_to_shell_script(job.command_line) + script = add_injections(script, file_mounts) + job.command_line = shell_script_to_command_line(script) + yield job def _initialworkdir( self, j: cwltool.job.JobBase | None, builder: cwltool.builder.Builder diff --git a/src/toil/lib/interpreter.py b/src/toil/lib/interpreter.py new file mode 100644 index 0000000000..35ab399a37 --- /dev/null +++ b/src/toil/lib/interpreter.py @@ -0,0 +1,194 @@ +import glob +import logging +import os +import platform +import shlex +import textwrap +from typing import Iterable + +from toil.lib.resources import ResourceMonitor + +logger = logging.getLogger(__name__) + +### +# Runtime code injection system +# When a workflow steps runs inside a container, the Toil worker process on the host +# often cannot see how much CPU and RAM that step actually used. This system allows +# the step to inject code into the container that will write resource usage information +# to files in a directory, which the Toil worker process can then read and use to +# update the resource usage stats. +### + +# Runtime code injected in the container communicates back to the rest of the +# runtime through files in this directory. +INJECTED_MESSAGE_DIR = ".toil_runtime" + + +# Helper function to convert a CWL command from argv list to one shell script string +# that can be executed in the container. +def command_line_to_shell_script(command_line: list[str]) -> str: + """ + Extract or synthesize the inner shell script from a cwltool argv list. + + cwltool uses ``["/bin/sh", "-c", script]`` when ShellCommandRequirement is + present, and a flat argv list otherwise. + """ + if ( + len(command_line) >= 3 + and command_line[0] == "/bin/sh" + and command_line[1] == "-c" + ): + return command_line[2] + return " ".join(shlex.quote(arg) for arg in command_line) # this is the shell script string + + +# Helper function to convert a shell script back into an argv list that can be used by cwltool. +def shell_script_to_command_line(script: str) -> list[str]: + """Wrap a shell script for cwltool/docker (injection requires bash).""" + return ["/bin/bash", "-c", script] + +# Core function to inject the resource usage monitoring code into the command line for the docker swarm container. +# Takes the command string and the file mounts and returns the modified command string. +def add_injections( + command_string: str, + file_mounts: Iterable[tuple[str, str]], + message_dir: str = INJECTED_MESSAGE_DIR, +) -> str: + """ + Inject extra Bash code from the Toil runtime into the command for the container. + + Currently doesn't implement the MiniWDL plugin system, but does add + resource usage monitoring to Docker containers. + """ + + parts = [] + # We're running on Docker Swarm, so we need to monitor CPU usage + # and so on from inside the container, since it won't be attributed + # to Toil child processes in the leader's self-monitoring. + # TODO: Mount this from a file Toil installs instead or something. + script = textwrap.dedent( + """\ + function _toil_resource_monitor () { + # Turn off error checking and echo in here + set +ex + MESSAGE_DIR="${1}" + mkdir -p "${MESSAGE_DIR}" + + function sample_cpu_usec() { + if [[ -f /sys/fs/cgroup/cpu.stat ]] ; then + awk '{ if ($1 == "usage_usec") {print $2} }' /sys/fs/cgroup/cpu.stat + elif [[ -f /sys/fs/cgroup/cpuacct/cpuacct.stat ]] ; then + echo $(( $(head -n 1 /sys/fs/cgroup/cpuacct/cpuacct.stat | cut -f2 -d' ') * 10000 )) + fi + } + + function sample_memory_bytes() { + if [[ -f /sys/fs/cgroup/memory.stat ]] ; then + awk '{ if ($1 == "anon") { print $2 } }' /sys/fs/cgroup/memory.stat + elif [[ -f /sys/fs/cgroup/memory/memory.stat ]] ; then + awk '{ if ($1 == "total_rss") { print $2 } }' /sys/fs/cgroup/memory/memory.stat + fi + } + + while true ; do + printf "CPU\\t" >> ${MESSAGE_DIR}/resources.tsv + sample_cpu_usec >> ${MESSAGE_DIR}/resources.tsv + printf "Memory\\t" >> ${MESSAGE_DIR}/resources.tsv + sample_memory_bytes >> ${MESSAGE_DIR}/resources.tsv + sleep 1 + done + } + """ + ) + parts.append(script) + # Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell + parts.append(f"(_toil_resource_monitor {message_dir} &)") + + if platform.system() == "Darwin": + # With gRPC FUSE file sharing, files immediately downloaded before + # being mounted may appear as size 0 in the container due to a race + # condition. Check for this and produce an approperiate error. + + script = textwrap.dedent( + """\ + function _toil_check_size () { + TARGET_FILE="${1}" + GOT_SIZE="$(stat -c %s "${TARGET_FILE}")" + EXPECTED_SIZE="${2}" + if [[ "${GOT_SIZE}" != "${EXPECTED_SIZE}" ]] ; then + echo >&2 "Toil Error:" + echo >&2 "File size visible in container for ${TARGET_FILE} is size ${GOT_SIZE} but should be size ${EXPECTED_SIZE}" + echo >&2 "Are you using gRPC FUSE file sharing in Docker Desktop?" + echo >&2 "It doesn't work: see ." + exit 1 + fi + } + """ + ) + parts.append(script) + for host_path, job_path in file_mounts: + expected_size = os.path.getsize(host_path) + if expected_size != 0: + parts.append(f'_toil_check_size "{job_path}" {expected_size}') + + parts.append(command_string) + + return "\n".join(parts) + +# Helper function to parse the injected resource usage monitoring code and update the resource usage stats. +def handle_message_file(file_path: str) -> None: + """ + Handle a message file received from in-container injected code. + + Takes the host-side path of the file. + """ + if os.path.basename(file_path) == "resources.tsv": + # This is a TSV of resource usage info. + first_cpu_usec: int | None = None + last_cpu_usec: int | None = None + max_memory_bytes: int | None = None + + for line in open(file_path): + if not line.endswith("\n"): + # Skip partial lines + continue + # For each full line we got + parts = line.strip().split("\t") + if len(parts) != 2: + # Skip odd-shaped lines + continue + if parts[0] == "CPU": + # Parse CPU usage + cpu_usec = int(parts[1]) + # Update summary stats + if first_cpu_usec is None: + first_cpu_usec = cpu_usec + last_cpu_usec = cpu_usec + elif parts[0] == "Memory": + # Parse memory usage + memory_bytes = int(parts[1]) + # Update summary stats + if max_memory_bytes is None or max_memory_bytes < memory_bytes: + max_memory_bytes = memory_bytes + + if max_memory_bytes is not None: + logger.info( + "Container used at about %s bytes of memory at peak", + max_memory_bytes, + ) + # Treat it as if used by a child process + ResourceMonitor.record_extra_memory(max_memory_bytes // 1024) + if last_cpu_usec is not None: + assert first_cpu_usec is not None + cpu_seconds = (last_cpu_usec - first_cpu_usec) / 1000000 + logger.info("Container used about %s seconds of CPU time", cpu_seconds) + # Treat it as if used by a child process + ResourceMonitor.record_extra_cpu(cpu_seconds) + +# Helper function that is a convenience wrapper around the handle_message_file function. +def handle_injection_messages_from_outdir(outdir: str) -> None: + """Read and handle any message files left in the job outdir by injected code.""" + message_dir = os.path.join(outdir, INJECTED_MESSAGE_DIR) + for file_path in glob.glob(os.path.join(message_dir, "*")): + if os.path.isfile(file_path): + handle_message_file(file_path) diff --git a/src/toil/test/cwl/cwlTest.py b/src/toil/test/cwl/cwlTest.py index d440374fdf..fcf63f5774 100644 --- a/src/toil/test/cwl/cwlTest.py +++ b/src/toil/test/cwl/cwlTest.py @@ -40,6 +40,7 @@ from schema_salad.exceptions import ValidationException +from toil.common import Toil from toil.cwl.utils import ( DirectoryStructure, download_structure, @@ -66,6 +67,7 @@ from toil.test import pneeds_torque as needs_torque from toil.test import pneeds_wes_server as needs_wes_server from toil.test import pslow as slow +from toil.utils.toilStats import get_stats, process_data log = logging.getLogger(__name__) CONFORMANCE_TEST_TIMEOUT = 10000 @@ -482,6 +484,37 @@ def test_s3_as_secondary_file(self, tmp_path: Path) -> None: with open(out["output"]["location"][len("file://") :]) as f: assert f.read().strip() == "When is s4 coming out?" + @needs_docker + @pytest.mark.docker + @pytest.mark.online + @pytest.mark.timeout(180) + def test_cpu_memory_monitoring(self, tmp_path: Path) -> None: + """Container CPU and memory usage is recorded in Toil job stats.""" + from toil.cwl import cwltoil + + with get_data("test/cwl/waste_cpu_memory.cwl") as cwl_file: + with get_data("test/cwl/empty.json") as inputs_file: + job_store = tmp_path / "jobStore" + main_args = [ + "--outdir", + str(tmp_path / "output_dir"), + str(cwl_file), + str(inputs_file), + "--jobStore", + str(job_store), + "--stats", + ] + cwltoil.main(main_args) + + resumed_job_store = Toil.resumeJobStore(str(job_store)) + stats = get_stats(resumed_job_store) + collated_stats = process_data(resumed_job_store.config, stats) + + # The tool burns ~30s of CPU and ~1 GiB of RAM inside Docker. + # Per-job stats use KiB for memory and include injected container usage. + assert collated_stats.jobs.total_clock >= 30 + assert collated_stats.jobs.max_memory >= 1024 * 1024 + @needs_docker @pytest.mark.docker @pytest.mark.online @@ -2248,6 +2281,84 @@ def test_download_structure(tmp_path: Path) -> None: ) +@needs_cwl +@pytest.mark.cwl +@pytest.mark.cwl_small +def test_cwl_resource_message_parsing_records_cpu_and_memory( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """ + Resource monitor messages emitted from a container are translated into + Toil's extra CPU and memory accounting. + """ + from toil.lib import interpreter + + message_file = tmp_path / "resources.tsv" + # Include malformed and partial lines to exercise parser robustness. + message_file.write_text( + "CPU\t1000000\n" + "Memory\t1024\n" + "CPU\t4000000\n" + "Memory\t2048\n" + "Odd\tline\n" + "CPU\t5000000", + encoding="utf-8", + ) + + recorded_memory_ki: list[int] = [] + recorded_cpu_seconds: list[float] = [] + monkeypatch.setattr( + interpreter.ResourceMonitor, + "record_extra_memory", + lambda peak_ki: recorded_memory_ki.append(peak_ki), + ) + monkeypatch.setattr( + interpreter.ResourceMonitor, + "record_extra_cpu", + lambda seconds: recorded_cpu_seconds.append(seconds), + ) + + interpreter.handle_message_file(str(message_file)) + + assert recorded_memory_ki == [2] + assert recorded_cpu_seconds == [3.0] + + +@needs_cwl +@pytest.mark.cwl +@pytest.mark.cwl_small +def test_cwl_job_injection_wraps_container_command( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """ + Container jobs are wrapped with injected runtime monitoring code. + """ + from toil.cwl import cwltoil + + host_input = tmp_path / "input.txt" + host_input.write_text("hello", encoding="utf-8") + + class FakeMapperEntry: + type = "File" + + def __init__(self, resolved: str, target: str) -> None: + self.resolved = resolved + self.target = target + + class FakePathMapper: + def __init__(self, entry: FakeMapperEntry) -> None: + self._entry = entry + + def files(self) -> list[str]: + return ["file://fake-input"] + + def mapper(self, _location: str) -> FakeMapperEntry: + return self._entry + + job = object.__new__(cwltoil.ToilDockerCommandLineJob) + job.command_line = ["echo", "hello"] + job.pathmapper = FakePathMapper(FakeMapperEntry(str(host_input), "/work/input.txt")) + @needs_cwl @pytest.mark.cwl @pytest.mark.timeout(300) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 60a2ad9f0a..46e27a502b 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -130,6 +130,11 @@ from toil.lib.trs import resolve_workflow from toil.lib.url import URLAccess from toil.provisioners.clusterScaler import JobTooBigError +from toil.lib.interpreter import ( + INJECTED_MESSAGE_DIR, + add_injections, + handle_message_file, +) logger = logging.getLogger(__name__) @@ -3866,98 +3871,6 @@ def __init__( self._cache_key = cache_key self._mount_spec = mount_spec - ### - # Runtime code injection system - ### - - # WDL runtime code injected in the container communicates back to the rest - # of the runtime through files in this directory. - INJECTED_MESSAGE_DIR = ".toil_wdl_runtime" - - def add_injections(self, command_string: str, task_container: TaskContainer) -> str: - """ - Inject extra Bash code from the Toil WDL runtime into the command for the container. - - Currently doesn't implement the MiniWDL plugin system, but does add - resource usage monitoring to Docker containers. - """ - - parts = [] - - if isinstance(task_container, SwarmContainer): - # We're running on Docker Swarm, so we need to monitor CPU usage - # and so on from inside the container, since it won't be attributed - # to Toil child processes in the leader's self-monitoring. - # TODO: Mount this from a file Toil installs instead or something. - script = textwrap.dedent( - """\ - function _toil_resource_monitor () { - # Turn off error checking and echo in here - set +ex - MESSAGE_DIR="${1}" - mkdir -p "${MESSAGE_DIR}" - - function sample_cpu_usec() { - if [[ -f /sys/fs/cgroup/cpu.stat ]] ; then - awk '{ if ($1 == "usage_usec") {print $2} }' /sys/fs/cgroup/cpu.stat - elif [[ -f /sys/fs/cgroup/cpuacct/cpuacct.stat ]] ; then - echo $(( $(head -n 1 /sys/fs/cgroup/cpuacct/cpuacct.stat | cut -f2 -d' ') * 10000 )) - fi - } - - function sample_memory_bytes() { - if [[ -f /sys/fs/cgroup/memory.stat ]] ; then - awk '{ if ($1 == "anon") { print $2 } }' /sys/fs/cgroup/memory.stat - elif [[ -f /sys/fs/cgroup/memory/memory.stat ]] ; then - awk '{ if ($1 == "total_rss") { print $2 } }' /sys/fs/cgroup/memory/memory.stat - fi - } - - while true ; do - printf "CPU\\t" >> ${MESSAGE_DIR}/resources.tsv - sample_cpu_usec >> ${MESSAGE_DIR}/resources.tsv - printf "Memory\\t" >> ${MESSAGE_DIR}/resources.tsv - sample_memory_bytes >> ${MESSAGE_DIR}/resources.tsv - sleep 1 - done - } - """ - ) - parts.append(script) - # Launch in a subshell so that it doesn't interfere with Bash "wait" in the main shell - parts.append(f"(_toil_resource_monitor {self.INJECTED_MESSAGE_DIR} &)") - - if isinstance(task_container, SwarmContainer) and platform.system() == "Darwin": - # With gRPC FUSE file sharing, files immediately downloaded before - # being mounted may appear as size 0 in the container due to a race - # condition. Check for this and produce an approperiate error. - - script = textwrap.dedent( - """\ - function _toil_check_size () { - TARGET_FILE="${1}" - GOT_SIZE="$(stat -c %s "${TARGET_FILE}")" - EXPECTED_SIZE="${2}" - if [[ "${GOT_SIZE}" != "${EXPECTED_SIZE}" ]] ; then - echo >&2 "Toil Error:" - echo >&2 "File size visible in container for ${TARGET_FILE} is size ${GOT_SIZE} but should be size ${EXPECTED_SIZE}" - echo >&2 "Are you using gRPC FUSE file sharing in Docker Desktop?" - echo >&2 "It doesn't work: see ." - exit 1 - fi - } - """ - ) - parts.append(script) - for host_path, job_path in task_container.input_path_map.items(): - expected_size = os.path.getsize(host_path) - if expected_size != 0: - parts.append(f'_toil_check_size "{job_path}" {expected_size}') - - parts.append(command_string) - - return "\n".join(parts) - def handle_injection_messages( self, outputs_library: ToilWDLStdLibTaskOutputs ) -> None: @@ -3966,60 +3879,11 @@ def handle_injection_messages( """ message_files = outputs_library._glob( - WDL.Value.String(os.path.join(self.INJECTED_MESSAGE_DIR, "*")) + WDL.Value.String(os.path.join(INJECTED_MESSAGE_DIR, "*")) ) logger.debug("Handling message files: %s", message_files) for message_file in message_files.value: - self.handle_message_file(message_file.value) - - def handle_message_file(self, file_path: str) -> None: - """ - Handle a message file received from in-container injected code. - - Takes the host-side path of the file. - """ - if os.path.basename(file_path) == "resources.tsv": - # This is a TSV of resource usage info. - first_cpu_usec: int | None = None - last_cpu_usec: int | None = None - max_memory_bytes: int | None = None - - for line in open(file_path): - if not line.endswith("\n"): - # Skip partial lines - continue - # For each full line we got - parts = line.strip().split("\t") - if len(parts) != 2: - # Skip odd-shaped lines - continue - if parts[0] == "CPU": - # Parse CPU usage - cpu_usec = int(parts[1]) - # Update summary stats - if first_cpu_usec is None: - first_cpu_usec = cpu_usec - last_cpu_usec = cpu_usec - elif parts[0] == "Memory": - # Parse memory usage - memory_bytes = int(parts[1]) - # Update summary stats - if max_memory_bytes is None or max_memory_bytes < memory_bytes: - max_memory_bytes = memory_bytes - - if max_memory_bytes is not None: - logger.info( - "Container used at about %s bytes of memory at peak", - max_memory_bytes, - ) - # Treat it as if used by a child process - ResourceMonitor.record_extra_memory(max_memory_bytes // 1024) - if last_cpu_usec is not None: - assert first_cpu_usec is not None - cpu_seconds = (last_cpu_usec - first_cpu_usec) / 1000000 - logger.info("Container used about %s seconds of CPU time", cpu_seconds) - # Treat it as if used by a child process - ResourceMonitor.record_extra_cpu(cpu_seconds) + handle_message_file(message_file.value) ### # Helper functions to work out what containers runtime we can use @@ -4496,8 +4360,11 @@ def get_path_in_container(inode: AnyINode) -> AnyINode | None: .value ) - # Do any command injection we might need to do - command_string = self.add_injections(command_string, task_container) + # Do command injection for docker container resource monitoring + # when a docker container is used + if isinstance(task_container, SwarmContainer): + file_mounts = task_container.input_path_map.items() + command_string = add_injections(command_string, file_mounts) # Grab the standard out and error paths. MyPy complains if we call # them because in the current MiniWDL version they are untyped.