-
Notifications
You must be signed in to change notification settings - Fork 563
Expand file tree
/
Copy pathclose_test.go
More file actions
154 lines (132 loc) · 4.69 KB
/
Copy pathclose_test.go
File metadata and controls
154 lines (132 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2026 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"io"
"sync"
"testing"
"time"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
// TestCloseWithBlockedRemoteIO verifies that DB.Close() completes promptly when
// background operations (table stats loading, compactions) are blocked on
// remote storage I/O. The test creates a remote storage implementation that
// blocks reads until the context is cancelled, ingests external files backed by
// it, then closes the DB and verifies it doesn't hang.
func TestCloseWithBlockedRemoteIO(t *testing.T) {
storage := newBlockingRemoteStorage()
mem := vfs.NewMem()
opts := &Options{
FS: mem,
FormatMajorVersion: FormatNewest,
L0CompactionThreshold: 100,
L0StopWritesThreshold: 100,
}
opts.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
remote.MakeLocator("blocking"): storage,
})
d, err := Open("", opts)
require.NoError(t, err)
// Write an SST into the blocking remote storage.
writeOpts := d.opts.MakeWriterOptions(0, d.TableFormat())
obj, err := storage.inner.CreateObject("ext1")
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewRemoteWritable(obj), writeOpts)
require.NoError(t, w.Set([]byte("a"), []byte("val-a")))
require.NoError(t, w.Set([]byte("z"), []byte("val-z")))
require.NoError(t, w.Close())
sz, err := storage.inner.Size("ext1")
require.NoError(t, err)
// Ingest the external file. This triggers table stats loading in the
// background, which will try to read from the blocking storage.
_, err = d.IngestExternalFiles(context.Background(), []ExternalFile{{
Locator: remote.MakeLocator("blocking"),
ObjName: "ext1",
Size: uint64(sz),
StartKey: []byte("a"),
EndKey: []byte("z"),
EndKeyIsInclusive: true,
HasPointKey: true,
}})
require.NoError(t, err)
// Enable blocking on reads. Any background operation that tries to read
// from the remote storage will now block until the context is cancelled.
storage.block()
// Close should cancel bgCtx, which unblocks any in-progress remote reads,
// allowing background goroutines to finish.
done := make(chan error, 1)
go func() {
done <- d.Close()
}()
select {
case err := <-done:
require.NoError(t, err)
case <-time.After(10 * time.Second):
t.Fatal("DB.Close() hung; background operations likely blocked on remote I/O")
}
}
// blockingRemoteStorage wraps an in-memory remote.Storage. When blocking is
// enabled, all ReadAt calls block until the context is cancelled.
type blockingRemoteStorage struct {
inner remote.Storage
mu sync.Mutex
blocking bool
blockCh chan struct{} // closed when blocking is enabled
}
func newBlockingRemoteStorage() *blockingRemoteStorage {
return &blockingRemoteStorage{
inner: remote.NewInMem(),
blockCh: make(chan struct{}),
}
}
func (s *blockingRemoteStorage) block() {
s.mu.Lock()
defer s.mu.Unlock()
s.blocking = true
close(s.blockCh)
}
func (s *blockingRemoteStorage) isBlocking() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.blocking
}
func (s *blockingRemoteStorage) Close() error { return s.inner.Close() }
func (s *blockingRemoteStorage) CreateObject(name string) (io.WriteCloser, error) {
return s.inner.CreateObject(name)
}
func (s *blockingRemoteStorage) List(prefix, delimiter string) ([]string, error) {
return s.inner.List(prefix, delimiter)
}
func (s *blockingRemoteStorage) Delete(name string) error { return s.inner.Delete(name) }
func (s *blockingRemoteStorage) Size(name string) (int64, error) { return s.inner.Size(name) }
func (s *blockingRemoteStorage) IsNotExistError(err error) bool { return s.inner.IsNotExistError(err) }
func (s *blockingRemoteStorage) ReadObject(
ctx context.Context, objName string,
) (_ remote.ObjectReader, objSize int64, _ error) {
reader, size, err := s.inner.ReadObject(ctx, objName)
if err != nil {
return nil, 0, err
}
return &blockingObjectReader{inner: reader, storage: s}, size, nil
}
type blockingObjectReader struct {
inner remote.ObjectReader
storage *blockingRemoteStorage
}
func (r *blockingObjectReader) ReadAt(ctx context.Context, p []byte, offset int64) error {
if r.storage.isBlocking() {
// Block until the context is cancelled.
<-ctx.Done()
return ctx.Err()
}
return r.inner.ReadAt(ctx, p, offset)
}
func (r *blockingObjectReader) Close() error {
return r.inner.Close()
}