Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 101 additions & 2 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@
import pprint
import shutil
import stat
import subprocess
import sys
import textwrap
import uuid

# This is also in configargparse but MyPy doesn't know it
from argparse import RawDescriptionHelpFormatter
from collections.abc import Callable, Iterator, Mapping, MutableMapping, MutableSequence
from collections.abc import (
Callable,
Generator,
Iterator,
Mapping,
MutableMapping,
MutableSequence,
)
from tempfile import NamedTemporaryFile, TemporaryFile, gettempdir
from threading import Thread
from typing import IO, Any, Literal, Optional, Protocol, TextIO, TypeVar, Union, cast
Expand Down Expand Up @@ -67,6 +75,8 @@
fill_in_defaults,
shortname,
)
from cwltool.docker import DockerCommandLineJob, PodmanCommandLineJob
from cwltool.job import ContainerCommandLineJob
from cwltool.secrets import SecretStore
from cwltool.singularity import SingularityCommandLineJob
from cwltool.software_requirements import (
Expand Down Expand Up @@ -102,6 +112,12 @@
from toil.common import Config, Toil, addOptions
from toil.cwl import check_cwltool_version
from toil.lib.directory import DirectoryContents, decode_directory, encode_directory
from toil.lib.interpreter import (
add_injections,
command_line_to_shell_script,
handle_injection_messages_from_outdir,
shell_script_to_command_line,
)
from toil.lib.misc import call_command
from toil.lib.trs import resolve_workflow
from toil.provisioners.clusterScaler import JobTooBigError
Expand Down Expand Up @@ -1086,6 +1102,32 @@ def run_jobs(
return super().run_jobs(process, job_order_object, logger, runtime_context)


class ToilContainerCommandLineJob(ContainerCommandLineJob):
    """
    Container job that processes Toil's injected-code output after a run.

    Extends cwltool's ``ContainerCommandLineJob`` so that, once the container
    command has been executed, the job's output directory is handed to
    ``handle_injection_messages_from_outdir``. This is how resource stats
    written by code injected into the container command line get collected.
    """

    def _execute(
        self,
        runtime: list[str],
        env: MutableMapping[str, str],
        runtimeContext: cwltool.context.RuntimeContext,
        monitor_function: Callable[["subprocess.Popen[str]"], None] | None = None,
    ) -> None:
        """
        Run the job via the parent class, then handle injection messages.

        All arguments are forwarded unchanged to the next ``_execute`` in the
        method resolution order.

        :param runtime: Container runtime command prefix, passed through.
        :param env: Environment for the job, passed through.
        :param runtimeContext: The cwltool runtime context, passed through.
        :param monitor_function: Optional process monitor, passed through.
        """
        # Let the real container job class do the actual execution.
        super()._execute(runtime, env, runtimeContext, monitor_function)
        # Only reached when the parent call returned without raising.
        output_directory = self.outdir
        handle_injection_messages_from_outdir(output_directory)


class ToilDockerCommandLineJob(ToilContainerCommandLineJob, DockerCommandLineJob):
    """
    Docker container job with Toil runtime injection support.

    Adds no behavior of its own: it combines cwltool's
    ``DockerCommandLineJob`` with ``ToilContainerCommandLineJob``, which is
    listed first so that its ``_execute`` override takes precedence.
    """


class ToilPodmanCommandLineJob(ToilContainerCommandLineJob, PodmanCommandLineJob):
    """
    Podman container job with Toil runtime injection support.

    Adds no behavior of its own: it combines cwltool's
    ``PodmanCommandLineJob`` with ``ToilContainerCommandLineJob``, which is
    listed first so that its ``_execute`` override takes precedence.
    """


class ToilSingularityCommandLineJob(ToilContainerCommandLineJob, SingularityCommandLineJob):
    """
    Singularity container job with Toil runtime injection support.

    Adds no behavior of its own: it combines cwltool's
    ``SingularityCommandLineJob`` with ``ToilContainerCommandLineJob``, which
    is listed first so that its ``_execute`` override takes precedence.
    """

Comment on lines +1105 to +1129

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach of hooking the ContainerCommandLineJob in addition to the CommandLineTool is a little awkward. We have to have all these extra classes for each container system, and we already don't have one for UDockerCommandLineJob and it's hard to tell whether that's because we don't need one. And the hook here only works in concert with the hook that ToilCommandLineTool applies, so we end up with one feature spread out over several classes.

Instead of hooking ContainerCommandLineJob._execute(), did you consider hooking CommandLineTool.collect_output_ports() instead? It looks like that has access to the outdir, and it gets sent into _execute() and called inside there after the container has run. And if we did it that way we could keep all the hook logic together inside ToilCommandLineTool and not need to worry as much about the container type.


class ToilTool:
"""Mixin to hook Toil into a cwltool tool type."""

Expand Down Expand Up @@ -1141,7 +1183,64 @@ def __str__(self) -> str:


class ToilCommandLineTool(ToilTool, cwltool.command_line_tool.CommandLineTool):
"""Subclass the cwltool command line tool to provide the custom ToilPathMapper."""
"""Subclass the cwltool command line tool to provide the custom ToilPathMapper
and add the monitoring code to the job's container command line."""

def make_job_runner(
    self, runtimeContext: cwltool.context.RuntimeContext
) -> type[cwltool.job.JobBase]:
    """
    Pick the job runner class, substituting Toil's container job classes.

    Asks the parent implementation which runner class to use. If that is
    exactly cwltool's Docker, Podman, or Singularity job class, the matching
    Toil subclass is returned instead; any other runner class is returned
    unchanged.

    :param runtimeContext: The cwltool runtime context, passed to the parent.
    :returns: The job runner class to instantiate.
    """
    base_runner = super().make_job_runner(runtimeContext)
    # Pairs of (cwltool runner class, Toil replacement class).
    replacements = (
        (DockerCommandLineJob, ToilDockerCommandLineJob),
        (PodmanCommandLineJob, ToilPodmanCommandLineJob),
        (SingularityCommandLineJob, ToilSingularityCommandLineJob),
    )
    for cwltool_runner, toil_runner in replacements:
        # Identity comparison: only the exact class is replaced, not
        # subclasses of it.
        if base_runner is cwltool_runner:
            return toil_runner
    return base_runner

def _uses_container(self, runtimeContext: cwltool.context.RuntimeContext) -> bool:
if not runtimeContext.use_container:
return False
docker_req, _ = self.get_requirement("DockerRequirement")
if docker_req is not None:
return True
if runtimeContext.find_default_container is not None:
return runtimeContext.find_default_container(self) is not None
return runtimeContext.default_container is not None

@staticmethod
def _file_mounts_from_pathmapper(
job: ContainerCommandLineJob,
) -> list[tuple[str, str]]:
Comment on lines +1213 to +1215

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a docstring to at least explain which item in the result tuples is which.

file_mounts: list[tuple[str, str]] = []
for location in job.pathmapper.files():
ent = job.pathmapper.mapper(location)
if ent.type == "File" and not ent.resolved.startswith("_:"):
file_mounts.append((ent.resolved, ent.target))
return file_mounts

def job(
    self,
    job_order: CWLObjectType,
    output_callbacks: Callable[[CWLObjectType | None, str], None],
    runtimeContext: cwltool.context.RuntimeContext,
) -> Generator[
    cwltool.job.JobBase | cwltool.command_line_tool.CallbackJob, None, None
]:
    """
    Yield the parent's jobs, injecting monitoring code into container jobs.

    Every job produced by the parent implementation is yielded. Before
    yielding, a job that is a ``ContainerCommandLineJob`` for a tool that
    uses a container has its command line converted to a shell script,
    extended with the resource (cpu & memory) monitoring injections, and
    converted back into a command line.

    :param job_order: The CWL job order, passed to the parent.
    :param output_callbacks: Output callback, passed to the parent.
    :param runtimeContext: The cwltool runtime context.
    """
    for produced in super().job(job_order, output_callbacks, runtimeContext):
        needs_injection = isinstance(
            produced, ContainerCommandLineJob
        ) and self._uses_container(runtimeContext)
        if needs_injection:
            # NOTE(review): open question from code review - this also
            # applies under Singularity (and possibly UDocker), where usage
            # may already be visible to Toil via child processes. Confirm
            # that CPU usage is not double-counted.
            mounts = self._file_mounts_from_pathmapper(produced)
            wrapped_script = add_injections(
                command_line_to_shell_script(produced.command_line), mounts
            )
            produced.command_line = shell_script_to_command_line(wrapped_script)
        yield produced

def _initialworkdir(
self, j: cwltool.job.JobBase | None, builder: cwltool.builder.Builder
Expand Down
56 changes: 42 additions & 14 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3304,20 +3304,8 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[
)
job_time = time.time() - startTime
job_cpu_time = totalCpuTime - startClock
allocated_cpu_time = job_time * self.cores

if job_cpu_time > allocated_cpu_time and allocated_cpu_time > 0:
# Too much CPU was used by this job! Maybe we're using a batch
# system that doesn't/can't sandbox us and we started too many
# threads. Complain to the user!
excess_factor = job_cpu_time / allocated_cpu_time
fileStore.log_to_leader(
f"Job {self.description} used {excess_factor:.2f}x more "
f"CPU than the requested {self.cores} cores. Consider "
f"increasing the job's required CPU cores or limiting the "
f"number of processes/threads launched.",
level=logging.WARNING,
)

self._check_cpu_usage(job_time, job_cpu_time, fileStore)

# Finish up the stats
if stats is not None:
Expand All @@ -3337,6 +3325,46 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[
succeeded=str(succeeded),
)
)


def _check_cpu_usage(
self,
job_time: float,
job_cpu_time: float,
fileStore: AbstractFileStore,
threshold_factor: float = 1.05,
) -> None:
"""
Log a warning when a job consumes more CPU time than its allocation.

If *job_time* is non-positive, returns immediately (wall-clock time may
lack resolution for very short jobs). If *cores* is non-positive, logs
a warning that overuse cannot be assessed and returns, avoiding
division by zero. Otherwise, compares *job_cpu_time* to
``job_time * cores * threshold_factor``.
"""
if job_time <= 0:
# The job may have run too quickly for wall-clock resolution.
return
if self.cores <= 0:
# Jobs should not have non-positive core requests, but avoid divide-by-zero.
fileStore.log_to_leader(
f"Job {self.description} has invalid requested cores value "
f"{self.cores}; cannot evaluate CPU overuse.",
level=logging.WARNING,
)
return
allocated_cpu_time = job_time * self.cores

excess_factor = job_cpu_time / allocated_cpu_time
if excess_factor > threshold_factor:
fileStore.log_to_leader(
f"Job {self.description} used {excess_factor:.2f}x more "
f"CPU than the requested {self.cores} cores. Consider "
f"increasing the job's required CPU cores or limiting the "
f"number of processes/threads launched.",
level=logging.WARNING,
)

def _runner(
self,
Expand Down
Loading