Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions apps/mobile/src/connection/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ const capabilitiesLayer = Layer.succeedContext(
}),
),
);
if (token === null) {
return yield* new ConnectionBlockedError({
reason: "authentication",
detail: "The T3 Cloud session is unavailable.",
});
}
return token;
return yield* Option.match(token, {
onNone: () =>
Effect.fail(
new ConnectionBlockedError({
reason: "authentication",
detail: "The T3 Cloud session is unavailable.",
}),
),
onSome: Effect.succeed,
});
}),
}),
).pipe(
Expand Down
13 changes: 11 additions & 2 deletions apps/mobile/src/features/cloud/CloudAuthProvider.tsx
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { ClerkProvider, useAuth } from "@clerk/expo";
import { tokenCache } from "@clerk/expo/token-cache";
import { ManagedRelay, setManagedRelaySession } from "@t3tools/client-runtime/relay";
import {
ManagedRelay,
ManagedRelaySessionTokenReadError,
setManagedRelaySession,
} from "@t3tools/client-runtime/relay";
import {
reportAtomCommandResult,
settleAsyncResult,
settlePromise,
} from "@t3tools/client-runtime/state/runtime";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import { type ReactNode, useEffect, useRef } from "react";

import { environmentCatalog } from "../../connection/catalog";
Expand Down Expand Up @@ -39,7 +44,11 @@ export function activateCloudRelayAccount(
setAgentAwarenessRelayTokenProvider(tokenProvider, accountId);
setManagedRelaySession(appAtomRegistry, {
accountId,
readClerkToken: tokenProvider,
readClerkToken: () =>
Effect.tryPromise({
try: tokenProvider,
catch: (cause) => new ManagedRelaySessionTokenReadError({ cause }),
}).pipe(Effect.map(Option.fromNullOr)),
});
}

Expand Down
4 changes: 3 additions & 1 deletion apps/web/src/cloud/managedAuth.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { managedRelaySessionAtom, setManagedRelaySession } from "@t3tools/client-runtime/relay";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import { afterEach, describe, expect, it, vi } from "vite-plus/test";

import { appAtomRegistry } from "../rpc/atomRegistry";
Expand Down Expand Up @@ -45,7 +47,7 @@ describe("managed relay authentication", () => {
it("replaces an existing account session atomically", () => {
setManagedRelaySession(appAtomRegistry, {
accountId: "account-1",
readClerkToken: async () => "account-1-token",
readClerkToken: () => Effect.succeed(Option.some("account-1-token")),
});

activateManagedRelayAuthentication("account-2", async () => "account-2-token");
Expand Down
13 changes: 11 additions & 2 deletions apps/web/src/cloud/managedAuth.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { useAuth } from "@clerk/react";
import { ManagedRelay, setManagedRelaySession } from "@t3tools/client-runtime/relay";
import {
ManagedRelay,
ManagedRelaySessionTokenReadError,
setManagedRelaySession,
} from "@t3tools/client-runtime/relay";
import {
reportAtomCommandResult,
settleAsyncResult,
settlePromise,
} from "@t3tools/client-runtime/state/runtime";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import { useEffect, useRef, type ReactNode } from "react";

import { environmentCatalog } from "../connection/catalog";
Expand All @@ -32,7 +37,11 @@ export function activateManagedRelayAuthentication(
relayTokenProvider = readClerkToken;
setManagedRelaySession(appAtomRegistry, {
accountId,
readClerkToken,
readClerkToken: () =>
Effect.tryPromise({
try: readClerkToken,
catch: (cause) => new ManagedRelaySessionTokenReadError({ cause }),
}).pipe(Effect.map(Option.fromNullOr)),
});
}

Expand Down
17 changes: 10 additions & 7 deletions apps/web/src/connection/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ const capabilitiesLayer = Layer.effectContext(
}),
),
);
if (token === null) {
return yield* new ConnectionBlockedError({
reason: "authentication",
detail: "The T3 Cloud session is unavailable.",
});
}
return token;
return yield* Option.match(token, {
onNone: () =>
Effect.fail(
new ConnectionBlockedError({
reason: "authentication",
detail: "The T3 Cloud session is unavailable.",
}),
),
onSome: Effect.succeed,
});
}),
});
const identity = RelayDeviceIdentity.of({
Expand Down
90 changes: 56 additions & 34 deletions packages/client-runtime/src/relay/managedRelayState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import type {
RelayClientEnvironmentRecord,
RelayEnvironmentStatusResponse,
} from "@t3tools/contracts/relay";
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, expect, it } from "@effect/vitest";
import * as Deferred from "effect/Deferred";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Stream from "effect/Stream";
import * as TestClock from "effect/testing/TestClock";
import { Atom, AtomRegistry } from "effect/unstable/reactivity";
import { afterEach, vi } from "vite-plus/test";

Expand Down Expand Up @@ -96,7 +100,7 @@ function createManager(
function setSession() {
setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: () => Promise.resolve("clerk-token"),
readClerkToken: () => Effect.succeed(Option.some("clerk-token")),
});
}

Expand All @@ -115,20 +119,17 @@ describe("createManagedRelayQueryManager", () => {

setSession();

expect(yield* Fiber.join(tokenFiber)).toBe("clerk-token");
expect(registry.getNodes().get(managedRelaySessionAtom)?.listeners.size).toBe(0);
assert.equal(yield* Fiber.join(tokenFiber), "clerk-token");
assert.equal(registry.getNodes().get(managedRelaySessionAtom)?.listeners.size, 0);
}),
);

it.effect("deduplicates concurrent Clerk token reads and reuses the token until JWT expiry", () =>
Effect.gen(function* () {
const token = clerkToken(4_102_444_800);
let resolveToken!: (value: string) => void;
const readClerkToken = vi.fn(
() =>
new Promise<string>((resolve) => {
resolveToken = resolve;
}),
const tokenDeferred = yield* Deferred.make<string>();
const readClerkToken = vi.fn(() =>
Deferred.await(tokenDeferred).pipe(Effect.map(Option.some)),
);
const session = createManagedRelaySession({
accountId: "account-1",
Expand All @@ -139,68 +140,89 @@ describe("createManagedRelayQueryManager", () => {
concurrency: "unbounded",
}).pipe(Effect.forkChild);
yield* Effect.yieldNow;
expect(readClerkToken).toHaveBeenCalledTimes(1);
assert.equal(readClerkToken.mock.calls.length, 1);

resolveToken(token);
expect(yield* Fiber.join(readsFiber)).toEqual([token, token]);
expect(yield* session.readClerkToken()).toBe(token);
expect(readClerkToken).toHaveBeenCalledTimes(1);
yield* Deferred.succeed(tokenDeferred, token);
assert.deepEqual(yield* Fiber.join(readsFiber), [Option.some(token), Option.some(token)]);
assert.equal(Option.getOrThrow(yield* session.readClerkToken()), token);
assert.equal(readClerkToken.mock.calls.length, 1);
}),
);

it.effect("refreshes the cached Clerk token after JWT expiry using TestClock", () =>
Effect.gen(function* () {
const firstToken = clerkToken(10);
const secondToken = clerkToken(30);
const tokens = [firstToken, secondToken];
const readClerkToken = vi.fn(() =>
Effect.sync(() => Option.some(tokens.shift() ?? "unexpected-token")),
);
const session = createManagedRelaySession({
accountId: "account-1",
readClerkToken,
});

assert.equal(Option.getOrThrow(yield* session.readClerkToken()), firstToken);
assert.equal(Option.getOrThrow(yield* session.readClerkToken()), firstToken);
assert.equal(readClerkToken.mock.calls.length, 1);

yield* TestClock.adjust(Duration.seconds(6));

assert.equal(Option.getOrThrow(yield* session.readClerkToken()), secondToken);
assert.equal(readClerkToken.mock.calls.length, 2);
}).pipe(Effect.provide(TestClock.layer())),
);

it.effect("updates the token provider without replacing a same-account session", () =>
Effect.gen(function* () {
const firstRead = vi.fn(() => Promise.resolve<string | null>(null));
const firstRead = vi.fn(() => Effect.succeed(Option.none<string>()));
setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: firstRead,
});
const firstSession = registry.get(managedRelaySessionAtom);
expect(firstSession).not.toBeNull();
expect(yield* firstSession!.readClerkToken()).toBeNull();
assert.isTrue(Option.isNone(yield* firstSession!.readClerkToken()));

const secondRead = vi.fn(() => Promise.resolve<string | null>("refreshed-token"));
const secondRead = vi.fn(() => Effect.succeed(Option.some("refreshed-token")));
setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: secondRead,
});

expect(registry.get(managedRelaySessionAtom)).toBe(firstSession);
expect(yield* firstSession!.readClerkToken()).toBe("refreshed-token");
expect(firstRead).toHaveBeenCalledTimes(1);
expect(secondRead).toHaveBeenCalledTimes(1);
assert.equal(Option.getOrThrow(yield* firstSession!.readClerkToken()), "refreshed-token");
assert.equal(firstRead.mock.calls.length, 1);
assert.equal(secondRead.mock.calls.length, 1);
}),
);

it.effect("does not pin a refreshed session to an older pending token read", () =>
Effect.gen(function* () {
let resolveFirst!: (token: string) => void;
const firstTokenDeferred = yield* Deferred.make<string>();
setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: () =>
new Promise<string>((resolve) => {
resolveFirst = resolve;
}),
readClerkToken: () => Deferred.await(firstTokenDeferred).pipe(Effect.map(Option.some)),
});
const session = registry.get(managedRelaySessionAtom);
const firstRead = yield* session!.readClerkToken().pipe(Effect.forkChild);
yield* Effect.yieldNow;

setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: () => Promise.resolve("refreshed-token"),
readClerkToken: () => Effect.succeed(Option.some("refreshed-token")),
});

expect(yield* session!.readClerkToken()).toBe("refreshed-token");
resolveFirst("older-token");
expect(yield* Fiber.join(firstRead)).toBe("older-token");
assert.equal(Option.getOrThrow(yield* session!.readClerkToken()), "refreshed-token");
yield* Deferred.succeed(firstTokenDeferred, "older-token");
assert.equal(Option.getOrThrow(yield* Fiber.join(firstRead)), "older-token");
}),
);

it("emits credential changes only when the managed relay account changes", async () => {
setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: () => Promise.resolve("first-token"),
readClerkToken: () => Effect.succeed(Option.some("first-token")),
});
const changes = Effect.runPromise(
managedRelayAccountChanges(registry).pipe(Stream.take(2), Stream.runCollect),
Expand All @@ -211,11 +233,11 @@ describe("createManagedRelayQueryManager", () => {

setManagedRelaySession(registry, {
accountId: "account-1",
readClerkToken: () => Promise.resolve("refreshed-token"),
readClerkToken: () => Effect.succeed(Option.some("refreshed-token")),
});
setManagedRelaySession(registry, {
accountId: "account-2",
readClerkToken: () => Promise.resolve("second-token"),
readClerkToken: () => Effect.succeed(Option.some("second-token")),
});
setManagedRelaySession(registry, null);

Expand All @@ -234,7 +256,7 @@ describe("createManagedRelayQueryManager", () => {
},
} satisfies RelayClientEnvironmentRecord;
const token = clerkToken(4_102_444_800);
const readClerkToken = vi.fn(() => Promise.resolve(token));
const readClerkToken = vi.fn(() => Effect.succeed(Option.some(token)));
const manager = createManager({
listEnvironments: () => Effect.succeed([environment, secondEnvironment]),
getEnvironmentStatus: ({ environmentId }) => {
Expand Down
Loading
Loading