From 15bfa7a4b53e7015704d7bfd29df88d108241d6f Mon Sep 17 00:00:00 2001 From: Vu Anh Nguyen Date: Tue, 23 Jun 2026 17:36:31 +0700 Subject: [PATCH] fix(coding-agent): respect unlimited task concurrency --- packages/coding-agent/CHANGELOG.md | 2 + packages/coding-agent/src/task/parallel.ts | 3 +- .../coding-agent/test/task/task-batch.test.ts | 55 +++++++++++++++++++ .../coding-agent/test/task/task-spawn.test.ts | 44 +++++++++++++++ 4 files changed, 103 insertions(+), 1 deletion(-) diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index 1bff466483..c7f92122d1 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -27,6 +27,8 @@ ### Fixed +- Fixed `task.maxConcurrency: 0` serializing subagent spawns instead of disabling the task semaphore limit ([#3305](https://github.com/can1357/oh-my-pi/issues/3305)). + - Fixed `local://` URLs decoding images as corrupted text (mojibake) instead of showing the image - Fixed `omp --resume` hanging instead of exiting when the startup session picker is cancelled (Esc) or there are no sessions to resume. Startup arms long-lived handles (theme/appearance listeners, settings save timer, model registry), so the cancel/empty paths' bare `return` left the event loop alive and the process stuck after the picker cleared the alternate screen. These paths now exit cleanly via `process.exit(0)`, matching the `--version`/`--export` early-exit convention. The in-session `/resume` picker is unaffected — it keeps its own cancel handler that just closes the overlay. - Fixed the `/resume` session picker scrolling down after a session is deleted. The delete-confirmation dialog was mounted as a sibling below the picker's bottom border, briefly growing the picker past the terminal height; the TUI committed the picker's header rows into native scrollback to fit, and when the dialog closed `windowTop` stayed pinned at the new commit boundary, leaving the header stranded above the viewport. The picker now hosts the `SessionList` in a single content slot and swaps the dialog INTO that slot (replacing the `SessionList`) while it is open, so the dialog only competes with the `SessionList`'s rendered budget — not the `SessionList` AND the picker chrome — and the picker frame stays inside the viewport. ([#3283](https://github.com/can1357/oh-my-pi/issues/3283)) diff --git a/packages/coding-agent/src/task/parallel.ts b/packages/coding-agent/src/task/parallel.ts index 4a061e88f1..95d854ebb7 100644 --- a/packages/coding-agent/src/task/parallel.ts +++ b/packages/coding-agent/src/task/parallel.ts @@ -92,7 +92,8 @@ export class Semaphore { #queue: Array<() => void> = []; constructor(max: number) { - this.#max = Math.max(1, max); + const normalizedMax = Number.isFinite(max) ? Math.floor(max) : Number.POSITIVE_INFINITY; + this.#max = normalizedMax > 0 ? normalizedMax : Number.POSITIVE_INFINITY; } async acquire(): Promise { diff --git a/packages/coding-agent/test/task/task-batch.test.ts b/packages/coding-agent/test/task/task-batch.test.ts index 35982c885e..96ae5ee79b 100644 --- a/packages/coding-agent/test/task/task-batch.test.ts +++ b/packages/coding-agent/test/task/task-batch.test.ts @@ -13,6 +13,7 @@ * item; with `async.enabled=false`, it blocks and returns merged results. * Both modes forward the shared `context`; the flat form stays accepted at * runtime for internal callers. + * 4. task.maxConcurrency=0 preserves full-width synchronous batch fan-out. */ import { afterEach, beforeEach, describe, expect, it, vi } from "bun:test"; import { toolWireSchema } from "@oh-my-pi/pi-ai/utils/schema"; @@ -76,6 +77,16 @@ function makeResult(id: string, overrides: Partial = {}): SingleRe }; } +interface Deferred { + promise: Promise; + resolve: () => void; +} + +function deferred(): Deferred { + const { promise, resolve } = Promise.withResolvers(); + return { promise, resolve }; +} + function mockDiscovery(): void { vi.spyOn(discoveryModule, "discoverAgents").mockResolvedValue({ agents: [taskAgent], @@ -364,4 +375,48 @@ describe("task.batch spawning", () => { "# Goal\nShared synchronous context.", ]); }); + + it("treats task.maxConcurrency 0 as unlimited for synchronous batch fan-out", async () => { + mockDiscovery(); + const bothStarted = deferred(); + const release = deferred(); + const started: string[] = []; + vi.spyOn(executorModule, "runSubprocess").mockImplementation(async options => { + const id = options.id ?? "?"; + started.push(id); + if (started.length === 2) bothStarted.resolve(); + await release.promise; + return makeResult(id); + }); + + const manager = createManager(); + const tool = await TaskTool.create( + createSession({ + manager, + settings: { "async.enabled": false, "task.batch": true, "task.maxConcurrency": 0 }, + }), + ); + + const pending = tool.execute("tc-sync-unlimited", { + agent: "task", + context: "# Goal\nShared synchronous context.", + tasks: [ + { id: "Alpha", assignment: "Do A." }, + { id: "Beta", assignment: "Do B." }, + ], + } as TaskParams); + + const race = await Promise.race([ + bothStarted.promise.then(() => "started" as const), + Bun.sleep(250).then(() => "timeout" as const), + ]); + expect(race).toBe("started"); + expect([...started].sort()).toEqual(["Alpha", "Beta"]); + + release.resolve(); + const result = await pending; + expect(getFirstText(result)).toContain("All done."); + expect(result.details?.async).toBeUndefined(); + expect(result.details?.results.map(item => item.id).sort()).toEqual(["Alpha", "Beta"]); + }); }); diff --git a/packages/coding-agent/test/task/task-spawn.test.ts b/packages/coding-agent/test/task/task-spawn.test.ts index 0a18bf1cba..1cf945368b 100644 --- a/packages/coding-agent/test/task/task-spawn.test.ts +++ b/packages/coding-agent/test/task/task-spawn.test.ts @@ -7,6 +7,7 @@ * 2. The session-scoped spawn semaphore (task.maxConcurrency) serializes job * bodies: with concurrency 1 the second body does not start until the * first releases. + * 3. task.maxConcurrency=0 means unlimited rather than one-at-a-time. * * Param validation (missing agent / missing assignment) is covered by * test/task/task-schema.test.ts. @@ -187,4 +188,47 @@ describe("task spawn routing", () => { expect(firstJob.status).toBe("completed"); expect(secondJob.status).toBe("completed"); }); + + it("treats task.maxConcurrency 0 as unlimited for background spawns", async () => { + vi.spyOn(discoveryModule, "discoverAgents").mockResolvedValue({ + agents: [taskAgent], + projectAgentsDir: null, + }); + const started: string[] = []; + const gates = new Map(); + vi.spyOn(executorModule, "runSubprocess").mockImplementation(async options => { + const id = options.id ?? "?"; + started.push(id); + const gate = deferred(); + gates.set(id, gate); + await gate.promise; + return makeResult(id); + }); + + const manager = createManager(); + const tool = await TaskTool.create(createSession({ manager, settings: { "task.maxConcurrency": 0 } })); + + const first = await tool.execute("tc-unlimited-1", { + agent: "task", + id: "First", + assignment: "Work A.", + } as TaskParams); + const second = await tool.execute("tc-unlimited-2", { + agent: "task", + id: "Second", + assignment: "Work B.", + } as TaskParams); + const firstJob = manager.getJob(first.details!.async!.jobId)!; + const secondJob = manager.getJob(second.details!.async!.jobId)!; + + await pollUntil(() => started.length === 2); + expect(started).toEqual(["First", "Second"]); + expect(secondJob.queued).toBe(false); + + gates.get("First")!.resolve(); + gates.get("Second")!.resolve(); + await Promise.all([firstJob.promise, secondJob.promise]); + expect(firstJob.status).toBe("completed"); + expect(secondJob.status).toBe("completed"); + }); });