From 78c1bf70f98363a4cf61b7be3f9ba379a008afcd Mon Sep 17 00:00:00 2001 From: Kou Date: Fri, 20 Mar 2026 12:05:49 +0800 Subject: [PATCH 1/2] feat(retool): add local thread pool execution mode for tool calls --- retool/retool.py | 21 +++++++++++++++++---- retool/sandbox_fusion_tool_config.yaml | 1 + 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/retool/retool.py b/retool/retool.py index bd43ed43..2e19a2d3 100644 --- a/retool/retool.py +++ b/retool/retool.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import re from typing import Any @@ -20,6 +21,7 @@ from verl.tools.base_tool import OpenAIFunctionToolSchema from verl.tools.sandbox_fusion_tools import SandboxFusionTool +from verl.tools.schemas import ToolResponse from verl.utils.dataset import RLHFDataset from verl.utils.reward_score import math_dapo from verl.utils.rollout_trace import rollout_trace_op @@ -33,7 +35,7 @@ def __init__(self, config: dict, tool_schema: OpenAIFunctionToolSchema): self.code_pattern = re.compile(r"```python(.*?)```", re.DOTALL) @rollout_trace_op - async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[str, float, dict]: + async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[ToolResponse, float, dict]: code = parameters["code"] matches = self.code_pattern.findall(code) if matches: @@ -54,9 +56,20 @@ async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) if not isinstance(code, str): code = str(code) - result = await self.execution_pool.execute.remote(self.execute_code, instance_id, code, timeout, language) - # sandbox has no score or metrics, use Nones - return result, None, None + if self.use_ray_execution_pool: + result = await self.execution_pool.execute.remote(self.execute_code, instance_id, code, timeout, language) + else: + self._thread_semaphore.acquire() + try: + result = await asyncio.get_running_loop().run_in_executor( + None, self.execute_code, instance_id, code, timeout, language + ) + finally: + self._thread_semaphore.release() + + if isinstance(result, ToolResponse): + return result, None, None + return ToolResponse(text=None if result is None else str(result)), None, None class CustomRLHFDataset(RLHFDataset): diff --git a/retool/sandbox_fusion_tool_config.yaml b/retool/sandbox_fusion_tool_config.yaml index 71b10e50..36ed5c06 100644 --- a/retool/sandbox_fusion_tool_config.yaml +++ b/retool/sandbox_fusion_tool_config.yaml @@ -8,6 +8,7 @@ tools: default_timeout: 30 default_language: "python" memory_limit_mb: 1024 + use_ray_execution_pool: false type: native tool_schema: From c4365f72c98fb6c1381a599d583ffa3180052d6f Mon Sep 17 00:00:00 2001 From: Kou Date: Fri, 20 Mar 2026 14:26:21 +0800 Subject: [PATCH 2/2] fix: use asyncio.Semaphore to avoid blocking event loop --- retool/retool.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/retool/retool.py b/retool/retool.py index 2e19a2d3..9f733baa 100644 --- a/retool/retool.py +++ b/retool/retool.py @@ -59,13 +59,10 @@ async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) if self.use_ray_execution_pool: result = await self.execution_pool.execute.remote(self.execute_code, instance_id, code, timeout, language) else: - self._thread_semaphore.acquire() - try: + async with self._async_semaphore: result = await asyncio.get_running_loop().run_in_executor( None, self.execute_code, instance_id, code, timeout, language ) - finally: - self._thread_semaphore.release() if isinstance(result, ToolResponse): return result, None, None