Feat/add agentic training example#83
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a RemoteAgentLoop and an associated LLM proxy server to enable PPO training with remote third-party agents. The implementation includes a standalone proxy mode using WebSockets and a Ray-based actor mode, along with an SDK for submitting agent runs to a Harbor server. Feedback focuses on improving robustness and efficiency, specifically regarding error handling for environment variable parsing, optimizing HTTP session management, fixing potential crashes in the agent loop logic, and leveraging modern Ray features for asynchronous operations.
| if v := os.getenv("REMOTE_MODEL_NAME"): | ||
| kwargs["model_name"] = v | ||
| if v := os.getenv("REMOTE_AGENT_KWARGS"): | ||
| kwargs["agent_kwargs"] = json.loads(v) |
There was a problem hiding this comment.
json.loads() is called directly on environment variables without error handling. If REMOTE_AGENT_KWARGS (or other JSON-based env vars on lines 84 and 86) contains invalid JSON, the application will crash with a JSONDecodeError during configuration loading. It is safer to wrap these calls in a try-except block and provide a descriptive error message.
| async def _register_session(self, proxy_url: str, trial_id: str) -> None: | ||
| """POST /sessions/{trial_id} to register a new session.""" | ||
| async with aiohttp.ClientSession() as session: | ||
| async with session.post(f"{proxy_url}/sessions/{trial_id}") as resp: | ||
| resp.raise_for_status() | ||
|
|
||
| async def _get_session_data(self, proxy_url: str, trial_id: str) -> SessionRecord | None: | ||
| """GET /sessions/{trial_id} to retrieve recorded session data.""" | ||
| async with aiohttp.ClientSession() as session: | ||
| async with session.get(f"{proxy_url}/sessions/{trial_id}") as resp: | ||
| if resp.status == 404: | ||
| return None | ||
| resp.raise_for_status() | ||
| data = await resp.json() | ||
| return SessionRecord(**data) | ||
|
|
||
| async def _complete_session(self, proxy_url: str, trial_id: str) -> None: | ||
| """POST /sessions/{trial_id}/complete to mark session completed.""" | ||
| async with aiohttp.ClientSession() as session: | ||
| async with session.post(f"{proxy_url}/sessions/{trial_id}/complete") as resp: | ||
| resp.raise_for_status() | ||
|
|
||
| async def _delete_session(self, proxy_url: str, trial_id: str) -> None: | ||
| """DELETE /sessions/{trial_id} to remove session data.""" | ||
| async with aiohttp.ClientSession() as session: | ||
| async with session.delete(f"{proxy_url}/sessions/{trial_id}") as resp: | ||
| resp.raise_for_status() |
There was a problem hiding this comment.
Creating a new aiohttp.ClientSession for every helper method call (e.g., _register_session, _get_session_data, etc.) is inefficient. aiohttp sessions are designed to be reused to take advantage of connection pooling and reduce overhead. Consider creating a single session instance in the __init__ method and reusing it throughout the lifecycle of the RemoteAgentLoop instance.
| logprobs captured by the proxy. | ||
| """ | ||
| import shortuuid | ||
| messages = list(kwargs["raw_prompt"]) |
There was a problem hiding this comment.
The messages variable is initialized here but is never used in the rest of the run method. Additionally, accessing kwargs["raw_prompt"] directly will raise a KeyError if the key is missing. Since the reconstruction logic later uses session.turns[0].request_messages, this line appears to be redundant and potentially unsafe.
| "LOCAL_IP is not set, falling back to 0.0.0.0. " | ||
| "The remote agent may not be able to reach the proxy." | ||
| ) | ||
| agent_base_url = f"http://{local_ip}:{proxy_port}/{trial_id}/v1" |
There was a problem hiding this comment.
urlparse(proxy_url).port returns None if the port is not explicitly specified in the URL (e.g., http://proxy-server). This results in an invalid agent_base_url like http://10.0.30.11:None/.... You should handle the case where the port is missing by defaulting to the standard port for the scheme (80 for http, 443 for https).
| agent_base_url = f"http://{local_ip}:{proxy_port}/{trial_id}/v1" | |
| proxy_port = parsed.port or (443 if parsed.scheme == "https" else 80) |
| ) | ||
| agent_base_url = f"http://{local_ip}:{proxy_port}/{trial_id}/v1" | ||
|
|
||
| agent_kwargs = dict(self.remote_agent_kwargs) |
There was a problem hiding this comment.
dict(self.remote_agent_kwargs) creates a shallow copy. If the agent configuration contains nested dictionaries or lists, they will be shared across all rollout executions. Mutations to these nested structures (e.g., by the agent SDK) could lead to subtle bugs or race conditions. Use copy.deepcopy to ensure each run has a completely isolated set of arguments.
|
|
||
| import ray | ||
|
|
||
| output = await asyncio.to_thread(ray.get, ref) |
There was a problem hiding this comment.
In modern Ray versions, ObjectRef is awaitable. Instead of using asyncio.to_thread(ray.get, ref), which blocks a thread from the pool, you can directly await ref. This is more efficient and follows the recommended pattern for asynchronous Ray applications.
| output = await asyncio.to_thread(ray.get, ref) | |
| output = await ref |
| if not log_probs: | ||
| return None | ||
| content = [] | ||
| for tid, lp in zip(token_ids, log_probs): |
There was a problem hiding this comment.
Signed-off-by: KunWuLuan <30817980+KunWuLuan@users.noreply.github.com>
Signed-off-by: KunWuLuan <30817980+KunWuLuan@users.noreply.github.com>
Signed-off-by: KunWuLuan <30817980+KunWuLuan@users.noreply.github.com>
60a24c7 to
13ccb14
Compare
|
@wuxibin89 Hi, would like to hear your thoughts, thank you! |
- Promote REMOTE_AGENT_* / LOCAL_IP knobs into yaml; inject via runtime_env.env_vars with os.environ.setdefault to keep CLI > shell env > yaml precedence. - Rename proxy_server.local_ip -> llm_proxy_ip (LOCAL_IP -> LLM_PROXY_IP) to match its real semantic (LLM proxy's externally reachable IP). - Add recipe/agentic/dataset/local_harbor.py: scan local Harbor task dirs (task.toml + instruction.md) into a cached verl parquet; driver overrides data.train_files / data.val_files when data.train_harbor_dir / val_harbor_dir is set. - Rows emit top-level instance_id / local_task_path; RemoteAgentLoop resolves task_path as local_task_path -> <harbor_root>/<instance_id> -> legacy task_path_template. - Add README, rewrite agentic-qwen2.5-3b.sh to pure hydra overrides, drop obsolete mcp-tools.sh. Assisted-by: Qoder Signed-off-by: kunwuluan@gmail.com
4f15d7c to
e510cca
Compare
Summary
Implement
RemoteAgentLoopfor VeRL, enabling RL training of external agents (e.g., SWE-agent, Claude Code) running in separate environments. The proxy server captures all LLMtraffic via an HTTP proxy, recording token_ids and logprobs for PPO updates.
Closes #5737
Commits
177581fRemoteAgentLoopand Ray-based proxy374c1d460a24c7Architecture
Key Components
RemoteAgentLoop
Extends
AgentLoopBaseto:trial_idfor each rollout trajectoryPROXY_SERVER_URL)AgentLoopOutputfrom recorded session data (token_ids, logprobs)LLMProxyServer
Supports two operating modes:
InferenceWorkerClient
WebSocket client that:
/ws/workerendpointFiles Changed
agentic/__init__.pyagentic/agent_loop/__init__.pyremote_agentloopagentic/agent_loop/config.pyRemoteAgentConfigwith env parsingagentic/agent_loop/remote_agent_loop.pyRemoteAgentLoopimplementationagentic/agentic-qwen2.5-3b.shagentic/agentic_main.pyagentic/config/agentic_trainer.yamlagentic/mcp-tools.shagentic/swe-agent.yamlagentic/proxyserver/__init__.pyagentic/proxyserver/models.pyagentic/proxyserver/recorder.pyagentic/proxyserver/server.pyagentic/proxyserver/vllm_provider.pyagentic/proxyserver/proxy_server.pyagentic/proxyserver/ray_actor.pyagentic/proxyserver/relay.pyagentic/proxyserver/worker_client.pyagentic/proxyserver/Dockerfileagentic/proxyserver/deploy.yamlagentic/serversdk/__init__.pyagentic/serversdk/client.pyagentic/serversdk/models.pyUsage
Ray actor mode (default):