Commit a62e689

record: add sync offsets to manifest writer
Add sync offsets to the manifest writer to enable read-ahead and corruption detection.
1 parent c70ddd4 commit a62e689

12 files changed: +189 -78 lines changed

checkpoint.go

Lines changed: 1 addition & 1 deletion
@@ -494,7 +494,7 @@ func (d *DB) writeCheckpointManifest(
 	// append a record after a raw data copy; see
 	// https://github.com/cockroachdb/cockroach/issues/100935).
 	r := record.NewReader(&io.LimitedReader{R: src, N: manifestSize}, manifestFileNum)
-	w := record.NewWriter(dst)
+	w := record.NewWriter(dst, manifestFileNum, d.FormatMajorVersion() >= FormatManifestSyncChunks)
 	for {
 		rr, err := r.Next()
 		if err != nil {

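To make the new signature concrete, here is a minimal sketch (not part of the commit; the file number and payload below are placeholders) of the two writer modes implied by the three-argument record.NewWriter: passing false keeps the legacy chunk headers, as the remote object catalog does further down, while passing the file's number and true produces the sync-offset chunks that the manifest writer emits once the format major version permits it.

package main

import (
	"bytes"
	"fmt"

	"github.com/cockroachdb/pebble/record"
)

func main() {
	// Legacy chunks: callers that don't need sync offsets pass a zero file
	// number and false, matching the old single-argument behavior.
	var legacyBuf bytes.Buffer
	lw := record.NewWriter(&legacyBuf, 0, false)
	if _, err := lw.WriteRecord([]byte("version edit payload")); err != nil {
		panic(err)
	}
	if err := lw.Close(); err != nil {
		panic(err)
	}

	// Sync-offset chunks: the manifest writer passes the manifest's file
	// number and enables the new format only when the DB has reached
	// FormatManifestSyncChunks (see the checkpoint.go hunk above).
	var syncBuf bytes.Buffer
	sw := record.NewWriter(&syncBuf, 1 /* placeholder file number */, true)
	if _, err := sw.WriteRecord([]byte("version edit payload")); err != nil {
		panic(err)
	}
	if err := sw.Close(); err != nil {
		panic(err)
	}

	fmt.Printf("legacy: %d bytes, sync offsets: %d bytes\n", legacyBuf.Len(), syncBuf.Len())
}
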
format_major_version.go

Lines changed: 9 additions & 2 deletions
@@ -226,6 +226,10 @@ const (
 	// once stable.
 	FormatExperimentalValueSeparation
 
+	// FormatManifestSyncChunks is a format major version enabling the writing of
+	// sync offset chunks for Manifest files. See comment for FormatWALSyncChunks.
+	FormatManifestSyncChunks
+
 	// -- Add new versions here --
 
 	// FormatNewest is the most recent format major version.
@@ -266,7 +270,7 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
 		return sstable.TableFormatPebblev4
 	case FormatColumnarBlocks, FormatWALSyncChunks:
 		return sstable.TableFormatPebblev5
-	case FormatTableFormatV6, FormatExperimentalValueSeparation:
+	case FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
 		return sstable.TableFormatPebblev6
 	default:
 		panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
@@ -280,7 +284,7 @@ func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
 	case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
 		FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
 		FormatFlushableIngestExcises, FormatColumnarBlocks, FormatWALSyncChunks,
-		FormatTableFormatV6, FormatExperimentalValueSeparation:
+		FormatTableFormatV6, FormatExperimentalValueSeparation, FormatManifestSyncChunks:
 		return sstable.TableFormatPebblev1
 	default:
 		panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
@@ -332,6 +336,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
 	FormatExperimentalValueSeparation: func(d *DB) error {
 		return d.finalizeFormatVersUpgrade(FormatExperimentalValueSeparation)
 	},
+	FormatManifestSyncChunks: func(d *DB) error {
+		return d.finalizeFormatVersUpgrade(FormatManifestSyncChunks)
+	},
 }
 
 const formatVersionMarkerName = `format-version`

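Because the migration registered above is a plain finalize step, opting in only requires ratcheting the database's format major version; once ratcheted, newly written manifests carry sync-offset chunks. A hedged sketch of how a caller might do that (the directory name and options are illustrative; RatchetFormatMajorVersion is the method exercised by the test below, and whether this experimental version is reachable this way from outside the package is an assumption):

package main

import (
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open a throwaway in-memory store, then ratchet it to the new format
	// major version. Ratcheting runs every intermediate migration and ends
	// with the finalize step registered for FormatManifestSyncChunks.
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if err := db.RatchetFormatMajorVersion(pebble.FormatManifestSyncChunks); err != nil {
		panic(err)
	}
}
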
format_major_version_test.go

Lines changed: 5 additions & 1 deletion
@@ -29,11 +29,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
 	require.Equal(t, FormatWALSyncChunks, FormatMajorVersion(20))
 	require.Equal(t, FormatTableFormatV6, FormatMajorVersion(21))
 	require.Equal(t, FormatExperimentalValueSeparation, FormatMajorVersion(22))
+	require.Equal(t, FormatManifestSyncChunks, FormatMajorVersion(23))
 
 	// When we add a new version, we should add a check for the new version in
 	// addition to updating these expected values.
 	require.Equal(t, FormatNewest, FormatMajorVersion(21))
-	require.Equal(t, internalFormatNewest, FormatMajorVersion(22))
+	require.Equal(t, internalFormatNewest, FormatMajorVersion(23))
 }
 
 func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
@@ -70,6 +71,8 @@ func TestRatchetFormat(t *testing.T) {
 	require.Equal(t, FormatTableFormatV6, d.FormatMajorVersion())
 	require.NoError(t, d.RatchetFormatMajorVersion(FormatExperimentalValueSeparation))
 	require.Equal(t, FormatExperimentalValueSeparation, d.FormatMajorVersion())
+	require.NoError(t, d.RatchetFormatMajorVersion(FormatManifestSyncChunks))
+	require.Equal(t, FormatManifestSyncChunks, d.FormatMajorVersion())
 
 	require.NoError(t, d.Close())
 
@@ -230,6 +233,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
 		FormatWALSyncChunks:               {sstable.TableFormatPebblev1, sstable.TableFormatPebblev5},
 		FormatTableFormatV6:               {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
 		FormatExperimentalValueSeparation: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
+		FormatManifestSyncChunks:          {sstable.TableFormatPebblev1, sstable.TableFormatPebblev6},
 	}
 
 	// Valid versions.

objstorage/objstorageprovider/remoteobjcat/catalog.go

Lines changed: 1 addition & 1 deletion
@@ -339,7 +339,7 @@ func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
 	if err != nil {
 		return err
 	}
-	recWriter := record.NewWriter(file)
+	recWriter := record.NewWriter(file, 0, false)
 	err = func() error {
 		// Create a VersionEdit that gets us from an empty catalog to the current state.
 		var ve VersionEdit

open_test.go

Lines changed: 1 addition & 1 deletion
@@ -333,7 +333,7 @@ func TestNewDBFilenames(t *testing.T) {
 			"LOCK",
 			"MANIFEST-000001",
 			"OPTIONS-000003",
-			"marker.format-version.000009.022",
+			"marker.format-version.000010.023",
 			"marker.manifest.000001.MANIFEST-000001",
 		},
 	}

record/record.go

Lines changed: 49 additions & 6 deletions
@@ -692,10 +692,22 @@ type Writer struct {
 	err error
 	// buf is the buffer.
 	buf [blockSize]byte
+
+	// logNum is the log number of the log.
+	logNum base.DiskFileNum
+
+	// fillHeader fills in the header for the pending chunk.
+	// Its implementation is decided at runtime and is decided
+	// by whether or not the log is writing sync offsets or not.
+	fillHeader func(bool)
+
+	// headerSize is the size of the type of header that the writer
+	// is writing. It can either be legacyHeaderSize or walSyncHeaderSize.
+	headerSize int
 }
 
 // NewWriter returns a new Writer.
-func NewWriter(w io.Writer) *Writer {
+func NewWriter(w io.Writer, logNum base.DiskFileNum, writingSyncOffsets bool) *Writer {
 	f, _ := w.(flusher)
 
 	var o int64
@@ -705,16 +717,24 @@ func NewWriter(w io.Writer) *Writer {
 			o = 0
 		}
 	}
-	return &Writer{
+	wr := &Writer{
 		w:                w,
 		f:                f,
 		baseOffset:       o,
 		lastRecordOffset: -1,
+		logNum:           logNum,
+	}
+	if writingSyncOffsets {
+		wr.fillHeader = wr.fillHeaderSyncOffsets
+		wr.headerSize = walSyncHeaderSize
+	} else {
+		wr.fillHeader = wr.fillHeaderLegacy
+		wr.headerSize = legacyHeaderSize
 	}
+	return wr
 }
 
-// fillHeader fills in the header for the pending chunk.
-func (w *Writer) fillHeader(last bool) {
+func (w *Writer) fillHeaderLegacy(last bool) {
 	if w.i+legacyHeaderSize > w.j || w.j > blockSize {
 		panic("pebble/record: bad writer state")
 	}
@@ -735,12 +755,35 @@ func (w *Writer) fillHeader(last bool) {
 	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-legacyHeaderSize))
 }
 
+func (w *Writer) fillHeaderSyncOffsets(last bool) {
+	if w.i+walSyncHeaderSize > w.j || w.j > blockSize {
+		panic("pebble/record: bad writer state")
+	}
+	if last {
+		if w.first {
+			w.buf[w.i+6] = walSyncFullChunkEncoding
+		} else {
+			w.buf[w.i+6] = walSyncLastChunkEncoding
+		}
+	} else {
+		if w.first {
+			w.buf[w.i+6] = walSyncFirstChunkEncoding
+		} else {
+			w.buf[w.i+6] = walSyncMiddleChunkEncoding
+		}
+	}
+	binary.LittleEndian.PutUint32(w.buf[w.i+7:w.i+11], uint32(w.logNum))
+	binary.LittleEndian.PutUint64(w.buf[w.i+11:w.i+19], uint64(w.lastRecordOffset)+uint64(w.written))
+	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], crc.New(w.buf[w.i+6:w.j]).Value())
+	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-walSyncHeaderSize))
+}
+
 // writeBlock writes the buffered block to the underlying writer, and reserves
 // space for the next chunk's header.
 func (w *Writer) writeBlock() {
 	_, w.err = w.w.Write(w.buf[w.written:])
 	w.i = 0
-	w.j = legacyHeaderSize
+	w.j = w.headerSize
 	w.written = 0
 	w.blockNumber++
 }
@@ -796,7 +839,7 @@ func (w *Writer) Next() (io.Writer, error) {
 		w.fillHeader(true)
 	}
 	w.i = w.j
-	w.j = w.j + legacyHeaderSize
+	w.j = w.j + w.headerSize
 	// Check if there is room in the block for the header.
 	if w.j > blockSize {
 		// Fill in the rest of the block with zeroes.

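For orientation, the header written by fillHeaderSyncOffsets above appears to lay out as: bytes 0-3 checksum, bytes 4-5 payload length, byte 6 chunk type, bytes 7-10 log number, and bytes 11-18 sync offset, i.e. a 19-byte walSyncHeaderSize versus the 7-byte legacy header, with the checksum covering everything from the type byte through the end of the payload. The reader side is not shown in this diff, so the decoder below is only an illustrative sketch of that assumed layout, not Pebble's implementation.

package main

import (
	"encoding/binary"
	"fmt"
)

// walSyncChunkHeader mirrors the fields that fillHeaderSyncOffsets writes.
// The 19-byte size and the field offsets are inferred from the diff above;
// treat them as assumptions, not a spec.
type walSyncChunkHeader struct {
	checksum   uint32 // CRC over the type byte, log number, offset, and payload
	length     uint16 // payload length, excluding the header itself
	chunkType  byte   // walSyncFull/First/Middle/LastChunkEncoding
	logNum     uint32 // file number of the log or manifest being written
	syncOffset uint64 // lastRecordOffset + written at the time the header was filled
}

const assumedWALSyncHeaderSize = 19

func decodeWALSyncChunkHeader(buf []byte) (walSyncChunkHeader, error) {
	if len(buf) < assumedWALSyncHeaderSize {
		return walSyncChunkHeader{}, fmt.Errorf("short chunk header: %d bytes", len(buf))
	}
	return walSyncChunkHeader{
		checksum:   binary.LittleEndian.Uint32(buf[0:4]),
		length:     binary.LittleEndian.Uint16(buf[4:6]),
		chunkType:  buf[6],
		logNum:     binary.LittleEndian.Uint32(buf[7:11]),
		syncOffset: binary.LittleEndian.Uint64(buf[11:19]),
	}, nil
}

func main() {
	// A zeroed header decodes to zero values; real chunks come from record.Writer.
	hdr, err := decodeWALSyncChunkHeader(make([]byte, assumedWALSyncHeaderSize))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", hdr)
}
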
record/record_test.go

Lines changed: 42 additions & 7 deletions
@@ -89,7 +89,7 @@ func testGeneratorWriter(
 func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) {
 	t.Run("Writer", func(t *testing.T) {
 		testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter {
-			return NewWriter(w)
+			return NewWriter(w, 0, false)
 		})
 	})
 
@@ -175,7 +175,7 @@ func TestBoundary(t *testing.T) {
 
 func TestFlush(t *testing.T) {
 	buf := new(bytes.Buffer)
-	w := NewWriter(buf)
+	w := NewWriter(buf, 0, false)
 	// Write a couple of records. Everything should still be held
 	// in the record.Writer buffer, so that buf.Len should be 0.
 	w0, _ := w.Next()
@@ -240,7 +240,7 @@ func TestNonExhaustiveRead(t *testing.T) {
 	p := make([]byte, 10)
 	rnd := rand.New(rand.NewPCG(0, 1))
 
-	w := NewWriter(buf)
+	w := NewWriter(buf, 0, false)
 	for i := 0; i < n; i++ {
 		length := len(p) + rnd.IntN(3*blockSize)
 		s := string(uint8(i)) + "123456789abcdefgh"
@@ -267,7 +267,7 @@
 func TestStaleReader(t *testing.T) {
 	buf := new(bytes.Buffer)
 
-	w := NewWriter(buf)
+	w := NewWriter(buf, 0, false)
 	_, err := w.WriteRecord([]byte("0"))
 	require.NoError(t, err)
 
@@ -313,7 +313,7 @@ func makeTestRecords(recordLengths ...int) (*testRecords, error) {
 	}
 
 	buf := new(bytes.Buffer)
-	w := NewWriter(buf)
+	w := NewWriter(buf, base.DiskFileNum(0), false)
 	for i, rec := range ret.records {
 		wRec, err := w.Next()
 		if err != nil {
@@ -616,7 +616,7 @@ func TestLastRecordOffset(t *testing.T) {
 
 func TestNoLastRecordOffset(t *testing.T) {
 	buf := new(bytes.Buffer)
-	w := NewWriter(buf)
+	w := NewWriter(buf, 0, false)
 	defer w.Close()
 
 	if _, err := w.LastRecordOffset(); err != ErrNoLastRecord {
@@ -682,7 +682,7 @@ func TestInvalidLogNum(t *testing.T) {
 func TestSize(t *testing.T) {
 	var buf bytes.Buffer
 	zeroes := make([]byte, 8<<10)
-	w := NewWriter(&buf)
+	w := NewWriter(&buf, 0, false)
 	for i := 0; i < 100; i++ {
 		n := rand.IntN(len(zeroes))
 		_, err := w.WriteRecord(zeroes[:n])
@@ -1099,6 +1099,41 @@ func describeWALSyncBlocks(
 	f.ToTreePrinter(n)
 }
 
+func TestManifestSyncOffset(t *testing.T) {
+	buf := new(bytes.Buffer)
+	w := NewWriter(buf, 0, true)
+	w.WriteRecord(bytes.Repeat([]byte{1}, blockSize-walSyncHeaderSize))
+	w.WriteRecord(bytes.Repeat([]byte{2}, blockSize-walSyncHeaderSize))
+
+	raw := buf.Bytes()
+	r := NewReader(bytes.NewReader(raw), 0)
+	r.loggerForTesting = &readerLogger{}
+	for {
+		_, err := r.Next()
+		if err != nil {
+			require.True(t, errors.Is(err, io.EOF))
+			require.True(t, r.loggerForTesting.(*readerLogger).getLog() == "")
+			break
+		}
+	}
+
+	// Check that corrupting a chunk should result in us reading ahead and returning
+	// an ErrInvalidChunk.
+	raw[0] ^= 0xFF
+	r = NewReader(bytes.NewReader(raw), 0)
+	r.loggerForTesting = &readerLogger{}
+	for {
+		_, err := r.Next()
+		if err != nil {
+			require.True(t, errors.Is(err, ErrInvalidChunk))
+			logStr := r.loggerForTesting.(*readerLogger).getLog()
+			require.True(t, logStr != "")
+			println(logStr)
+			break
+		}
+	}
+}
+
 func BenchmarkRecordWrite(b *testing.B) {
 	for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} {
 		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
