Skip to content

Commit 32c3740

Browse files
committed
metamorphic: add op for crash during Open
1 parent 6665d61 commit 32c3740

File tree

9 files changed

+170
-33
lines changed

9 files changed

+170
-33
lines changed

metamorphic/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
OpDBFlush
2929
OpDBRatchetFormatMajorVersion
3030
OpDBRestart
31+
OpDBCrashDuringOpen
3132
OpDBEstimateDiskUsage
3233
OpIterClose
3334
OpIterFirst
@@ -159,6 +160,7 @@ func DefaultOpConfig() OpConfig {
159160
OpDBFlush: 2,
160161
OpDBRatchetFormatMajorVersion: 1,
161162
OpDBRestart: 2,
163+
OpDBCrashDuringOpen: 1,
162164
OpDBEstimateDiskUsage: 1,
163165
OpIterClose: 5,
164166
OpIterFirst: 100,
@@ -221,6 +223,7 @@ func ReadOpConfig() OpConfig {
221223
OpDBFlush: 0,
222224
OpDBRatchetFormatMajorVersion: 0,
223225
OpDBRestart: 0,
226+
OpDBCrashDuringOpen: 0,
224227
OpDBEstimateDiskUsage: 0,
225228
OpIterClose: 5,
226229
OpIterFirst: 100,
@@ -280,6 +283,7 @@ func WriteOpConfig() OpConfig {
280283
OpDBFlush: 2,
281284
OpDBRatchetFormatMajorVersion: 1,
282285
OpDBRestart: 2,
286+
OpDBCrashDuringOpen: 1,
283287
OpDBEstimateDiskUsage: 1,
284288
OpIterClose: 0,
285289
OpIterFirst: 0,

metamorphic/generator.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ func (g *generator) generate(count uint64) []op {
167167
OpDBDownload: g.dbDownload,
168168
OpDBFlush: g.dbFlush,
169169
OpDBRatchetFormatMajorVersion: g.dbRatchetFormatMajorVersion,
170-
OpDBRestart: g.dbRestart,
170+
OpDBRestart: g.dbRestart(false /* shouldCrashDuringOpen */),
171+
OpDBCrashDuringOpen: g.dbRestart(true /* shouldCrashDuringOpen */),
171172
OpDBEstimateDiskUsage: g.dbEstimateDiskUsage,
172173
OpIterClose: g.randIter(g.iterClose),
173174
OpIterFirst: g.randIter(g.iterFirst),
@@ -465,27 +466,33 @@ func (g *generator) dbRatchetFormatMajorVersion() {
465466
g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
466467
}
467468

468-
func (g *generator) dbRestart() {
469-
// Close any live iterators and snapshots, so that we can close the DB
470-
// cleanly.
471-
dbID := g.dbs.rand(g.rng)
472-
for len(g.liveIters) > 0 {
473-
g.randIter(g.iterClose)()
474-
}
475-
for len(g.liveSnapshots) > 0 {
476-
g.snapshotClose()
477-
}
478-
// Close the batches.
479-
for len(g.liveBatches) > 0 {
480-
batchID := g.liveBatches[0]
481-
g.removeBatchFromGenerator(batchID)
482-
g.add(&closeOp{objID: batchID})
483-
}
484-
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
485-
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
486-
len(g.liveReaders), len(g.liveWriters)))
469+
func (g *generator) dbRestart(shouldCrashDuringOpen bool) func() {
470+
return func() {
471+
// Close any live iterators and snapshots, so that we can close the DB
472+
// cleanly.
473+
dbID := g.dbs.rand(g.rng)
474+
for len(g.liveIters) > 0 {
475+
g.randIter(g.iterClose)()
476+
}
477+
for len(g.liveSnapshots) > 0 {
478+
g.snapshotClose()
479+
}
480+
// Close the batches.
481+
for len(g.liveBatches) > 0 {
482+
batchID := g.liveBatches[0]
483+
g.removeBatchFromGenerator(batchID)
484+
g.add(&closeOp{objID: batchID})
485+
}
486+
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
487+
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
488+
len(g.liveReaders), len(g.liveWriters)))
489+
}
490+
if shouldCrashDuringOpen {
491+
g.add(&dbUncleanRestartOp{dbID: dbID})
492+
} else {
493+
g.add(&dbRestartOp{dbID: dbID})
494+
}
487495
}
488-
g.add(&dbRestartOp{dbID: dbID})
489496
}
490497

491498
// maybeSetSnapshotIterBounds must be called whenever creating a new iterator or

metamorphic/key_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,7 @@ func opWrittenKeys(untypedOp op) [][]byte {
883883
case *closeOp:
884884
case *compactOp:
885885
case *dbRestartOp:
886+
case *dbUncleanRestartOp:
886887
case *deleteOp:
887888
return [][]byte{t.key}
888889
case *deleteRangeOp:

metamorphic/meta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
560560
// multi-instance mode.
561561
testOpts.Opts.WALFailover = nil
562562
} else {
563-
testOpts.Opts.WALFailover.Secondary.FS = opts.FS
563+
testOpts.Opts.WALFailover.Secondary.FS = vfs.NewCrashableMem()
564564
}
565565
}
566566

metamorphic/ops.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1965,7 +1965,7 @@ type dbRestartOp struct {
19651965
}
19661966

19671967
func (o *dbRestartOp) run(t *Test, h historyRecorder) {
1968-
if err := t.restartDB(o.dbID); err != nil {
1968+
if err := t.restartDB(o.dbID, false /* shouldCrashDuringOpen */); err != nil {
19691969
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
19701970
h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
19711971
} else {
@@ -1980,6 +1980,37 @@ func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjec
19801980
func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey) {}
19811981
func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
19821982

1983+
// dbUncleanRestartOp performs an unclean restart like dbRestartOp, but also
1984+
// starts a concurrent goroutine that calls CrashClone during the Open and uses
1985+
// that clone to do a second Open. This tests crashing during Open with
1986+
// concurrent operations.
1987+
type dbUncleanRestartOp struct {
1988+
dbID objID
1989+
1990+
// affectedObjects is the list of additional objects that are affected by this
1991+
// operation, and which syncObjs() must return so that we don't perform the
1992+
// restart in parallel with other operations to affected objects.
1993+
affectedObjects []objID
1994+
}
1995+
1996+
func (o *dbUncleanRestartOp) run(t *Test, h historyRecorder) {
1997+
if err := t.restartDB(o.dbID, true /* shouldCrashDuringOpen */); err != nil {
1998+
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
1999+
h.history.err.Store(errors.Wrap(err, "dbUncleanRestartOp"))
2000+
} else {
2001+
h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
2002+
}
2003+
}
2004+
2005+
func (o *dbUncleanRestartOp) formattedString(KeyFormat) string {
2006+
return fmt.Sprintf("%s.RestartWithCrashClone()", o.dbID)
2007+
}
2008+
func (o *dbUncleanRestartOp) receiver() objID { return o.dbID }
2009+
func (o *dbUncleanRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
2010+
2011+
func (o *dbUncleanRestartOp) rewriteKeys(func(UserKey) UserKey) {}
2012+
func (o *dbUncleanRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
2013+
19832014
func formatOps(kf KeyFormat, ops []op) string {
19842015
var buf strings.Builder
19852016
for _, op := range ops {

metamorphic/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,8 +740,9 @@ func RandomOptions(rng *rand.Rand, kf KeyFormat, cfg RandomOptionsCfg) *TestOpti
740740
// maintains a maximum history 120 entries, so the healthy interval
741741
// must not exceed 119x the probe interval.
742742
healthyInterval := scaleDuration(probeInterval, 1.0, 119.0)
743+
newMem := vfs.NewCrashableMem()
743744
opts.WALFailover = &pebble.WALFailoverOptions{
744-
Secondary: wal.Dir{FS: vfs.Default, Dirname: pebble.MakeStoreRelativePath(vfs.Default, "wal_secondary")},
745+
Secondary: wal.Dir{FS: newMem, Dirname: pebble.MakeStoreRelativePath(newMem, "wal_secondary")},
745746
FailoverOptions: wal.FailoverOptions{
746747
PrimaryDirProbeInterval: probeInterval,
747748
HealthyProbeLatencyThreshold: healthyThreshold,

metamorphic/parser.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
6262
return &t.dbID, nil, []interface{}{&t.vers}
6363
case *dbRestartOp:
6464
return &t.dbID, nil, nil
65+
case *dbUncleanRestartOp:
66+
return &t.dbID, nil, nil
6567
case *deleteOp:
6668
return &t.writerID, nil, []interface{}{&t.key}
6769
case *deleteRangeOp:
@@ -177,6 +179,7 @@ var methods = map[string]*methodInfo{
177179
"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
178180
"Replicate": makeMethod(replicateOp{}, dbTag),
179181
"Restart": makeMethod(dbRestartOp{}, dbTag),
182+
"RestartWithCrashClone": makeMethod(dbUncleanRestartOp{}, dbTag),
180183
"SeekGE": makeMethod(iterSeekGEOp{}, iterTag),
181184
"SeekLT": makeMethod(iterSeekLTOp{}, iterTag),
182185
"SeekPrefixGE": makeMethod(iterSeekPrefixGEOp{}, iterTag),
@@ -749,6 +752,16 @@ func computeDerivedFields(ops []op) {
749752
}
750753
// Sort so the output is deterministic.
751754
slices.Sort(v.affectedObjects)
755+
case *dbUncleanRestartOp:
756+
// Find all objects that use this db.
757+
v.affectedObjects = nil
758+
for obj, db := range objToDB {
759+
if db == v.dbID {
760+
v.affectedObjects = append(v.affectedObjects, obj)
761+
}
762+
}
763+
// Sort so the output is deterministic.
764+
slices.Sort(v.affectedObjects)
752765
case *ingestOp:
753766
v.derivedDBIDs = make([]objID, len(v.batchIDs))
754767
for i := range v.batchIDs {

metamorphic/test.go

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package metamorphic
77
import (
88
"fmt"
99
"io"
10+
"math/rand/v2"
1011
"os"
1112
"path"
1213
"runtime/debug"
@@ -321,10 +322,12 @@ func (t *Test) minFMV() pebble.FormatMajorVersion {
321322
return minVersion
322323
}
323324

324-
func (t *Test) restartDB(dbID objID) error {
325+
// restartDB restarts the DB identified by dbID; if shouldCrashDuringOpen is
// true, it also simulates a crash partway through the subsequent Open.
326+
func (t *Test) restartDB(dbID objID, shouldCrashDuringOpen bool) error {
325327
db := t.getDB(dbID)
326-
// If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
327-
// restart the database (even if we don't revert to synced data).
328+
// If strictFS is not used, no-op since we end up using pebble.NoSync for
329+
// writeOpts. In the case of pebble.NoSync, we can't restart the database
330+
// even if we don't revert to synced data.
328331
if !t.testOpts.strictFS {
329332
return nil
330333
}
@@ -348,15 +351,26 @@ func (t *Test) restartDB(dbID objID) error {
348351
}
349352
}
350353
t.opts.FS = crashFS
354+
var slowFS *errorfs.FS
355+
// If we should crash during Open, inject some latency into the filesystem
356+
// so that the first Open is slow enough for us to capture some arbitrary
357+
// intermediate state.
358+
if shouldCrashDuringOpen {
359+
seed := time.Now().UnixNano()
360+
t.opts.Logger.Infof("seed %d", seed)
361+
mean := time.Duration(rand.IntN(20)+10) * time.Millisecond
362+
t.opts.Logger.Infof("Injecting mean %s of latency with p=%.3f", mean, 1.0)
363+
slowFS = errorfs.Wrap(crashFS,
364+
errorfs.RandomLatency(errorfs.Randomly(1.0, seed), mean, seed, time.Second))
365+
t.opts.FS = slowFS
366+
}
351367
t.opts.WithFSDefaults()
352368
// We want to set the new FS in testOpts too, so they are propagated to the
353369
// TestOptions that were used with metamorphic.New().
354370
t.testOpts.Opts.FS = t.opts.FS
355-
if t.opts.WALFailover != nil {
356-
t.opts.WALFailover.Secondary.FS = t.opts.FS
357-
t.testOpts.Opts.WALFailover.Secondary.FS = t.opts.FS
358-
}
359371

372+
// Buffered so that neither side blocks forever if the other returns early
// on error (e.g. CrashClone failing before firstOpenDone is received).
secondOpenDone := make(chan struct{}, 1)
firstOpenDone := make(chan struct{}, 1)
360374
// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
361375
// are well defined within the context of retries.
362376
err := t.withRetries(func() (err error) {
@@ -373,15 +387,72 @@ func (t *Test) restartDB(dbID objID) error {
373387
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
374388
}
375389
o := t.finalizeOptions()
390+
391+
if shouldCrashDuringOpen {
392+
go func() {
393+
err = t.simulateCrashDuringOpen(dbID, slowFS, secondOpenDone, firstOpenDone)
394+
}()
395+
if err != nil {
396+
return err
397+
}
398+
}
376399
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
377-
if err != nil {
378-
return err
400+
if shouldCrashDuringOpen {
401+
firstOpenDone <- struct{}{}
379402
}
380403
return err
381404
})
405+
if shouldCrashDuringOpen {
406+
<-secondOpenDone
407+
}
382408
return err
383409
}
384410

411+
func (t *Test) simulateCrashDuringOpen(
412+
dbID objID, slowFS *errorfs.FS, secondOpenDone, firstOpenDone chan struct{},
413+
) error {
414+
415+
defer func() { secondOpenDone <- struct{}{} }()
416+
417+
// Wait a bit for the first Open to make some progress.
418+
time.Sleep(30 * time.Millisecond)
419+
420+
// Create a crash clone of the current filesystem state.
421+
dir := t.dir
422+
if len(t.dbs) > 1 {
423+
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
424+
}
425+
crashCloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
426+
if err != nil {
427+
return err
428+
}
429+
430+
// After the first Open has completed, close the resulting DB and open the
431+
// second DB.
432+
<-firstOpenDone
433+
err = t.dbs[dbID.slot()-1].Close()
434+
if err != nil {
435+
return err
436+
}
437+
t.opts.FS = crashCloneFS
438+
if t.opts.WALFailover != nil {
439+
fmt.Println("WALFAILOVER")
440+
ccsmemFS := t.opts.WALFailover.Secondary.FS.(*vfs.MemFS)
441+
crashCloneSecondaryFS := ccsmemFS.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
442+
t.testOpts.Opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
443+
t.opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
444+
446+
}
447+
// Create a copy of options for the second DB.
448+
o := t.finalizeOptions()
449+
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
450+
if err != nil {
451+
return err
452+
}
453+
return nil
454+
}
455+
385456
func (t *Test) saveInMemoryDataInternal() error {
386457
if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
387458
// t.opts.FS is an in-memory system; copy it to disk.

vfs/errorfs/errorfs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,15 @@ func (fs *FS) Stat(name string) (vfs.FileInfo, error) {
457457
return fs.fs.Stat(name)
458458
}
459459

460+
// CrashClone implements MemFS.CrashClone.
461+
func (fs *FS) CrashClone(cfg vfs.CrashCloneCfg) (*vfs.MemFS, error) {
462+
memFs, ok := fs.fs.(*vfs.MemFS)
463+
if !ok {
464+
return nil, errors.New("not a MemFS")
465+
}
466+
return memFs.CrashClone(cfg), nil
467+
}
468+
460469
// errorFile implements vfs.File. The interface is implemented on the pointer
461470
// type to allow pointer equality comparisons.
462471
type errorFile struct {

0 commit comments

Comments
 (0)