Skip to content

Commit 3d7dfaa

Browse files
authored
fix(auth): coordinate credential refresh across processes and guard logins (#235)
* fix(auth): coordinate credential refresh across processes and guard logins Multiple panda servers on one host share a single credentials file (the path is keyed only by issuer+client+resource). Each runs a background refresh, and because the provider rotates refresh tokens (issuing a new one and revoking the old), concurrent refreshes revoke each other and produce invalid_grant storms. Separately, a login that returns no refresh token (e.g. an outdated device flow without offline_access) silently overwrote a working credential and the restarted server then expired at the next access-token expiry. - store.refresh now coordinates across processes with a non-blocking flock: the winner reloads and refreshes; losers reuse the on-disk token and never replay a revoked one. - Save/Clear take the lock too (blocking up to WriteLockWait, fail-closed with ErrCredentialBusy on contention) so logins/logouts serialize with refreshes. - Save refuses to replace a refreshable credential with one that cannot refresh (ErrCredentialDowngrade), failing closed on read errors. - Credentials are written atomically (temp + sync + rename). - Added structured logging for refresh/rotation/contention/invalid_grant and a login warning when no refresh token is issued. * fix(auth): correct offline_access messaging and show refresh-token detail in status The 'upgrade panda' advice on a missing refresh token was wrong: this version already requests offline_access, so a missing refresh token is the provider not granting it. Reword the login warning and downgrade error accordingly. panda auth status now reports whether a refresh token is present and when it was last rotated, and a new --verify flag actively tests the refresh token against the provider (rotating it). * fix(auth): raise credential write-lock wait above the refresh HTTP timeout A refresh holds the file lock across its token request (capped at 30s by the auth client), so a 5s wait made login/logout fail spuriously during a slow refresh. Wait 35s so an interactive write rides through a slow-but- valid refresh and only fails when one is genuinely stuck.
1 parent 57f865d commit 3d7dfaa

7 files changed

Lines changed: 701 additions & 13 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
github.com/spf13/cobra v1.10.2
2828
github.com/stretchr/testify v1.11.1
2929
github.com/testcontainers/testcontainers-go v0.42.0
30+
golang.org/x/sys v0.45.0
3031
golang.org/x/time v0.15.0
3132
google.golang.org/protobuf v1.36.11
3233
gopkg.in/yaml.v3 v3.0.1
@@ -131,7 +132,6 @@ require (
131132
golang.org/x/net v0.55.0 // indirect
132133
golang.org/x/oauth2 v0.36.0 // indirect
133134
golang.org/x/sync v0.20.0 // indirect
134-
golang.org/x/sys v0.45.0 // indirect
135135
golang.org/x/telemetry v0.0.0-20260421165255-392afab6f40e // indirect
136136
golang.org/x/term v0.43.0 // indirect
137137
golang.org/x/text v0.37.0 // indirect

pkg/auth/store/lock_other.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
//go:build !unix || aix
2+
3+
package store
4+
5+
// tryFileLock is a no-op on platforms without flock(2). Process-local
6+
// serialization via refreshMu still applies; cross-process coordination is
7+
// simply unavailable, matching the historical behaviour.
8+
func tryFileLock(_ string) (release func(), acquired bool, err error) {
9+
return func() {}, true, nil
10+
}

pkg/auth/store/lock_test.go

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
//go:build unix && !aix
2+
3+
package store
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"errors"
9+
"fmt"
10+
"os"
11+
"path/filepath"
12+
"sync"
13+
"sync/atomic"
14+
"testing"
15+
"time"
16+
17+
"github.com/sirupsen/logrus"
18+
19+
authclient "github.com/ethpandaops/panda/pkg/auth/client"
20+
)
21+
22+
func TestTryFileLockIsMutuallyExclusive(t *testing.T) {
23+
t.Parallel()
24+
25+
path := filepath.Join(t.TempDir(), "creds.json")
26+
27+
release1, ok1, err1 := tryFileLock(path)
28+
if err1 != nil || !ok1 {
29+
t.Fatalf("first lock should be acquired: ok=%v err=%v", ok1, err1)
30+
}
31+
32+
if _, ok2, _ := tryFileLock(path); ok2 {
33+
t.Fatal("second lock should fail while the first is held")
34+
}
35+
36+
release1()
37+
38+
release3, ok3, err3 := tryFileLock(path)
39+
if err3 != nil || !ok3 {
40+
t.Fatalf("lock should be acquirable after release: ok=%v err=%v", ok3, err3)
41+
}
42+
release3()
43+
}
44+
45+
// TestSaveAndClearFailClosedWhileLockHeld verifies that an interactive write
46+
// refuses to proceed (rather than clobber a rotation) while another process
47+
// holds the credentials lock, and succeeds once it is released.
48+
func TestSaveAndClearFailClosedWhileLockHeld(t *testing.T) {
49+
t.Parallel()
50+
51+
path := filepath.Join(t.TempDir(), "creds.json")
52+
st := New(logrus.New(), Config{
53+
Path: path,
54+
WriteLockWait: 100 * time.Millisecond,
55+
}).(*store)
56+
57+
// Simulate a refresh in another process holding the lock.
58+
release, ok, err := tryFileLock(path)
59+
if err != nil || !ok {
60+
t.Fatalf("failed to take lock: ok=%v err=%v", ok, err)
61+
}
62+
63+
tokens := &authclient.Tokens{
64+
AccessToken: "access",
65+
RefreshToken: "refresh",
66+
ExpiresAt: time.Now().Add(time.Hour),
67+
}
68+
69+
if err := st.Save(tokens); !errors.Is(err, ErrCredentialBusy) {
70+
t.Fatalf("expected ErrCredentialBusy while lock held, got %v", err)
71+
}
72+
73+
if err := st.Clear(); !errors.Is(err, ErrCredentialBusy) {
74+
t.Fatalf("expected ErrCredentialBusy from Clear while lock held, got %v", err)
75+
}
76+
77+
release()
78+
79+
if err := st.Save(tokens); err != nil {
80+
t.Fatalf("Save should succeed after the lock is released: %v", err)
81+
}
82+
}
83+
84+
// TestRefreshReusesTokenRotatedByAnotherProcess verifies the reload-recheck
85+
// step: if another process rotated the on-disk token after we decided to
86+
// refresh but before we acquired the lock, we reuse that token instead of
87+
// refreshing again (which would present a token the other process revoked).
88+
func TestRefreshReusesTokenRotatedByAnotherProcess(t *testing.T) {
89+
t.Parallel()
90+
91+
path := filepath.Join(t.TempDir(), "creds.json")
92+
client := &countingAuthClient{}
93+
store := New(logrus.New(), Config{
94+
Path: path,
95+
AuthClient: client,
96+
RefreshBuffer: 5 * time.Minute,
97+
}).(*store)
98+
99+
// In-memory token is due for refresh (inside the buffer).
100+
store.tokens = &authclient.Tokens{
101+
AccessToken: "stale",
102+
RefreshToken: "stale-rt",
103+
ExpiresAt: time.Now().Add(time.Minute),
104+
}
105+
106+
// Another process has already written a fresh token to the shared file.
107+
writeTokens(t, path, &authclient.Tokens{
108+
AccessToken: "fresh",
109+
RefreshToken: "fresh-rt",
110+
ExpiresIn: 3600,
111+
ExpiresAt: time.Now().Add(time.Hour),
112+
})
113+
114+
token, err := store.GetAccessToken()
115+
if err != nil {
116+
t.Fatalf("GetAccessToken returned error: %v", err)
117+
}
118+
119+
if token != "fresh" {
120+
t.Fatalf("expected the token written by another process, got %q", token)
121+
}
122+
123+
if client.refreshCalls.Load() != 0 {
124+
t.Fatalf("expected no provider refresh, got %d", client.refreshCalls.Load())
125+
}
126+
}
127+
128+
// TestSharedCredentialRefreshNeverReusesRevokedToken hammers several stores
129+
// that share one credentials file against a provider that rotates and revokes
130+
// like authentik. The file lock must ensure no store ever presents a
131+
// rotated-away token (which would return invalid_grant).
132+
func TestSharedCredentialRefreshNeverReusesRevokedToken(t *testing.T) {
133+
t.Parallel()
134+
135+
path := filepath.Join(t.TempDir(), "creds.json")
136+
client := newRotatingAuthClient("rt-0")
137+
writeTokens(t, path, &authclient.Tokens{
138+
AccessToken: "at-0",
139+
RefreshToken: "rt-0",
140+
ExpiresIn: 3600,
141+
ExpiresAt: time.Now().Add(time.Hour),
142+
RefreshTokenIssuedAt: time.Now(),
143+
})
144+
145+
const (
146+
stores = 4
147+
iterations = 50
148+
)
149+
150+
newSharedStore := func() *store {
151+
return New(logrus.New(), Config{
152+
Path: path,
153+
AuthClient: client,
154+
RefreshBuffer: 5 * time.Minute,
155+
RefreshTokenTTL: time.Nanosecond, // always past half-life: forces a refresh every call
156+
}).(*store)
157+
}
158+
159+
start := make(chan struct{})
160+
161+
var wg sync.WaitGroup
162+
163+
wg.Add(stores)
164+
165+
for range stores {
166+
s := newSharedStore()
167+
168+
go func() {
169+
defer wg.Done()
170+
171+
<-start // release all goroutines together to maximise contention
172+
173+
for range iterations {
174+
token, err := s.GetAccessToken()
175+
if err != nil {
176+
t.Errorf("GetAccessToken returned error: %v", err)
177+
178+
return
179+
}
180+
181+
if token == "" {
182+
t.Error("GetAccessToken returned an empty token")
183+
184+
return
185+
}
186+
}
187+
}()
188+
}
189+
190+
close(start)
191+
wg.Wait()
192+
193+
if got := client.invalidGrants.Load(); got != 0 {
194+
t.Fatalf("expected zero invalid_grant (revoked-token reuse), got %d", got)
195+
}
196+
197+
if got := client.refreshCalls.Load(); got == 0 {
198+
t.Fatal("expected at least one provider refresh")
199+
}
200+
201+
// The shared credential must end in a usable, authenticated state.
202+
final := New(logrus.New(), Config{Path: path, AuthClient: client}).(*store)
203+
if !final.IsAuthenticated() {
204+
t.Fatal("shared credential is not authenticated after concurrent refreshes")
205+
}
206+
}
207+
208+
func writeTokens(t *testing.T, path string, tokens *authclient.Tokens) {
209+
t.Helper()
210+
211+
data, err := json.MarshalIndent(tokens, "", " ")
212+
if err != nil {
213+
t.Fatalf("marshaling tokens: %v", err)
214+
}
215+
216+
if err := os.WriteFile(path, data, 0o600); err != nil {
217+
t.Fatalf("writing tokens: %v", err)
218+
}
219+
}
220+
221+
// rotatingAuthClient models authentik: each refresh issues a new refresh token
222+
// and revokes the presented one. Presenting a revoked token returns
223+
// invalid_grant.
224+
type rotatingAuthClient struct {
225+
mu sync.Mutex
226+
counter int
227+
valid map[string]struct{}
228+
refreshCalls atomic.Int64
229+
invalidGrants atomic.Int64
230+
}
231+
232+
func newRotatingAuthClient(initial string) *rotatingAuthClient {
233+
return &rotatingAuthClient{valid: map[string]struct{}{initial: {}}}
234+
}
235+
236+
func (c *rotatingAuthClient) Login(_ context.Context) (*authclient.Tokens, error) {
237+
return nil, errors.New("not implemented")
238+
}
239+
240+
func (c *rotatingAuthClient) ClientCredentials(_ context.Context) (*authclient.Tokens, error) {
241+
return nil, errors.New("not implemented")
242+
}
243+
244+
func (c *rotatingAuthClient) Refresh(_ context.Context, refreshToken string) (*authclient.Tokens, error) {
245+
c.refreshCalls.Add(1)
246+
247+
c.mu.Lock()
248+
defer c.mu.Unlock()
249+
250+
if _, ok := c.valid[refreshToken]; !ok {
251+
c.invalidGrants.Add(1)
252+
253+
return nil, fmt.Errorf("token endpoint returned status 400: {\"error\": \"invalid_grant\"}")
254+
}
255+
256+
delete(c.valid, refreshToken)
257+
c.counter++
258+
next := fmt.Sprintf("rt-%d", c.counter)
259+
c.valid[next] = struct{}{}
260+
261+
return &authclient.Tokens{
262+
AccessToken: fmt.Sprintf("at-%d", c.counter),
263+
RefreshToken: next,
264+
TokenType: "Bearer",
265+
ExpiresIn: 3600,
266+
ExpiresAt: time.Now().Add(time.Hour),
267+
RefreshTokenIssuedAt: time.Now(),
268+
}, nil
269+
}

pkg/auth/store/lock_unix.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//go:build unix && !aix
2+
3+
package store
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"os"
9+
"path/filepath"
10+
11+
"golang.org/x/sys/unix"
12+
)
13+
14+
// tryFileLock attempts a non-blocking exclusive lock on path+".lock" so that
15+
// only one process drives a credential refresh at a time when several panda
16+
// processes share one credentials file.
17+
//
18+
// It returns (release, true, nil) when this process holds the lock,
19+
// (nil, false, nil) when another process currently holds it, and
20+
// (no-op release, true, err) when the lock file itself cannot be created — in
21+
// which case the caller proceeds relying on process-local serialization, so a
22+
// single process is never blocked by lock-file errors.
23+
//
24+
// The lock is advisory and released automatically by the OS when the holding
25+
// process exits, so a crashed holder never leaves a stale lock.
26+
func tryFileLock(path string) (release func(), acquired bool, err error) {
27+
lockPath := path + ".lock"
28+
29+
if mkErr := os.MkdirAll(filepath.Dir(lockPath), 0o700); mkErr != nil {
30+
return func() {}, true, fmt.Errorf("creating lock directory: %w", mkErr)
31+
}
32+
33+
f, openErr := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o600)
34+
if openErr != nil {
35+
return func() {}, true, fmt.Errorf("opening lock file: %w", openErr)
36+
}
37+
38+
if flockErr := unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB); flockErr != nil {
39+
// EWOULDBLOCK/EAGAIN is genuine contention: another process holds the
40+
// lock, so back off and reuse what it writes. Any other flock error is
41+
// unexpected — fail open and refresh without cross-process coordination
42+
// rather than risk being unable to refresh at all.
43+
if errors.Is(flockErr, unix.EWOULDBLOCK) || errors.Is(flockErr, unix.EAGAIN) {
44+
_ = f.Close()
45+
46+
return nil, false, nil
47+
}
48+
49+
return func() { _ = f.Close() }, true, fmt.Errorf("locking credentials file: %w", flockErr)
50+
}
51+
52+
return func() {
53+
_ = unix.Flock(int(f.Fd()), unix.LOCK_UN)
54+
_ = f.Close()
55+
}, true, nil
56+
}

0 commit comments

Comments
 (0)