[feat] Decoupled Inference and Reward Computation via Reward Queue#113
[feat] Decoupled Inference and Reward Computation via Reward Queue#113Silypie wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces the 'Reward Queue' feature to decouple inference from reward computation in VERL's fully asynchronous training pipeline, adding several components including a custom rollouter, trainer, agent loop worker, and sample aggregator. The code review identified several critical issues, including potential AttributeError crashes due to uninitialized managers or clients, underestimation of staleness samples by ignoring the reward queue, and a severe concurrency bottleneck caused by holding a lock during an asynchronous wait. Additionally, the feedback highlights potential device mismatches on GPU, deprecated asyncio.wait usage, and several potential KeyError or IndexError exceptions when handling empty collections or missing keys.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| self.max_concurrent_samples = len(self.llm_server_manager.get_replicas()) * 16 | ||
| self.max_concurrent_samples = min(self.max_concurrent_samples, self.max_required_samples) |
There was a problem hiding this comment.
At the time set_max_required_samples is called in main.py, self.llm_server_manager has not yet been initialized (it is created asynchronously in _init_async_rollout_manager which is called during fit). Accessing self.llm_server_manager.get_replicas() here will raise an AttributeError and crash the application on startup. We should guard this call and defer the computation of max_concurrent_samples until llm_server_manager is actually initialized.
| self.max_concurrent_samples = len(self.llm_server_manager.get_replicas()) * 16 | |
| self.max_concurrent_samples = min(self.max_concurrent_samples, self.max_required_samples) | |
| if hasattr(self, "llm_server_manager") and self.llm_server_manager is not None: | |
| self.max_concurrent_samples = len(self.llm_server_manager.get_replicas()) * 16 | |
| self.max_concurrent_samples = min(self.max_concurrent_samples, self.max_required_samples) | |
| else: | |
| self.max_concurrent_samples = self.max_required_samples |
| self._resume_event.set() | ||
| self._scoring_resume_event.set() | ||
| # every time param change, reset staleness_samples | ||
| self.staleness_samples = len(self.active_tasks) + await self.message_queue_client.get_queue_size() |
There was a problem hiding this comment.
When enable_reward_queue is enabled, the staleness calculation completely ignores the samples currently in the reward queue and those currently being aggregated. This leads to an underestimation of staleness_samples, which can violate the PPO staleness threshold and cause training instability. We should include the in-flight samples from the reward queue and aggregator in the staleness count.
in_flight_samples = len(self.active_tasks)
if self.enable_reward_queue and self.reward_queue_client:
rq_size = await self.reward_queue_client.get_queue_size()
in_flight_samples += int(rq_size / (self.rollout_n or 1))
in_flight_samples += self.sample_aggregator.pending_groups_count
self.staleness_samples = in_flight_samples + await self.message_queue_client.get_queue_size()| while len(self.active_tasks) >= self.max_concurrent_samples: | ||
| async with self.lock: | ||
| if self.active_tasks: | ||
| done_tasks, self.active_tasks = await asyncio.wait( | ||
| self.active_tasks, return_when=asyncio.FIRST_COMPLETED | ||
| ) | ||
| for task in done_tasks: | ||
| await task | ||
|
|
There was a problem hiding this comment.
Holding self.lock while awaiting asyncio.wait is a highly problematic practice in asyncio. Since asyncio.wait blocks until a long-running generation task completes, holding the lock during this time will block other critical operations (such as reset_staleness called by the trainer) from acquiring the lock, causing a severe performance bottleneck or potential hangs. We should await the tasks outside the lock, and let the task completion callbacks handle cleanup naturally.
while len(self.active_tasks) >= self.max_concurrent_samples:
await asyncio.wait(self.active_tasks, return_when=asyncio.FIRST_COMPLETED)| rm_scores = torch.zeros_like(response_mask, dtype=torch.float32) | ||
| rm_scores[torch.arange(response_mask.size(0)), valid_response_length] = torch.tensor( | ||
| scores, dtype=torch.float32 | ||
| ) |
There was a problem hiding this comment.
If response_mask is on a GPU device, torch.arange and torch.tensor (which default to CPU) will cause a device mismatch error or slow CPU-GPU synchronization when indexing and assigning values. We should explicitly specify the device of the source tensors to match response_mask.device.
| rm_scores = torch.zeros_like(response_mask, dtype=torch.float32) | |
| rm_scores[torch.arange(response_mask.size(0)), valid_response_length] = torch.tensor( | |
| scores, dtype=torch.float32 | |
| ) | |
| device = response_mask.device | |
| rm_scores = torch.zeros_like(response_mask, dtype=torch.float32, device=device) | |
| rm_scores[torch.arange(response_mask.size(0), device=device), valid_response_length] = torch.tensor( | |
| scores, dtype=torch.float32, device=device | |
| ) |
| done, pending = await asyncio.wait( | ||
| tasks_to_wait, return_when=asyncio.FIRST_COMPLETED | ||
| ) |
There was a problem hiding this comment.
Passing a list to asyncio.wait is deprecated since Python 3.8 and can raise a TypeError or deprecation warnings in newer Python versions. We should convert tasks_to_wait to a set before passing it to asyncio.wait.
| done, pending = await asyncio.wait( | |
| tasks_to_wait, return_when=asyncio.FIRST_COMPLETED | |
| ) | |
| done, pending = await asyncio.wait( | |
| set(tasks_to_wait), return_when=asyncio.FIRST_COMPLETED | |
| ) |
| self._scoring_resume_event.set() | ||
|
|
||
| async def _should_pause_reward_queue(self) -> bool: | ||
| reward_queue_stats = await self.reward_queue_client.get_statistics() |
There was a problem hiding this comment.
If self.reward_queue_client is not yet initialized or is None during startup, calling get_statistics() directly will raise an AttributeError. We should add a safety check to return False if the client is not available.
| reward_queue_stats = await self.reward_queue_client.get_statistics() | |
| if not self.reward_queue_client: | |
| return False | |
| reward_queue_stats = await self.reward_queue_client.get_statistics() |
|
|
||
| def addition_process(output: DataProto, enable_reward_queue: bool = False): | ||
| """collect metirics""" | ||
| metrics = output.meta_info.pop("metrics") # List[Dict[str, str]] |
There was a problem hiding this comment.
Using pop("metrics") without a default value will raise a KeyError if the "metrics" key is missing from output.meta_info (e.g., during validation or custom evaluation runs). We should use pop("metrics", None) to handle this gracefully.
| metrics = output.meta_info.pop("metrics") # List[Dict[str, str]] | |
| metrics = output.meta_info.pop("metrics", None) # List[Dict[str, str]] | |
| if metrics is None: | |
| return output |
| elif arr.ndim == 2: | ||
| for i in range(batch_size): | ||
| new_arr[i] = arr[i] | ||
| else: | ||
| for i in range(batch_size): | ||
| new_arr[i] = arr[i] |
| index_val = batch.non_tensor_batch["index"] | ||
| index = [index_val[0] if isinstance(index_val, (list, np.ndarray)) else index_val] |
There was a problem hiding this comment.
If index_val is an empty list or numpy array, accessing index_val[0] will raise an IndexError. We should check the length of the list/array before accessing its first element.
| index_val = batch.non_tensor_batch["index"] | |
| index = [index_val[0] if isinstance(index_val, (list, np.ndarray)) else index_val] | |
| index_val = batch.non_tensor_batch["index"] | |
| if isinstance(index_val, (list, np.ndarray)): | |
| index = [index_val[0]] if len(index_val) > 0 else [0] | |
| else: | |
| index = [index_val] |
| batch.meta_info.get("global_steps", -1), index, batch.meta_info.get("validate", False) | ||
| ) | ||
|
|
||
| kwargs = {k: v[0] for k, v in batch.non_tensor_batch.items()} |
There was a problem hiding this comment.
If any value v in batch.non_tensor_batch is an empty list or array, accessing v[0] will raise an IndexError. We should add a safety check to handle empty lists/arrays or non-iterable values.
| kwargs = {k: v[0] for k, v in batch.non_tensor_batch.items()} | |
| kwargs = {k: (v[0] if isinstance(v, (list, np.ndarray)) and len(v) > 0 else v) for k, v in batch.non_tensor_batch.items()} |
Motivation
In the fully asynchronous training pipeline of VERL (
verl.experimental.fully_async_policy), inference (generation) and reward computation are traditionally tightly coupled in a sequential pipeline:Therefore, decoupling Inference and Reward allows their time consumption to mask each other, ideally reducing the total time to half of the original.
This coupling creates a critical performance bottleneck: when reward computation is slow (e.g., due to external LLM-based judges, complex scoring functions, or network latency), the GPU sits idle waiting for scores to be ready, wasting expensive compute resources.
The Reward Queue feature decouples inference from reward computation by introducing an intermediate queue between the two stages, enabling concurrent execution:
This allows generation and scoring to overlap in time, maximizing GPU utilization and throughput.
Proposed Design
Design Overview
This architecture introduces a RewardQueue as a central decoupling mechanism between inference and reward computation, enabling fully asynchronous processing across the reinforcement learning pipeline.
Core Pipeline
Sampling & Inference — The rollouter continuously feeds batches from the DataLoader. For each batch, sub-items are dispatched for async generation and immediately buffered into the RewardQueue without waiting for reward results.
Reward Computation — A dedicated consumer worker pulls sub-items from the RewardQueue and distributes them across a pool of reward workers for concurrent scoring. Backpressure is applied when needed to prevent resource exhaustion.
Aggregation & Training — Scored sub-items flow back to the aggregator, which assembles complete samples once all sub-items arrive. These are then published to a MessageQueue for the trainer to consume and process.
Key Design Points
- Temporal decoupling: Inference output and reward computation run at their own pace via the queue buffer
- Concurrent scoring: Multiple reward workers score sub-items in parallel, throttled by a concurrency limit
- Backpressure control: The consumer can pause/resume scoring based on system load
- Clean handoff boundary: MessageQueue separates rollouter and trainer execution domains
This design eliminates the traditional blocking pattern where inference waits for reward computation to complete, significantly improving pipeline throughput.
Data Flow
Phase 1: Sample Feeding
FullyAsyncRollouter._feed_samples()iterates over the DataLoaderRolloutSamplefor each batch and puts intopending_queuePhase 2: Inference and Queue Production
_processor_worker()processes samples frompending_queueenable_reward_queue=True, calls_process_sample_with_reward_queue():generate_single_for_reward_queue()SubRewardDataItemwith inference timing metadataRewardQueueviareward_queue_client.put_sample()Phase 3: Reward Computation (Consumer)
_reward_consumer_worker()continuously:_should_pause_scoring())SubRewardDataItemfromreward_queue_client.get_sample()reward_loop_worker.compute_score.remote()max_concurrent_rewardsPhase 4: Aggregation and Finalization
SampleAggregator.add_scored_item()accumulates scored sub-items_finalize_sample():rm_scorestensor with scores at the last valid positionRolloutSampleand puts intoMessageQueue(for Trainer)Phase 5: Training
FullyAsyncTrainer._get_samples_from_queue()retrieves samplesassemble_batch_from_rollout_samples()withenable_reward_queue=TrueKey Data Structures
RewardQueue(reuse MessageQueue)
SampleAggregatorConfiguration
Where:
max_required_samples = ppo_mini_batch_size * require_batches * (1 + staleness_threshold) * trigger_parameter_sync_steprollout_n = actor_rollout_ref.rollout.n(number of responses per prompt)Monitoring Metrics
The reward queue exports the following metrics:
monitor/queue/reward_queue_sizereward_queue/total_producedreward_queue/total_consumedreward_queue/dropped_samplesstatic/max_reward_queue_sizetiming_s/reward_compute/meantiming_s/reward_compute/maxtiming_s/reward_compute/tp95Use Cases
External LLM Judges: When reward computation involves calling external LLM APIs (e.g., for LLM-as-a-Judge scoring), network latency can be significant. Reward queue allows generation to continue while waiting for API responses.
Complex Scoring Functions: Multi-step reward computation pipelines with multiple model calls benefit from overlapping generation with scoring.
Variable Reward Latency: When reward computation time varies significantly across samples, the queue buffers fast results while waiting for slow ones.
Throughput Optimization: Maximizing GPU utilization by keeping either generation or scoring always active, even when the other is blocked.