diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 9c7a7c94bb1..5710de096eb 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -9,6 +9,7 @@ import { type OrchestrationSession, ThreadId, type ProviderSession, + type ProviderSendTurnInput, type RuntimeMode, type TurnId, } from "@t3tools/contracts"; @@ -29,6 +30,7 @@ import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { increment, orchestrationEventsProcessedTotal } from "../../observability/Metrics.ts"; import { ProviderAdapterRequestError } from "../../provider/Errors.ts"; import type { ProviderServiceError } from "../../provider/Errors.ts"; +import { classifyProviderServiceFailure } from "../../provider/providerFallback.ts"; import { TextGeneration } from "../../textGeneration/TextGeneration.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { ProviderRegistry } from "../../provider/Services/ProviderRegistry.ts"; @@ -41,6 +43,8 @@ import { import { ServerSettingsService } from "../../serverSettings.ts"; import { VcsStatusBroadcaster } from "../../vcs/VcsStatusBroadcaster.ts"; import { GitWorkflowService } from "../../git/GitWorkflowService.ts"; +import { attemptProviderFallback } from "../providerFallbackWorkflow.ts"; +import { completeProviderFallbackChain } from "../providerFallbackChain.ts"; const isProviderAdapterRequestError = Schema.is(ProviderAdapterRequestError); const isProviderDriverKind = Schema.is(ProviderDriverKind); @@ -770,6 +774,10 @@ const make = Effect.gen(function* () { return; } + // A visible user turn starts a new fallback chain. Mid-task fallback turns + // bypass this reactor, so their attempted-instance history remains intact. 
+ completeProviderFallbackChain(thread.id); + const isFirstUserMessageTurn = thread.messages.filter((entry) => entry.role === "user").length === 1; if (isFirstUserMessageTurn) { @@ -825,8 +833,54 @@ const make = Effect.gen(function* () { ); }; - const recoverTurnStartFailure = (cause: Cause.Cause) => - handleTurnStartFailure(cause).pipe( + const attemptFallbackBeforeReporting = Effect.fnUntraced(function* ( + cause: Cause.Cause, + attemptedSendTurnInput?: ProviderSendTurnInput, + ) { + const failure = classifyProviderServiceFailure(cause); + if (!failure) return false; + const modelSelection = + attemptedSendTurnInput?.modelSelection ?? + event.payload.modelSelection ?? + thread.modelSelection; + const sendTurnInput: ProviderSendTurnInput = attemptedSendTurnInput ?? { + threadId: event.payload.threadId, + ...(toNonEmptyProviderInput(message.text) + ? { input: toNonEmptyProviderInput(message.text) } + : {}), + ...(message.attachments && message.attachments.length > 0 + ? { attachments: message.attachments } + : {}), + modelSelection, + interactionMode: event.payload.interactionMode, + }; + const fallback = yield* attemptProviderFallback({ + threadId: event.payload.threadId, + failedInstanceId: modelSelection.instanceId, + modelSelection, + runtimeMode: event.payload.runtimeMode, + sendTurnInput, + failure, + requireCompatibleContinuation: !isFirstUserMessageTurn, + createdAt: event.payload.createdAt, + }); + return fallback.switched; + }); + + const recoverTurnStartFailure = ( + cause: Cause.Cause, + attemptedSendTurnInput?: ProviderSendTurnInput, + ) => + attemptFallbackBeforeReporting(cause, attemptedSendTurnInput).pipe( + Effect.catchCause((fallbackCause) => + Effect.logWarning("provider command reactor fallback attempt failed", { + eventType: event.type, + threadId: event.payload.threadId, + cause: Cause.pretty(fallbackCause), + originalCause: Cause.pretty(cause), + }).pipe(Effect.as(false)), + ), + Effect.flatMap((switched) => (switched ? 
Effect.void : handleTurnStartFailure(cause))), Effect.catchCause((recoveryCause) => Effect.logWarning("provider command reactor failed to recover turn start failure", { eventType: event.type, @@ -848,16 +902,17 @@ const make = Effect.gen(function* () { createdAt: event.payload.createdAt, }).pipe( Effect.map(Option.some), - Effect.catchCause((cause) => handleTurnStartFailure(cause).pipe(Effect.as(Option.none()))), + Effect.catchCause((cause) => recoverTurnStartFailure(cause).pipe(Effect.as(Option.none()))), ); if (Option.isNone(sendTurnRequest)) { return; } - yield* providerService - .sendTurn(sendTurnRequest.value) - .pipe(Effect.catchCause(recoverTurnStartFailure), Effect.forkScoped); + yield* providerService.sendTurn(sendTurnRequest.value).pipe( + Effect.catchCause((cause) => recoverTurnStartFailure(cause, sendTurnRequest.value)), + Effect.forkScoped, + ); }); const processTurnInterruptRequested = Effect.fn("processTurnInterruptRequested")(function* ( diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 001ba388949..be5a2a71e49 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -39,6 +39,7 @@ import { ProviderService, type ProviderServiceShape, } from "../../provider/Services/ProviderService.ts"; +import { makeProviderRegistryLayer } from "../../provider/testUtils/providerRegistryMock.ts"; import * as RepositoryIdentityResolver from "../../project/RepositoryIdentityResolver.ts"; import { OrchestrationEngineLive } from "./OrchestrationEngine.ts"; import { OrchestrationProjectionPipelineLive } from "./ProjectionPipeline.ts"; @@ -238,6 +239,7 @@ describe("ProviderRuntimeIngestion", () => { Layer.provideMerge(projectionSnapshotLayer), Layer.provideMerge(SqlitePersistenceMemory), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), + 
Layer.provideMerge(makeProviderRegistryLayer([])), Layer.provideMerge(makeTestServerSettingsLayer(options?.serverSettings)), Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())), Layer.provideMerge(NodeServices.layer), diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3e5978f4846..ea5cd0fd368 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -28,6 +28,7 @@ import * as Stream from "effect/Stream"; import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { classifyProviderRuntimeFailure } from "../../provider/providerFallback.ts"; import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; import { isGitRepository } from "../../git/Utils.ts"; @@ -38,6 +39,9 @@ import { type ProviderRuntimeIngestionShape, } from "../Services/ProviderRuntimeIngestion.ts"; import { ServerSettingsService } from "../../serverSettings.ts"; +import { attemptProviderFallback } from "../providerFallbackWorkflow.ts"; +import { decideProviderFallbackTrialEvent } from "../providerFallbackTrialGate.ts"; +import { completeProviderFallbackChain } from "../providerFallbackChain.ts"; const providerTurnKey = (threadId: ThreadId, turnId: TurnId) => `${threadId}:${turnId}`; @@ -54,6 +58,8 @@ const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000; const BUFFERED_PROPOSED_PLAN_BY_ID_TTL = Duration.minutes(120); const MAX_BUFFERED_ASSISTANT_CHARS = 24_000; +const HANDLED_FALLBACK_EVENT_CACHE_CAPACITY = 10_000; +const HANDLED_FALLBACK_EVENT_TTL = Duration.minutes(120); const 
STRICT_PROVIDER_LIFECYCLE_GUARD = process.env.T3CODE_STRICT_PROVIDER_LIFECYCLE_GUARD !== "0"; type TurnStartRequestedDomainEvent = Extract< @@ -666,6 +672,12 @@ const make = Effect.gen(function* () { lookup: () => Effect.succeed({ text: "", createdAt: "" }), }); + const handledFallbackEvents = yield* Cache.make({ + capacity: HANDLED_FALLBACK_EVENT_CACHE_CAPACITY, + timeToLive: HANDLED_FALLBACK_EVENT_TTL, + lookup: () => Effect.succeed(true), + }); + const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) { return yield* projectionSnapshotQuery .getThreadDetailById(threadId) @@ -1208,6 +1220,32 @@ const make = Effect.gen(function* () { const thread = yield* resolveThreadShell(event.threadId); if (!thread) return; + // A fallback candidate can emit before sendTurn confirms the handoff. + // Hold those events until the trial commits, or discard them if it rolls + // back, so provisional output never leaks into the thread projection. + const fallbackTrialDecision = + event.providerInstanceId === undefined + ? "not-trial" + : yield* decideProviderFallbackTrialEvent( + thread.id, + event.providerInstanceId, + event.createdAt, + ); + if (fallbackTrialDecision === "reject") { + return; + } + + // Ignore events from an instance that no longer owns the thread. A + // committed trial is accepted during the narrow projection handoff. + if ( + event.providerInstanceId !== undefined && + thread.session?.providerInstanceId !== undefined && + event.providerInstanceId !== thread.session.providerInstanceId && + fallbackTrialDecision !== "accept" + ) { + return; + } + let loadedThreadDetail: OrchestrationThread | null | undefined; const getLoadedThreadDetail = () => Effect.gen(function* () { @@ -1222,6 +1260,59 @@ const make = Effect.gen(function* () { const eventTurnId = toTurnId(event.turnId); const activeTurnId = thread.session?.activeTurnId ?? 
null; + const fallbackFailure = classifyProviderRuntimeFailure(event); + const fallbackInstanceId = event.providerInstanceId ?? thread.session?.providerInstanceId; + if ( + fallbackFailure && + fallbackInstanceId !== undefined && + (activeTurnId !== null || eventTurnId !== undefined) + ) { + const fallbackKey = `${thread.id}:${fallbackInstanceId}:${eventTurnId ?? activeTurnId ?? event.eventId}`; + const handled = yield* Cache.getOption(handledFallbackEvents, fallbackKey); + if (Option.isNone(handled)) { + yield* Cache.set(handledFallbackEvents, fallbackKey, true); + const fallback = yield* attemptProviderFallback({ + threadId: thread.id, + failedInstanceId: fallbackInstanceId, + modelSelection: thread.modelSelection, + runtimeMode: thread.runtimeMode, + sendTurnInput: { + threadId: thread.id, + input: "Continue.", + modelSelection: thread.modelSelection, + interactionMode: thread.interactionMode, + }, + failure: fallbackFailure, + requireCompatibleContinuation: true, + createdAt: now, + }).pipe( + Effect.catchCause((cause) => + Effect.logWarning("provider runtime fallback attempt failed", { + eventId: event.eventId, + eventType: event.type, + threadId: thread.id, + cause: Cause.pretty(cause), + }).pipe( + Effect.as({ + switched: false, + restoredOriginalInstance: false, + skipped: [], + }), + ), + ), + ); + if (fallback.switched || fallback.restoredOriginalInstance) return; + } + } + + if ( + !fallbackFailure && + fallbackInstanceId !== undefined && + (event.type === "turn.completed" || event.type === "session.exited") + ) { + completeProviderFallbackChain(thread.id, fallbackInstanceId); + } + const conflictsWithActiveTurn = activeTurnId !== null && eventTurnId !== undefined && !sameId(activeTurnId, eventTurnId); const missingTurnForActiveTurn = activeTurnId !== null && eventTurnId === undefined; diff --git a/apps/server/src/orchestration/providerFallbackChain.test.ts b/apps/server/src/orchestration/providerFallbackChain.test.ts new file mode 100644 index 
00000000000..97dfe5ef181 --- /dev/null +++ b/apps/server/src/orchestration/providerFallbackChain.test.ts @@ -0,0 +1,72 @@ +import { ProviderInstanceId, ThreadId } from "@t3tools/contracts"; +import { afterEach, describe, expect, it } from "vite-plus/test"; + +import { + beginProviderFallbackChain, + completeProviderFallbackChain, + markProviderFallbackInstanceAttempted, + resetProviderFallbackChainsForTest, +} from "./providerFallbackChain.ts"; + +const threadId = ThreadId.make("thread-1"); +const first = ProviderInstanceId.make("codex-first"); +const second = ProviderInstanceId.make("codex-second"); +const third = ProviderInstanceId.make("codex-third"); +const origin = { + instanceId: first, + displayName: "Codex First", + failure: { kind: "rate-limit" as const, message: "Usage limit reached." }, + modelSelection: { instanceId: first, model: "gpt-5" }, + session: undefined, +}; + +afterEach(resetProviderFallbackChainsForTest); + +describe("provider fallback chain", () => { + it("retains every attempted instance across consecutive runtime failures", () => { + expect([...beginProviderFallbackChain(threadId, first, origin).attemptedInstanceIds]).toEqual([ + first, + ]); + markProviderFallbackInstanceAttempted(threadId, second); + + const secondAttempt = beginProviderFallbackChain(threadId, second, { + ...origin, + instanceId: second, + }); + expect([...secondAttempt.attemptedInstanceIds]).toEqual([first, second]); + expect(secondAttempt.origin).toEqual(origin); + markProviderFallbackInstanceAttempted(threadId, third); + + expect([...beginProviderFallbackChain(threadId, third, origin).attemptedInstanceIds]).toEqual([ + first, + second, + third, + ]); + }); + + it("starts a fresh chain after the active instance completes", () => { + beginProviderFallbackChain(threadId, first, origin); + markProviderFallbackInstanceAttempted(threadId, second); + completeProviderFallbackChain(threadId, second); + + expect([ + ...beginProviderFallbackChain(threadId, second, { + ...origin, 
/** Description of the instance whose failure started a fallback chain. */
export interface ProviderFallbackChainOrigin {
  readonly instanceId: ProviderInstanceId;
  readonly displayName: string;
  readonly failure: ProviderFallbackFailure;
  readonly modelSelection: ModelSelection;
  readonly session: ProviderSession | undefined;
}

/** Read-only copy of a chain handed to callers of `beginProviderFallbackChain`. */
export interface ProviderFallbackChainSnapshot {
  readonly attemptedInstanceIds: ReadonlySet<ProviderInstanceId>;
  readonly origin: ProviderFallbackChainOrigin;
}

/** Mutable per-thread chain record; never exposed directly. */
interface ProviderFallbackChainState {
  readonly attemptedInstanceIds: Set<ProviderInstanceId>;
  readonly origin: ProviderFallbackChainOrigin;
  activeInstanceId: ProviderInstanceId;
}

const PROVIDER_FALLBACK_CHAIN_CAPACITY = 10_000;
// Keyed by thread. Map iteration order is insertion order, which the eviction
// below relies on to find the oldest chain.
const providerFallbackChains = new Map<ThreadId, ProviderFallbackChainState>();

/** Evicts the oldest tracked chain when the map is at capacity. */
function makeRoomForProviderFallbackChain(): void {
  if (providerFallbackChains.size < PROVIDER_FALLBACK_CHAIN_CAPACITY) return;
  const oldestThreadId = providerFallbackChains.keys().next().value;
  if (oldestThreadId !== undefined) {
    providerFallbackChains.delete(oldestThreadId);
  }
}

/**
 * Starts a new chain or resumes the chain owned by the failing instance.
 * Returning a copy prevents callers from mutating the tracked chain.
 *
 * @param threadId Thread whose chain is started or resumed.
 * @param failedInstanceId Instance that just failed. The chain is resumed only
 *   when this is the chain's current active instance; otherwise a fresh chain
 *   replaces whatever was tracked for the thread.
 * @param origin Recorded as the chain origin only when a fresh chain is
 *   created; a resumed chain keeps its existing origin.
 * @returns A snapshot holding a copy of the attempted-instance set and the origin.
 */
export function beginProviderFallbackChain(
  threadId: ThreadId,
  failedInstanceId: ProviderInstanceId,
  origin: ProviderFallbackChainOrigin,
): ProviderFallbackChainSnapshot {
  const existing = providerFallbackChains.get(threadId);
  if (existing?.activeInstanceId === failedInstanceId) {
    existing.attemptedInstanceIds.add(failedInstanceId);
    return {
      attemptedInstanceIds: new Set(existing.attemptedInstanceIds),
      origin: existing.origin,
    };
  }

  // Drop the stale chain for this thread before checking capacity. Otherwise a
  // replacement (which does not grow the map) would still evict an unrelated
  // thread's chain, and `Map.set` on the existing key would keep the old
  // insertion position, leaving the fresh chain first in line for eviction.
  if (existing !== undefined) {
    providerFallbackChains.delete(threadId);
  }
  makeRoomForProviderFallbackChain();

  const attemptedInstanceIds = new Set<ProviderInstanceId>([failedInstanceId]);
  providerFallbackChains.set(threadId, {
    attemptedInstanceIds,
    activeInstanceId: failedInstanceId,
    origin,
  });
  return { attemptedInstanceIds: new Set(attemptedInstanceIds), origin };
}

/**
 * Records that `instanceId` has been tried for the thread's chain and makes it
 * the chain's active instance. Does nothing when the thread has no chain.
 */
export function markProviderFallbackInstanceAttempted(
  threadId: ThreadId,
  instanceId: ProviderInstanceId,
): void {
  const state = providerFallbackChains.get(threadId);
  if (!state) return;
  state.attemptedInstanceIds.add(instanceId);
  state.activeInstanceId = instanceId;
}

/**
 * Forgets the thread's chain.
 *
 * @param activeInstanceId When provided, the chain is removed only if this is
 *   its current active instance, so a stale instance cannot end the chain.
 *   When omitted, the chain is removed unconditionally.
 */
export function completeProviderFallbackChain(
  threadId: ThreadId,
  activeInstanceId?: ProviderInstanceId,
): void {
  const state = providerFallbackChains.get(threadId);
  if (!state || (activeInstanceId !== undefined && state.activeInstanceId !== activeInstanceId)) {
    return;
  }
  providerFallbackChains.delete(threadId);
}

/** Test-only helper: clears every tracked chain. */
export function resetProviderFallbackChainsForTest(): void {
  providerFallbackChains.clear();
}
"@effect/vitest"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; + +import { + beginProviderFallbackTrial, + completeProviderFallbackTrial, + decideProviderFallbackTrialEvent, +} from "./providerFallbackTrialGate.ts"; + +const threadId = ThreadId.make("thread-fallback-gate"); +const instanceId = ProviderInstanceId.make("codex-work"); + +describe("provider fallback trial event gate", () => { + it.effect("holds provisional events and rejects them when the trial fails", () => + Effect.gen(function* () { + const token = yield* beginProviderFallbackTrial(threadId, instanceId); + const decision = yield* decideProviderFallbackTrialEvent( + threadId, + instanceId, + DateTime.formatIso(DateTime.makeUnsafe(token.startedAtMs + 1)), + ).pipe(Effect.forkChild); + + yield* Effect.yieldNow; + yield* completeProviderFallbackTrial(token, "reject"); + + expect(yield* Fiber.join(decision)).toBe("reject"); + }), + ); + + it.effect("releases provisional events only after the trial commits", () => + Effect.gen(function* () { + const token = yield* beginProviderFallbackTrial(threadId, instanceId); + const decision = yield* decideProviderFallbackTrialEvent( + threadId, + instanceId, + DateTime.formatIso(DateTime.makeUnsafe(token.startedAtMs + 1)), + ).pipe(Effect.forkChild); + + yield* Effect.yieldNow; + yield* completeProviderFallbackTrial(token, "accept"); + + expect(yield* Fiber.join(decision)).toBe("accept"); + }), + ); + + it.effect("does not apply an old trial outcome to later events from the same instance", () => + Effect.gen(function* () { + const token = yield* beginProviderFallbackTrial(threadId, instanceId); + yield* completeProviderFallbackTrial(token, "reject"); + + expect( + yield* decideProviderFallbackTrialEvent( + threadId, + instanceId, + DateTime.formatIso(DateTime.makeUnsafe(token.startedAtMs + 60_000)), + ), + ).toBe("not-trial"); + }), + ); +}); diff --git 
/** Verdict for a runtime event checked against the trial gate. */
export type ProviderFallbackTrialDecision = "accept" | "not-trial" | "reject";

/** Handle returned by `beginProviderFallbackTrial`; pass it back to settle that trial. */
export interface ProviderFallbackTrialToken {
  readonly key: string;
  readonly threadId: ThreadId;
  readonly startedAtMs: number;
  readonly decision: Deferred.Deferred<"accept" | "reject">;
}

/** Tracked trial record: the token plus its (eventual) outcome. */
interface ProviderFallbackTrialState extends ProviderFallbackTrialToken {
  completedAtMs: number | undefined;
  outcome: "accept" | "reject" | undefined;
}

const TRIAL_OUTCOME_TTL_MS = 120_000;
const TRIAL_OUTCOME_CAPACITY = 10_000;
const trialStateByKey = new Map<string, ProviderFallbackTrialState>();

const trialKey = (threadId: ThreadId, instanceId: ProviderInstanceId) =>
  `${threadId}:${instanceId}`;

/**
 * Drops completed trials older than the TTL, then, while the map is still at
 * capacity, drops further completed trials in insertion order. Pending trials
 * are never removed here.
 */
function pruneTrialOutcomes(nowMs: number): void {
  for (const [key, state] of trialStateByKey) {
    if (state.completedAtMs !== undefined && nowMs - state.completedAtMs > TRIAL_OUTCOME_TTL_MS) {
      trialStateByKey.delete(key);
    }
  }
  if (trialStateByKey.size < TRIAL_OUTCOME_CAPACITY) return;
  // Single pass instead of re-copying the whole map for every eviction.
  // Deleting the current entry while iterating a Map is well-defined.
  for (const [key, state] of trialStateByKey) {
    if (trialStateByKey.size < TRIAL_OUTCOME_CAPACITY) break;
    if (state.completedAtMs !== undefined) {
      trialStateByKey.delete(key);
    }
  }
}

/**
 * Registers a pending trial for `threadId` + `instanceId` and returns its token.
 *
 * If a trial for the same key is still pending, it is settled as "reject"
 * before being replaced. Once replaced it can no longer be reached through
 * `completeProviderFallbackTrial` or `rejectPendingProviderFallbackTrials`,
 * so leaving its Deferred unresolved would suspend its waiters forever.
 */
export const beginProviderFallbackTrial = Effect.fn("beginProviderFallbackTrial")(function* (
  threadId: ThreadId,
  instanceId: ProviderInstanceId,
) {
  const startedAtMs = yield* Clock.currentTimeMillis;
  const decision = yield* Deferred.make<"accept" | "reject">();
  const token: ProviderFallbackTrialState = {
    key: trialKey(threadId, instanceId),
    threadId,
    startedAtMs,
    decision,
    completedAtMs: undefined,
    outcome: undefined,
  };
  pruneTrialOutcomes(startedAtMs);
  const superseded = trialStateByKey.get(token.key);
  trialStateByKey.set(token.key, token);
  if (superseded !== undefined && superseded.outcome === undefined) {
    // A superseded trial never committed, so its provisional events are rejected.
    superseded.outcome = "reject";
    superseded.completedAtMs = startedAtMs;
    yield* Deferred.succeed(superseded.decision, "reject");
  }
  return token satisfies ProviderFallbackTrialToken;
});

/**
 * Settles the trial identified by `token` with `outcome` and releases waiters.
 * No-op when the token is not the currently tracked trial for its key or the
 * trial already has an outcome.
 */
export const completeProviderFallbackTrial = Effect.fn("completeProviderFallbackTrial")(function* (
  token: ProviderFallbackTrialToken,
  outcome: "accept" | "reject",
) {
  const state = trialStateByKey.get(token.key);
  if (state !== token || state.outcome !== undefined) return;
  state.outcome = outcome;
  state.completedAtMs = yield* Clock.currentTimeMillis;
  yield* Deferred.succeed(state.decision, outcome);
});

/** Settles every still-pending tracked trial of `threadId` as "reject". */
export const rejectPendingProviderFallbackTrials = Effect.fn("rejectPendingProviderFallbackTrials")(
  function* (threadId: ThreadId) {
    const pending = [...trialStateByKey.values()].filter(
      (state) => state.threadId === threadId && state.outcome === undefined,
    );
    yield* Effect.forEach(pending, (state) => completeProviderFallbackTrial(state, "reject"), {
      discard: true,
    });
  },
);

/**
 * Classifies an event from `instanceId` on `threadId` against the tracked trial.
 *
 * - No tracked trial, or an event timestamped before the trial started:
 *   "not-trial".
 * - Trial still pending: waits for the outcome and returns it.
 * - Trial settled: returns its outcome, unless the event is timestamped after
 *   completion, in which case it is "not-trial".
 *
 * An unparseable `eventCreatedAt` skips both timestamp comparisons.
 */
export const decideProviderFallbackTrialEvent = Effect.fn("decideProviderFallbackTrialEvent")(
  function* (threadId: ThreadId, instanceId: ProviderInstanceId, eventCreatedAt: string) {
    const state = trialStateByKey.get(trialKey(threadId, instanceId));
    if (!state) return "not-trial" as const;

    const eventTimeMs = Date.parse(eventCreatedAt);
    if (Number.isFinite(eventTimeMs) && eventTimeMs < state.startedAtMs) {
      return "not-trial" as const;
    }

    if (state.outcome === undefined) {
      return yield* Deferred.await(state.decision);
    }
    if (
      state.completedAtMs !== undefined &&
      Number.isFinite(eventTimeMs) &&
      eventTimeMs > state.completedAtMs
    ) {
      return "not-trial" as const;
    }
    return state.outcome;
  },
);
/dev/null +++ b/apps/server/src/orchestration/providerFallbackWorkflow.ts @@ -0,0 +1,419 @@ +import { + CommandId, + EventId, + type ModelSelection, + type OrchestrationSession, + type ProviderInstanceId, + type ProviderSendTurnInput, + type RuntimeMode, + type ThreadId, +} from "@t3tools/contracts"; +import * as Crypto from "effect/Crypto"; +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Semaphore from "effect/Semaphore"; + +import { resolveThreadWorkspaceCwd } from "../checkpointing/Utils.ts"; +import { + planProviderFallback, + providerFallbackDisplayName, + type ProviderFallbackCandidate, + type ProviderFallbackFailure, + type ProviderFallbackSkip, +} from "../provider/providerFallback.ts"; +import { ProviderRegistry } from "../provider/Services/ProviderRegistry.ts"; +import { ProviderService } from "../provider/Services/ProviderService.ts"; +import { ServerSettingsService } from "../serverSettings.ts"; +import { OrchestrationEngineService } from "./Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "./Services/ProjectionSnapshotQuery.ts"; +import { + beginProviderFallbackChain, + completeProviderFallbackChain, + markProviderFallbackInstanceAttempted, +} from "./providerFallbackChain.ts"; +import { + beginProviderFallbackTrial, + completeProviderFallbackTrial, + rejectPendingProviderFallbackTrials, + type ProviderFallbackTrialToken, +} from "./providerFallbackTrialGate.ts"; + +export interface ProviderFallbackAttemptInput { + readonly threadId: ThreadId; + readonly failedInstanceId: ProviderInstanceId; + readonly modelSelection: ModelSelection; + readonly runtimeMode: RuntimeMode; + readonly sendTurnInput: ProviderSendTurnInput; + readonly failure: ProviderFallbackFailure; + readonly requireCompatibleContinuation: boolean; + readonly createdAt: string; +} + +export interface ProviderFallbackAttemptResult { + readonly switched: boolean; + readonly restoredOriginalInstance?: boolean; + 
/**
 * Renders an arbitrary failure value as a human-readable reason string.
 *
 * - An `Error` with a non-blank message yields that message.
 * - Anything whose `String(...)` rendering is meaningful yields that rendering
 *   (strings, numbers, `null`, objects with a custom `toString`, ...).
 * - A plain object that would render as "[object Object]" yields its
 *   non-blank string `message` field if it has one, otherwise its JSON form.
 * - If none of that produces anything useful, the default rendering is returned.
 *
 * @param error The failure value; may be of any type.
 * @returns A reason string for the failure.
 */
function formatFailure(error: unknown): string {
  if (error instanceof Error && error.message.trim().length > 0) {
    return error.message;
  }

  let rendered: string;
  try {
    rendered = String(error);
  } catch {
    // e.g. objects without a prototype cannot be converted to a primitive.
    rendered = Object.prototype.toString.call(error);
  }
  if (typeof error !== "object" || error === null || rendered !== "[object Object]") {
    return rendered;
  }

  // Plain object: "[object Object]" tells the reader nothing, so dig deeper.
  const message = (error as { readonly message?: unknown }).message;
  if (typeof message === "string" && message.trim().length > 0) {
    return message;
  }
  try {
    const json = JSON.stringify(error);
    if (typeof json === "string" && json !== "{}") {
      return json;
    }
  } catch {
    // Cyclic structures and BigInt values cannot be serialized; fall through.
  }
  return rendered;
}
[project] : [] }); + const providers = yield* providerRegistry.getProviders; + const currentProvider = providers.find( + (provider) => provider.instanceId === input.failedInstanceId, + ); + if (!currentProvider) { + return { switched: false, skipped: [] } satisfies ProviderFallbackAttemptResult; + } + + const activeSession = (yield* providerService.listSessions()).find( + (session) => session.threadId === input.threadId, + ); + const fallbackChain = beginProviderFallbackChain(input.threadId, input.failedInstanceId, { + instanceId: input.failedInstanceId, + displayName: providerFallbackDisplayName(currentProvider), + failure: input.failure, + modelSelection: input.modelSelection, + session: activeSession, + }); + + const plan = planProviderFallback({ + settings, + providers, + currentInstanceId: input.failedInstanceId, + modelSelection: input.modelSelection, + requireCompatibleContinuation: input.requireCompatibleContinuation, + excludedInstanceIds: fallbackChain.attemptedInstanceIds, + }); + const skipped: ProviderFallbackSkip[] = [...plan.skipped]; + let bindingChanged = false; + let restoredOriginalInstance = false; + let boundFallbackInstance: ProviderFallbackCandidate | undefined; + let currentTrialToken: ProviderFallbackTrialToken | undefined; + + const nextCommandId = (tag: string) => + crypto.randomUUIDv4.pipe(Effect.map((id) => CommandId.make(`server:${tag}:${id}`))); + const appendOutcomeActivity = (outcome: { + readonly kind: "provider.fallback.failed" | "provider.fallback.succeeded"; + readonly summary: string; + readonly tone: "error" | "info"; + readonly toInstanceId?: ProviderInstanceId; + readonly toDisplayName?: string; + readonly useOriginalFailure?: boolean; + }) => + Effect.all({ commandId: nextCommandId(outcome.kind), eventId: crypto.randomUUIDv4 }).pipe( + Effect.flatMap(({ commandId, eventId }) => + engine.dispatch({ + type: "thread.activity.append", + commandId, + threadId: input.threadId, + activity: { + id: EventId.make(eventId), + tone: 
outcome.tone, + kind: outcome.kind, + summary: outcome.summary, + payload: { + fromInstanceId: outcome.useOriginalFailure + ? fallbackChain.origin.instanceId + : input.failedInstanceId, + fromDisplayName: outcome.useOriginalFailure + ? fallbackChain.origin.displayName + : providerFallbackDisplayName(currentProvider), + ...(outcome.toInstanceId ? { toInstanceId: outcome.toInstanceId } : {}), + ...(outcome.toDisplayName ? { toDisplayName: outcome.toDisplayName } : {}), + failureKind: outcome.useOriginalFailure + ? fallbackChain.origin.failure.kind + : input.failure.kind, + detail: outcome.useOriginalFailure + ? fallbackChain.origin.failure.message + : input.failure.message, + ...(outcome.useOriginalFailure ? { restoredOriginalInstance: true } : {}), + skipped, + }, + turnId: thread.session?.activeTurnId ?? null, + createdAt: input.createdAt, + }, + createdAt: input.createdAt, + }), + ), + ); + const stopFallbackSessionAndProject = Effect.fn("stopFallbackSessionAndProject")(function* () { + const bound = boundFallbackInstance; + yield* providerService.stopSession({ threadId: input.threadId }).pipe(Effect.ignore); + yield* engine.dispatch({ + type: "thread.session.set", + commandId: yield* nextCommandId("provider-fallback-stop"), + threadId: input.threadId, + session: { + threadId: input.threadId, + status: "stopped", + providerName: bound?.provider.driver ?? currentProvider.driver, + providerInstanceId: bound?.instanceId ?? 
input.failedInstanceId, + runtimeMode: input.runtimeMode, + activeTurnId: null, + lastError: input.failure.message, + updatedAt: input.createdAt, + }, + createdAt: input.createdAt, + }); + }); + + for (const candidate of plan.candidates) { + markProviderFallbackInstanceAttempted(input.threadId, candidate.instanceId); + currentTrialToken = yield* beginProviderFallbackTrial(input.threadId, candidate.instanceId); + const attempt = yield* Effect.gen(function* () { + const started = yield* providerService.startSession(input.threadId, { + threadId: input.threadId, + provider: candidate.provider.driver, + providerInstanceId: candidate.instanceId, + ...(cwd ? { cwd } : {}), + modelSelection: candidate.modelSelection, + ...(input.requireCompatibleContinuation && activeSession?.resumeCursor !== undefined + ? { resumeCursor: activeSession.resumeCursor } + : {}), + runtimeMode: input.runtimeMode, + }); + bindingChanged = true; + boundFallbackInstance = candidate; + const turn = yield* providerService.sendTurn({ + ...input.sendTurnInput, + modelSelection: candidate.modelSelection, + }); + return { started, turn }; + }).pipe(Effect.result); + + if (attempt._tag === "Failure") { + yield* completeProviderFallbackTrial(currentTrialToken, "reject"); + currentTrialToken = undefined; + skipped.push({ + instanceId: candidate.instanceId, + displayName: candidate.displayName, + reason: formatFailure(attempt.failure), + }); + continue; + } + + const { started, turn } = attempt.success; + yield* engine.dispatch({ + type: "thread.meta.update", + commandId: yield* nextCommandId("provider-fallback-model"), + threadId: input.threadId, + modelSelection: candidate.modelSelection, + }); + const session: OrchestrationSession = { + threadId: input.threadId, + status: "running", + providerName: started.provider, + providerInstanceId: candidate.instanceId, + runtimeMode: input.runtimeMode, + activeTurnId: turn.turnId, + lastError: null, + updatedAt: input.createdAt, + }; + yield* engine.dispatch({ + 
type: "thread.session.set", + commandId: yield* nextCommandId("provider-fallback-session"), + threadId: input.threadId, + session, + createdAt: input.createdAt, + }); + yield* completeProviderFallbackTrial(currentTrialToken, "accept"); + currentTrialToken = undefined; + yield* appendOutcomeActivity({ + kind: "provider.fallback.succeeded", + summary: `Switched to ${candidate.displayName}`, + tone: "info", + toInstanceId: candidate.instanceId, + toDisplayName: candidate.displayName, + }); + return { switched: true, skipped } satisfies ProviderFallbackAttemptResult; + } + + const originalSession = fallbackChain.origin.session; + const shouldRestoreOriginalInstance = + bindingChanged || fallbackChain.origin.instanceId !== input.failedInstanceId; + if (shouldRestoreOriginalInstance) { + if (originalSession) { + const originalInstanceId = originalSession.providerInstanceId; + if (originalInstanceId === undefined) { + skipped.push({ + instanceId: input.failedInstanceId, + displayName: providerFallbackDisplayName(currentProvider), + reason: "Could not restore the original session because its instance id is missing.", + }); + yield* stopFallbackSessionAndProject(); + } else { + const originalModelSelection: ModelSelection = fallbackChain.origin.modelSelection; + const restored = yield* providerService + .startSession(input.threadId, { + threadId: input.threadId, + provider: originalSession.provider, + providerInstanceId: originalInstanceId, + ...(cwd ? { cwd } : {}), + modelSelection: originalModelSelection, + ...(originalSession.resumeCursor !== undefined + ? 
{ resumeCursor: originalSession.resumeCursor } + : {}), + runtimeMode: input.runtimeMode, + }) + .pipe(Effect.result); + if (restored._tag === "Success") { + yield* engine.dispatch({ + type: "thread.meta.update", + commandId: yield* nextCommandId("provider-fallback-restore-model"), + threadId: input.threadId, + modelSelection: originalModelSelection, + }); + yield* engine.dispatch({ + type: "thread.session.set", + commandId: yield* nextCommandId("provider-fallback-restore"), + threadId: input.threadId, + session: { + threadId: input.threadId, + status: sessionStatus(restored.success.status), + providerName: restored.success.provider, + providerInstanceId: originalInstanceId, + runtimeMode: input.runtimeMode, + activeTurnId: null, + lastError: input.failure.message, + updatedAt: input.createdAt, + }, + createdAt: input.createdAt, + }); + const attemptedOriginSkip = skipped.findIndex( + (entry) => + entry.instanceId === originalInstanceId && + entry.reason === + "This instance was already attempted during the current fallback chain.", + ); + if (attemptedOriginSkip >= 0) skipped.splice(attemptedOriginSkip, 1); + restoredOriginalInstance = true; + } else { + skipped.push({ + instanceId: originalInstanceId, + displayName: String(originalInstanceId), + reason: `Could not restore the original instance: ${formatFailure(restored.failure)}`, + }); + yield* stopFallbackSessionAndProject(); + } + } + } else { + yield* stopFallbackSessionAndProject(); + } + } + + yield* appendOutcomeActivity({ + kind: "provider.fallback.failed", + summary: "Automatic provider fallback failed", + tone: "error", + useOriginalFailure: restoredOriginalInstance, + }); + completeProviderFallbackChain(input.threadId); + return { + switched: false, + restoredOriginalInstance, + skipped, + } satisfies ProviderFallbackAttemptResult; +}); + +export const attemptProviderFallback = Effect.fn("attemptProviderFallback")(function* ( + input: ProviderFallbackAttemptInput, +) { + return yield* 
withProviderFallbackLock( + input.threadId, + attemptProviderFallbackUnlocked(input).pipe( + Effect.onError(() => + Effect.sync(() => { + completeProviderFallbackChain(input.threadId); + }), + ), + Effect.ensuring(rejectPendingProviderFallbackTrials(input.threadId)), + ), + ); +}); diff --git a/apps/server/src/provider/providerFallback.test.ts b/apps/server/src/provider/providerFallback.test.ts new file mode 100644 index 00000000000..95882a4a9d7 --- /dev/null +++ b/apps/server/src/provider/providerFallback.test.ts @@ -0,0 +1,170 @@ +import { + EventId, + ProviderDriverKind, + ProviderInstanceId, + ThreadId, + type ServerProvider, + type ServerSettings, +} from "@t3tools/contracts"; +import * as Cause from "effect/Cause"; +import { describe, expect, it } from "vite-plus/test"; + +import { ProviderAdapterRequestError } from "./Errors.ts"; +import { + classifyProviderRuntimeFailure, + classifyProviderServiceFailure, + planProviderFallback, +} from "./providerFallback.ts"; + +const provider = (input: { + id: string; + models?: ReadonlyArray; + continuation?: string; +}): ServerProvider => ({ + instanceId: ProviderInstanceId.make(input.id), + driver: ProviderDriverKind.make("codex"), + displayName: input.id, + ...(input.continuation ? { continuation: { groupKey: input.continuation } } : {}), + enabled: true, + installed: true, + version: "1.0.0", + status: "ready", + auth: { status: "authenticated", type: "test" }, + checkedAt: "2026-06-21T00:00:00.000Z", + models: (input.models ?? 
["gpt-5"]).map((slug) => ({ + slug, + name: slug, + isCustom: false, + capabilities: null, + })), + slashCommands: [], + skills: [], +}); + +const settings = { + providerFallback: { enabled: true }, + providerInstances: {}, +} as ServerSettings; + +describe("planProviderFallback", () => { + it("preserves provider order and explains model and continuation skips", () => { + const plan = planProviderFallback({ + settings, + providers: [ + provider({ id: "codex", continuation: "home:a" }), + provider({ id: "codex_missing", models: ["gpt-4"], continuation: "home:a" }), + provider({ id: "codex_other_home", continuation: "home:b" }), + provider({ id: "codex_work", continuation: "home:a" }), + ], + currentInstanceId: ProviderInstanceId.make("codex"), + modelSelection: { instanceId: ProviderInstanceId.make("codex"), model: "gpt-5" }, + requireCompatibleContinuation: true, + }); + + expect(plan.candidates.map((entry) => entry.instanceId)).toEqual(["codex_work"]); + expect(plan.skipped.map((entry) => entry.reason)).toEqual([ + "Model 'gpt-5' was not found on this instance.", + "The provider home directory or continuation store does not match the active instance.", + ]); + }); + + it("defaults instance participation on and excludes explicitly opted-out instances", () => { + const plan = planProviderFallback({ + settings: { + ...settings, + providerInstances: { + [ProviderInstanceId.make("codex_disabled")]: { + driver: ProviderDriverKind.make("codex"), + allowFallback: false, + }, + }, + }, + providers: [ + provider({ id: "codex" }), + provider({ id: "codex_enabled" }), + provider({ id: "codex_disabled" }), + ], + currentInstanceId: ProviderInstanceId.make("codex"), + modelSelection: { instanceId: ProviderInstanceId.make("codex"), model: "gpt-5" }, + requireCompatibleContinuation: false, + }); + + expect(plan.candidates.map((entry) => entry.instanceId)).toEqual(["codex_enabled"]); + expect(plan.skipped[0]?.reason).toBe("Automatic fallback is disabled for this instance."); + }); 
+ + it("never retries instances already attempted by the current fallback chain", () => { + const plan = planProviderFallback({ + settings, + providers: [ + provider({ id: "codex_first" }), + provider({ id: "codex_second" }), + provider({ id: "codex_third" }), + ], + currentInstanceId: ProviderInstanceId.make("codex_second"), + modelSelection: { instanceId: ProviderInstanceId.make("codex_second"), model: "gpt-5" }, + requireCompatibleContinuation: false, + excludedInstanceIds: new Set([ + ProviderInstanceId.make("codex_first"), + ProviderInstanceId.make("codex_second"), + ]), + }); + + expect(plan.candidates.map((entry) => entry.instanceId)).toEqual(["codex_third"]); + expect(plan.skipped).toEqual([ + { + instanceId: "codex_first", + displayName: "codex_first", + reason: "This instance was already attempted during the current fallback chain.", + }, + ]); + }); + + it("does not loop back to the first instance when both available instances fail", () => { + const first = ProviderInstanceId.make("codex_first"); + const second = ProviderInstanceId.make("codex_second"); + const plan = planProviderFallback({ + settings, + providers: [provider({ id: first }), provider({ id: second })], + currentInstanceId: second, + modelSelection: { instanceId: second, model: "gpt-5" }, + requireCompatibleContinuation: false, + excludedInstanceIds: new Set([first, second]), + }); + + expect(plan.candidates).toEqual([]); + expect(plan.skipped.map((entry) => entry.instanceId)).toEqual([first]); + }); +}); + +describe("provider fallback failure classification", () => { + it("accepts operational failures and rejects unrelated provider errors", () => { + const rateLimit = new ProviderAdapterRequestError({ + provider: "codex", + method: "turn/start", + detail: "Usage limit reached for this account.", + }); + const invalidPrompt = new ProviderAdapterRequestError({ + provider: "codex", + method: "turn/start", + detail: "Prompt is invalid.", + }); + + 
expect(classifyProviderServiceFailure(Cause.fail(rateLimit))?.kind).toBe("rate-limit"); + expect(classifyProviderServiceFailure(Cause.fail(invalidPrompt))).toBeUndefined(); + }); + + it("classifies canonical transport errors without parsing provider-specific details", () => { + expect( + classifyProviderRuntimeFailure({ + type: "runtime.error", + eventId: EventId.make("event-1"), + provider: ProviderDriverKind.make("codex"), + providerInstanceId: ProviderInstanceId.make("codex"), + threadId: ThreadId.make("thread-1"), + createdAt: "2026-06-21T00:00:00.000Z", + payload: { message: "Disconnected", class: "transport_error" }, + })?.kind, + ).toBe("transport"); + }); +}); diff --git a/apps/server/src/provider/providerFallback.ts b/apps/server/src/provider/providerFallback.ts new file mode 100644 index 00000000000..8dcf8d79689 --- /dev/null +++ b/apps/server/src/provider/providerFallback.ts @@ -0,0 +1,171 @@ +import { + isProviderAvailable, + type ModelSelection, + type ProviderInstanceId, + type ProviderRuntimeEvent, + type ServerProvider, + type ServerSettings, +} from "@t3tools/contracts"; +import * as Cause from "effect/Cause"; +import * as Schema from "effect/Schema"; + +import { + ProviderAdapterProcessError, + ProviderAdapterRequestError, + ProviderUnsupportedError, + type ProviderServiceError, +} from "./Errors.ts"; + +export type ProviderFallbackFailureKind = + | "authentication" + | "process" + | "rate-limit" + | "transport" + | "unavailable"; + +export interface ProviderFallbackFailure { + readonly kind: ProviderFallbackFailureKind; + readonly message: string; +} + +export interface ProviderFallbackSkip { + readonly instanceId: ProviderInstanceId; + readonly displayName: string; + readonly reason: string; +} + +export interface ProviderFallbackCandidate { + readonly instanceId: ProviderInstanceId; + readonly displayName: string; + readonly modelSelection: ModelSelection; + readonly provider: ServerProvider; +} + +export interface ProviderFallbackPlan { + 
readonly candidates: ReadonlyArray; + readonly skipped: ReadonlyArray; +} + +const RATE_LIMIT_PATTERN = + /\b(?:rate[ -]?limit|usage limit|quota|too many requests|resource exhausted|credits? exhausted|limit reached)\b/i; +const AUTH_PATTERN = + /\b(?:unauthenticated|authentication|not authenticated|invalid (?:api )?key|expired token|login required|unauthorized|forbidden)\b/i; +const TRANSPORT_PATTERN = + /\b(?:connection (?:closed|lost|refused|reset)|network|socket|timed? out|timeout|transport|broken pipe|econnreset|econnrefused|service unavailable|bad gateway|gateway timeout|http 5\d\d)\b/i; +const isProviderUnsupportedError = Schema.is(ProviderUnsupportedError); +const isProviderAdapterProcessError = Schema.is(ProviderAdapterProcessError); +const isProviderAdapterRequestError = Schema.is(ProviderAdapterRequestError); + +function classifyMessage(message: string): ProviderFallbackFailure | undefined { + if (RATE_LIMIT_PATTERN.test(message)) return { kind: "rate-limit", message }; + if (AUTH_PATTERN.test(message)) return { kind: "authentication", message }; + if (TRANSPORT_PATTERN.test(message)) return { kind: "transport", message }; + return undefined; +} + +export function classifyProviderServiceFailure( + cause: Cause.Cause, +): ProviderFallbackFailure | undefined { + const failReason = cause.reasons.find(Cause.isFailReason); + const error = failReason?.error; + if (isProviderUnsupportedError(error)) { + return { kind: "unavailable", message: error.message }; + } + if (isProviderAdapterProcessError(error)) { + return { kind: "process", message: error.detail }; + } + if (isProviderAdapterRequestError(error)) { + return classifyMessage(error.detail); + } + return error instanceof Error + ? 
classifyMessage(error.message) + : classifyMessage(Cause.pretty(cause)); +} + +export function classifyProviderRuntimeFailure( + event: ProviderRuntimeEvent, +): ProviderFallbackFailure | undefined { + if (event.type === "runtime.error") { + if (event.payload.class === "transport_error") { + return { kind: "transport", message: event.payload.message }; + } + return classifyMessage(event.payload.message); + } + if (event.type === "session.exited" && event.payload.exitKind === "error") { + const message = event.payload.reason ?? "Provider process exited unexpectedly."; + return classifyMessage(message) ?? { kind: "process", message }; + } + if (event.type === "turn.completed" && event.payload.state === "failed") { + const message = event.payload.errorMessage ?? "Provider turn failed."; + return classifyMessage(message); + } + return undefined; +} + +export function providerFallbackDisplayName(provider: ServerProvider): string { + return provider.displayName?.trim() || String(provider.instanceId); +} + +export function planProviderFallback(input: { + readonly settings: ServerSettings; + readonly providers: ReadonlyArray; + readonly currentInstanceId: ProviderInstanceId; + readonly modelSelection: ModelSelection; + readonly requireCompatibleContinuation: boolean; + readonly excludedInstanceIds?: ReadonlySet; +}): ProviderFallbackPlan { + const current = input.providers.find( + (provider) => provider.instanceId === input.currentInstanceId, + ); + if (!current) return { candidates: [], skipped: [] }; + + const candidates: ProviderFallbackCandidate[] = []; + const skipped: ProviderFallbackSkip[] = []; + for (const provider of input.providers) { + if (provider.instanceId === current.instanceId || provider.driver !== current.driver) continue; + const displayName = providerFallbackDisplayName(provider); + const skip = (reason: string) => + skipped.push({ instanceId: provider.instanceId, displayName, reason }); + const config = 
input.settings.providerInstances[provider.instanceId]; + + if (input.excludedInstanceIds?.has(provider.instanceId)) { + skip("This instance was already attempted during the current fallback chain."); + continue; + } + + if (config?.allowFallback === false) { + skip("Automatic fallback is disabled for this instance."); + continue; + } + if (!provider.enabled || provider.status === "disabled") { + skip("The instance is disabled."); + continue; + } + if (!provider.installed || !isProviderAvailable(provider) || provider.status === "error") { + skip(provider.unavailableReason ?? provider.message ?? "The instance is unavailable."); + continue; + } + if (!provider.models.some((model) => model.slug === input.modelSelection.model)) { + skip(`Model '${input.modelSelection.model}' was not found on this instance.`); + continue; + } + if (input.requireCompatibleContinuation) { + const currentGroup = current.continuation?.groupKey; + const candidateGroup = provider.continuation?.groupKey; + if (!currentGroup || !candidateGroup || currentGroup !== candidateGroup) { + skip( + "The provider home directory or continuation store does not match the active instance.", + ); + continue; + } + } + + candidates.push({ + instanceId: provider.instanceId, + displayName, + modelSelection: { ...input.modelSelection, instanceId: provider.instanceId }, + provider, + }); + } + return { candidates, skipped }; +} diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index cf5bb9de5e9..73715554c8c 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -1684,6 +1684,98 @@ function ChatViewContent(props: ChatViewProps) { const selectedProvider: ProviderDriverKind = lockedProvider ?? unlockedSelectedProvider; const phase = derivePhase(activeThread?.session ?? null); const threadActivities = activeThread?.activities ?? 
EMPTY_ACTIVITIES; + const seenFallbackActivityIdsByThreadRef = useRef(new Map>()); + useEffect(() => { + if (!activeThreadKey || !activeThreadRef) return; + const existing = seenFallbackActivityIdsByThreadRef.current.get(activeThreadKey); + if (!existing) { + seenFallbackActivityIdsByThreadRef.current.set( + activeThreadKey, + new Set(threadActivities.map((activity) => String(activity.id))), + ); + return; + } + + for (const activity of threadActivities) { + const activityId = String(activity.id); + if (existing.has(activityId)) continue; + existing.add(activityId); + if ( + activity.kind !== "provider.fallback.succeeded" && + activity.kind !== "provider.fallback.failed" + ) { + continue; + } + const payload = + activity.payload && typeof activity.payload === "object" && !Array.isArray(activity.payload) + ? (activity.payload as Record) + : {}; + const from = + typeof payload.fromDisplayName === "string" ? payload.fromDisplayName : "current instance"; + const to = typeof payload.toDisplayName === "string" ? payload.toDisplayName : null; + const detail = typeof payload.detail === "string" ? payload.detail : undefined; + const restoredOriginalInstance = payload.restoredOriginalInstance === true; + const skipped = Array.isArray(payload.skipped) + ? payload.skipped.filter( + (entry): entry is { instanceId: string; displayName: string; reason: string } => + Boolean(entry) && + typeof entry === "object" && + typeof (entry as Record).instanceId === "string" && + typeof (entry as Record).displayName === "string" && + typeof (entry as Record).reason === "string", + ) + : []; + const expandableContent = + skipped.length > 0 ? ( +
    + {skipped.map((entry) => ( +
  • + {entry.displayName}:{" "} + {entry.reason} +
  • + ))} +
+ ) : undefined; + const description = + activity.kind === "provider.fallback.succeeded" && detail ? ( +
+

+ Switched after “{from}” reported: +

+
+ {detail} +
+
+ ) : ( + detail + ); + + toastManager.add( + stackedThreadToast({ + type: activity.kind === "provider.fallback.succeeded" ? "success" : "error", + title: + activity.kind === "provider.fallback.succeeded" && to + ? `Switched from ${from} to ${to}` + : restoredOriginalInstance + ? "No fallback instance succeeded" + : `Could not switch from ${from}`, + ...(description ? { description } : {}), + data: { + threadRef: activeThreadRef, + ...(expandableContent + ? { + expandableContent, + expandableLabels: { + expand: "Show skipped instances", + collapse: "Hide skipped instances", + }, + } + : {}), + }, + }), + ); + } + }, [activeThreadKey, activeThreadRef, threadActivities]); const workLogEntries = useMemo(() => deriveWorkLogEntries(threadActivities), [threadActivities]); const pendingApprovals = useMemo( () => derivePendingApprovals(threadActivities), diff --git a/apps/web/src/components/settings/ProviderInstanceCard.tsx b/apps/web/src/components/settings/ProviderInstanceCard.tsx index ac2f7be81e8..46349df0df7 100644 --- a/apps/web/src/components/settings/ProviderInstanceCard.tsx +++ b/apps/web/src/components/settings/ProviderInstanceCard.tsx @@ -326,6 +326,7 @@ interface ProviderInstanceCardProps { readonly isExpanded: boolean; readonly onExpandedChange: (open: boolean) => void; readonly onUpdate: (nextInstance: ProviderInstanceConfig) => void; + readonly providerFallbackEnabled: boolean; /** * Pass `undefined` to hide the delete button entirely. 
Built-in default * instance slots use `undefined` — they can't be deleted without losing @@ -383,6 +384,7 @@ export function ProviderInstanceCard({ isExpanded, onExpandedChange, onUpdate, + providerFallbackEnabled, onDelete, headerAction, hiddenModels, @@ -466,6 +468,10 @@ export function ProviderInstanceCard({ onUpdate({ ...instance, enabled: value }); }; + const updateAllowFallback = (value: boolean) => { + onUpdate({ ...instance, allowFallback: value }); + }; + const updateAccentColor = (value: string) => { const normalized = normalizeProviderAccentColor(value); const { accentColor: _omit, ...rest } = instance; @@ -747,6 +753,39 @@ export function ProviderInstanceCard({ +
+
+
+

Use for automatic fallback

+

+ Allows this instance to receive compatible turns when another instance fails. +

+
+ + + } + > + updateAllowFallback(Boolean(checked))} + aria-label={`Allow ${displayName} for automatic fallback`} + /> + + {!providerFallbackEnabled ? ( + + Enable automatic provider fallback to change this setting. + + ) : null} + +
+
+
deleteProviderInstance(row.instanceId)} headerAction={headerAction} hiddenModels={modelPreferences.hiddenModels} @@ -1415,6 +1416,24 @@ export function ProviderSettingsPanel() { })} + + + updateSettings({ + providerFallback: { enabled: Boolean(checked) }, + }) + } + aria-label="Enable automatic provider fallback" + /> + } + /> + + {isAddInstanceDialogOpen ? ( ) : null} diff --git a/packages/contracts/src/providerInstance.ts b/packages/contracts/src/providerInstance.ts index 2a9fc9ed0d1..545a7ba87ee 100644 --- a/packages/contracts/src/providerInstance.ts +++ b/packages/contracts/src/providerInstance.ts @@ -127,6 +127,8 @@ export const ProviderInstanceConfig = Schema.Struct({ accentColor: Schema.optional(TrimmedNonEmptyString), environment: Schema.optionalKey(ProviderInstanceEnvironment), enabled: Schema.optionalKey(Schema.Boolean), + /** Whether this instance may participate in automatic same-driver fallback. */ + allowFallback: Schema.optionalKey(Schema.Boolean), config: Schema.optionalKey(Schema.Unknown), }); export type ProviderInstanceConfig = typeof ProviderInstanceConfig.Type; diff --git a/packages/contracts/src/settings.test.ts b/packages/contracts/src/settings.test.ts index aba97cbe205..546fa1c39ff 100644 --- a/packages/contracts/src/settings.test.ts +++ b/packages/contracts/src/settings.test.ts @@ -11,6 +11,7 @@ const encodeServerSettings = Schema.encodeSync(ServerSettings); describe("ServerSettings.providerInstances (slice-2 invariant)", () => { it("defaults to an empty record so legacy configs without the key still decode", () => { expect(DEFAULT_SERVER_SETTINGS.providerInstances).toEqual({}); + expect(DEFAULT_SERVER_SETTINGS.providerFallback.enabled).toBe(false); }); it("decodes a fully empty config (legacy on-disk shape) without complaint", () => { diff --git a/packages/contracts/src/settings.ts b/packages/contracts/src/settings.ts index 7ba267b1e72..cd4417670ba 100644 --- a/packages/contracts/src/settings.ts +++ b/packages/contracts/src/settings.ts @@ 
-386,6 +386,9 @@ export const ServerSettings = Schema.Struct({ }), ), ), + providerFallback: Schema.Struct({ + enabled: Schema.Boolean.pipe(Schema.withDecodingDefault(Effect.succeed(false))), + }).pipe(Schema.withDecodingDefault(Effect.succeed({}))), // Legacy single-instance-per-driver settings. Continues to be the source // of truth until `providerInstances` (below) lands per-driver migration @@ -510,6 +513,11 @@ export const ServerSettingsPatch = Schema.Struct({ newWorktreesStartFromOrigin: Schema.optionalKey(Schema.Boolean), addProjectBaseDirectory: Schema.optionalKey(TrimmedString), textGenerationModelSelection: Schema.optionalKey(ModelSelectionPatch), + providerFallback: Schema.optionalKey( + Schema.Struct({ + enabled: Schema.optionalKey(Schema.Boolean), + }), + ), observability: Schema.optionalKey( Schema.Struct({ otlpTracesUrl: Schema.optionalKey(TrimmedString),