Skip to content

feat(recipe): add Dynamo as rollout backend#110

Draft
sophiayyya wants to merge 16 commits into
verl-project:mainfrom
sophiayyya:sopy/dynamo
Draft

feat(recipe): add Dynamo as rollout backend#110
sophiayyya wants to merge 16 commits into
verl-project:mainfrom
sophiayyya:sopy/dynamo

Conversation

@sophiayyya

@sophiayyya sophiayyya commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Author: @sophiayyya @hou2lin

Summary

Add a recipe-side Dynamo rollout backend for verl training.

This PR adds a recipe.dynamo integration that registers a Dynamo rollout replica, launches Dynamo subprocesses for async generation,
routes generation through Dynamo frontend, and supports rollout weight updates from the trainer side. Validated using retool recipe.

Changes

  • Add recipe.dynamo.main_dynamo entry point and Hydra config for actor_rollout_ref.rollout.name=dynamo.
  • Add DynamoReplica / DynamoHttpServer implementation to launch and manage etcd, NATS, Dynamo vLLM workers, and Dynamo frontend.
  • Add Dynamo rollout adapter for control RPCs, sleep/wake, KV cache clear, global step propagation, and bucketed weight update.
  • Add Dynamo vLLM worker extension so trainer-side and engine-side IPC/ZMQ handles agree in the multi-DP-per-node topology.
  • Add Dynamo-specific agent loop/server manager plumbing.

Validation

sophiayyya and others added 14 commits April 28, 2026 23:58
First-commit of the recipe-side dynamo rollout backend. m1 validates
the registry path; m2 swaps in vLLM thin subclasses (real Dynamo
runtime injection deferred to m3+).

Backend registration is statically wired into verl proper
(verl/workers/rollout/base.py + replica.py) — no runtime monkey-patch
or worker_process_setup_hook is needed, so this package has no
module-level side effects.

E2E smoke (Qwen2.5-0.5B, 3 steps, 8 GPUs, 1 node):
  sbatch recipe/dynamo/slurm_dynamo_e2e.sh   # job 5170154, rc=0
  Training Progress: 100%|██████████| 3/3

Co-authored-by: Claude
m3 (gated by VERL_DYNAMO_ENABLE_RUNTIME=1): after vLLM AsyncLLM is up,
DynamoHttpServer additionally:
  - create_runtime(discovery_backend="file") same-process (validated
    in scripts/smoke_runtime_in_ray_actor.py — dynamo Rust runtime
    coexists with the Ray actor's asyncio loop)
  - register_model(ModelInput.Tokens, Chat | Completions, ...) so
    KV router has a properly typed worker
  - KvEventPublisher tap on vLLM's KV cache events when prefix
    caching + kv_events are configured (else router falls back to
    approximate routing)

m4 (gated by VERL_DYNAMO_ENABLE_FRONTEND=1, implies m3): all replicas
register their own worker_{r}_{n}/generate endpoint so the Frontend's
KV router has the full set of workers to balance across. The master
replica then spawns `python -m dynamo.frontend --router-mode kv` and
swaps its own _server_port to the Frontend port; non-master replicas
poll the master Ray actor (is_dynamo_frontend_ready / get_server_address)
and rebind their (server_address, server_port) to the master Frontend.
Net effect: every per-replica server_handle.get_server_address returned
to verl resolves to the same Frontend HTTP endpoint -> all generate
traffic goes through the KV router. Weight transfer stays on the m2
CUDA-IPC + ZMQ path, no degradation.

Setup is wrapped in try/except in run_server: any m3/m4 failure logs a
warning and falls back to the vLLM-native generate path so m2 contract
holds even when dynamo SDK is missing or misbehaves. The two env flags
are opt-in so existing m2 deployments on vllm017 (no dynamo SDK) keep
working unchanged.

run_dynamo_e2e_smoke.sh: enable use_dynamic_bsz on actor + ref +
rollout log_prob (matches existing log_prob_max_token_len_per_gpu) so
config validation passes. Add TP_SIZE env override to support
single-replica TP=N variants in the future without touching the
script body.

E2E verification (job 5180705):
  Training Progress: 100%|##########| 3/3   rc=0
  [m3] replica=0..7 registered model=...  (8 workers in file_kv tree)
  [m4] dynamo Frontend ready at <host>:<port>
  [m4] replica=1..7 redirected to master Frontend
  update_weights: 0.65s/step (vs m2 0.63s — within noise)

Co-authored-by: Claude
Old `/lustre/fsw/general_sa/sopyang/` workspace is gone — repoint
WORKSPACE/CONTAINER/SBATCH logs to
`/lustre/fsw/portfolios/coreai/users/sopyang/`, and in-container paths
from `/workspace/rl/verl_0211/...` to `/workspace/dynamorl_workspace/...`
(verl repo, slurm logs, dynamo logs, retool.py, sandbox_fusion_tool_config.yaml).

Also:
  * SBATCH --account: general_sa -> coreai_dlalgo_llm (general_sa no
    longer in user's slurm assoc list)
  * dataset paths: drop the non-existent retool_dataset/ subdir; point
    directly at /workspace/dynamorl_workspace/datasets/{DAPO-Math-17k,aime_2025}

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
batch partition rejects jobs without a GPU request; the workload is
2 nodes x 8 H100 so add `#SBATCH --gpus-per-node=8`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: OpenAI Codex <codex@openai.com>
Co-authored-by: OpenAI Codex <codex@openai.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the Dynamo rollout backend integration with verl, establishing a wrapper with a ZMQ control sidecar, a dedicated server manager, and a custom worker extension to handle node-global ranks. It also refactors the retool dataset processing to support AIME 2025 datasets and patches SandboxFusion to use system Python. The review feedback highlights a critical security vulnerability regarding insecure deserialization via pickle.loads in the ZMQ control protocol, recommending JSON or HMAC signatures. Additionally, the feedback suggests redirecting the output of etcd and nats-server subprocesses to dedicated log files, utilizing stable node-local port allocation for the control port to prevent EADDRINUSE collisions, and eliminating duplicate mapping logic in retool.py by reusing retool_dataset_utils.py.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

logger.exception("recv failed")
continue
try:
req = pickle.loads(raw)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Using pickle.loads to deserialize control requests received over a TCP network socket (VERL_DYNAMO_CONTROL_ZMQ) poses a high security risk of Remote Code Execution (RCE). Since the control payloads only contain simple metadata (method names, basic arguments, and timeouts), they can be safely serialized using json instead of pickle. Consider migrating the ZMQ control protocol to JSON, or sign the payloads using hmac with a shared secret key to verify the sender's authenticity before deserializing.

self._etcd_process = subprocess.Popen(cmd, env=env)
self._wait_for_etcd(_ETCD_READY_TIMEOUT_S)

def _wait_for_etcd(self, timeout: float):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The etcd process is launched without redirecting its stdout/stderr. If it produces a large volume of logs, it can clutter the main process's output. Consider redirecting its output to a dedicated log file in the log directory for better observability and debugging.

Suggested change
def _wait_for_etcd(self, timeout: float):
log_root = os.environ.get("VERL_DYNAMO_LOG_DIR", "/tmp")
log_path = os.path.join(log_root, f"verl_dynamo_replica{self.replica_rank}_etcd.log")
with open(log_path, "w") as f:
self._etcd_process = subprocess.Popen(cmd, env=env, stdout=f, stderr=subprocess.STDOUT)

Comment thread dynamo/dynamo_async_server.py Outdated
return
cmd = ["nats-server", "-p", str(self._nats_port)]
logger.info("[DynamoHttpServer] starting NATS: %s", " ".join(cmd))
self._nats_process = subprocess.Popen(cmd)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The nats-server process is launched without redirecting its stdout/stderr. Consider redirecting its output to a dedicated log file in the log directory to keep the main process logs clean and assist in troubleshooting.

Suggested change
self._nats_process = subprocess.Popen(cmd)
log_root = os.environ.get("VERL_DYNAMO_LOG_DIR", "/tmp")
log_path = os.path.join(log_root, f"verl_dynamo_replica{self.replica_rank}_nats.log")
with open(log_path, "w") as f:
self._nats_process = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT)


for spec_idx, spec in enumerate(worker_specs):
worker_cvd = spec.cuda_visible_devices
control_port = self._allocate_tcp_port(bind_wildcard=False)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using _allocate_tcp_port for control_port can lead to EADDRINUSE port collisions during concurrent startup of multiple replicas on the same physical node, as it lacks the replica-slot and node-slot segmentation used by _allocate_stable_node_port. Consider allocating control_port using _allocate_stable_node_port with a dedicated base port (e.g., 30000).

Suggested change
control_port = self._allocate_tcp_port(bind_wildcard=False)
control_port = self._allocate_stable_node_port(30000, spec_idx)

Comment thread retool/retool.py
Comment on lines 20 to 21
from verl.tools.base_tool import OpenAIFunctionToolSchema
from verl.tools.sandbox_fusion_tools import SandboxFusionTool

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The dataset mapping functions _map_fn_aime and _map_fn_default defined locally in this file duplicate the logic in retool/retool_dataset_utils.py. To maintain clean, DRY (Don't Repeat Yourself) code, consider keeping the import from retool_dataset_utils.py and updating the mapping functions there instead of duplicating them locally.

Co-authored-by: Codex <codex@openai.com>
@sophiayyya sophiayyya marked this pull request as draft June 20, 2026 12:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant