@@ -107,13 +107,18 @@ package record
107
107
108
108
import (
109
109
"encoding/binary"
110
+ "encoding/hex"
111
+ "fmt"
110
112
"io"
111
113
"math"
114
+ "strings"
112
115
113
116
"github.com/cockroachdb/errors"
114
117
"github.com/cockroachdb/pebble/internal/base"
118
+ "github.com/cockroachdb/pebble/internal/binfmt"
115
119
"github.com/cockroachdb/pebble/internal/bitflip"
116
120
"github.com/cockroachdb/pebble/internal/crc"
121
+ "github.com/cockroachdb/pebble/internal/treeprinter"
117
122
)
118
123
119
124
// These constants are part of the wire format and should not be changed.
@@ -252,12 +257,33 @@ type Reader struct {
252
257
// encountered during WAL replay was the logical EOF or confirmed corruption.
253
258
invalidOffset uint64
254
259
255
- // loggerForTesting is a logging helper used by the Reader to accumulate log messages.
256
- loggerForTesting loggerForTesting
260
+ // logger is a logging helper used by the Reader to accumulate log messages.
261
+ logger * loggerForTesting
262
+
263
+ // visualLogger is a logging helper used by the Reader to accumulate visual logs.
264
+ visualLogger * visualLoggerForTesting
265
+ }
266
+
267
+ type loggerForTesting struct {
268
+ verbose bool
269
+ builder strings.Builder
257
270
}
258
271
259
- type loggerForTesting interface {
260
- logf (format string , args ... interface {})
272
+ func (l * loggerForTesting ) logf (format string , args ... interface {}) {
273
+ fmt .Fprintf (& l .builder , format , args ... )
274
+ }
275
+
276
+ func (l * loggerForTesting ) getLog () string {
277
+ return l .builder .String ()
278
+ }
279
+
280
+ type visualLoggerForTesting struct {
281
+ verbose bool
282
+ f * binfmt.Formatter
283
+ tp * treeprinter.Node
284
+ blockRoot treeprinter.Node
285
+ blockNode treeprinter.Node
286
+ chunkNode treeprinter.Node
261
287
}
262
288
263
289
// NewReader returns a new reader. If the file contains records encoded using
@@ -425,6 +451,23 @@ func (r *Reader) Next() (io.Reader, error) {
425
451
return singleReader {r , r .seq }, nil
426
452
}
427
453
454
+ func (r * Reader ) InvestigateChunks (verbose bool ) (string , string ) {
455
+ tree := treeprinter .New ()
456
+ r .visualLogger = & visualLoggerForTesting {
457
+ f : binfmt .New (r .buf [:]).LineWidth (20 ),
458
+ tp : & tree ,
459
+ verbose : verbose ,
460
+ }
461
+ r .logger = & loggerForTesting {
462
+ verbose : verbose ,
463
+ }
464
+ err := r .readAheadForCorruption ()
465
+ if err != nil {
466
+ panic (errors .Wrap (err , "Error occurred while Investigating Chunks." ))
467
+ }
468
+ return r .visualLogger .tp .String (), r .logger .getLog ()
469
+ }
470
+
428
471
// readAheadForCorruption scans ahead in the log to detect corruption.
429
472
// It loads in blocks and reads chunks until it either detects corruption
430
473
// due to an offset (encoded in a chunk header) exceeding the invalid offset,
@@ -440,19 +483,44 @@ func (r *Reader) Next() (io.Reader, error) {
440
483
// if there is confirmation of a corruption, otherwise ErrUnexpectedEOF is
441
484
// returned after reading all the blocks without corruption confirmation.
442
485
func (r * Reader ) readAheadForCorruption () error {
443
- if r .loggerForTesting != nil {
444
- r .loggerForTesting .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
486
+ if r .logger != nil {
487
+ r .logger .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
488
+ }
489
+ if r .visualLogger != nil {
490
+ r .visualLogger .blockRoot = r .visualLogger .tp .Child ("Block" )
491
+ }
492
+ getBufferDump := func (buf []byte , i int , j int ) string {
493
+ return fmt .Sprintf ("Buffer Dump: %s\n " , hex .EncodeToString (buf [i :j ]))
494
+ }
495
+
496
+ logMsgAndDump := func (logMsg , bufferDump string ) {
497
+ if r .logger != nil {
498
+ r .logger .logf ("\t %s" , logMsg )
499
+ if r .logger .verbose {
500
+ r .logger .logf ("\t %s" , bufferDump )
501
+ }
502
+ }
503
+ if r .visualLogger != nil {
504
+ r .visualLogger .chunkNode .Child (logMsg )
505
+ if r .visualLogger .verbose {
506
+ r .visualLogger .chunkNode .Child (bufferDump )
507
+ }
508
+ }
509
+ }
510
+
511
+ if r .visualLogger != nil {
512
+ defer r .visualLogger .f .SetAnchorOffset ()
513
+ defer r .visualLogger .f .ToTreePrinter (r .visualLogger .blockRoot )
445
514
}
446
515
447
516
for {
448
517
// Load the next block into r.buf.
449
518
n , err := io .ReadFull (r .r , r .buf [:])
450
519
r .begin , r .end , r .n = 0 , 0 , n
451
520
r .blockNum ++
452
- if r .loggerForTesting != nil {
453
- r .loggerForTesting .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
521
+ if r .logger != nil {
522
+ r .logger .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
454
523
}
455
-
456
524
if errors .Is (err , io .EOF ) {
457
525
// io.ErrUnexpectedEOF is returned instead of
458
526
// io.EOF because io library functions clear
@@ -464,8 +532,8 @@ func (r *Reader) readAheadForCorruption() error {
464
532
// invalid chunk should have been valid, the chunk represents
465
533
// an abrupt, unclean termination of the logical log. This
466
534
// abrupt end of file represented by io.ErrUnexpectedEOF.
467
- if r .loggerForTesting != nil {
468
- r .loggerForTesting .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
535
+ if r .logger != nil {
536
+ r .logger .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
469
537
}
470
538
return io .ErrUnexpectedEOF
471
539
}
@@ -475,93 +543,141 @@ func (r *Reader) readAheadForCorruption() error {
475
543
// However, if the error is not ErrUnexpectedEOF, then this
476
544
// error should be surfaced.
477
545
if err != nil && err != io .ErrUnexpectedEOF {
478
- if r .loggerForTesting != nil {
479
- r .loggerForTesting .logf ("\t Error reading block %d: %v" , r .blockNum , err )
546
+ if r .logger != nil {
547
+ r .logger .logf ("\t Error reading block %d: %v" , r .blockNum , err )
480
548
}
481
549
return err
482
550
}
483
551
552
+ chunkCount := 0
484
553
for r .end + legacyHeaderSize <= r .n {
485
554
checksum := binary .LittleEndian .Uint32 (r .buf [r .end + 0 : r .end + 4 ])
486
555
length := binary .LittleEndian .Uint16 (r .buf [r .end + 4 : r .end + 6 ])
487
556
chunkEncoding := r .buf [r .end + 6 ]
557
+ bufferDump := getBufferDump (r .buf [:], r .end , r .n )
558
+ chunkCount ++
488
559
489
- if r .loggerForTesting != nil {
490
- r .loggerForTesting .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
560
+ if r .logger != nil {
561
+ r .logger .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
562
+ }
563
+ if r .visualLogger != nil {
564
+ if chunkCount == 1 {
565
+ r .visualLogger .blockNode = r .visualLogger .blockRoot .Childf ("Block #%d" , r .blockNum )
566
+ }
567
+ r .visualLogger .chunkNode = r .visualLogger .blockNode .Childf ("Chunk #%d at offset %d" , chunkCount , r .end )
568
+ r .visualLogger .chunkNode .Childf ("Checksum: %d" , checksum )
569
+ r .visualLogger .chunkNode .Childf ("Encoded Length: %d" , length )
491
570
}
492
571
493
572
if int (chunkEncoding ) >= len (headerFormatMappings ) {
494
- if r .loggerForTesting != nil {
495
- r .loggerForTesting .logf ("\t Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
496
- }
573
+ logMsg := fmt .Sprintf ("Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
574
+ logMsgAndDump (logMsg , bufferDump )
497
575
break
498
576
}
499
577
500
578
headerFormat := headerFormatMappings [chunkEncoding ]
501
579
chunkPosition , wireFormat , headerSize := headerFormat .chunkPosition , headerFormat .wireFormat , headerFormat .headerSize
580
+ if r .visualLogger != nil {
581
+ encodingStr := chunkEncodingStr (chunkEncoding )
582
+ r .visualLogger .chunkNode .Childf ("Chunk encoding: %s(%d) (chunkPosition: %d, wireFormat: %d)" , encodingStr , chunkEncoding , chunkPosition , wireFormat )
583
+ }
584
+
502
585
if checksum == 0 && length == 0 && chunkPosition == invalidChunkPosition {
503
- if r .loggerForTesting != nil {
504
- r .loggerForTesting .logf ("\t Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
505
- }
586
+ logMsg := fmt .Sprintf ("Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
587
+ logMsgAndDump (logMsg , bufferDump )
506
588
break
507
589
}
508
590
if wireFormat == invalidWireFormat {
509
- if r .loggerForTesting != nil {
510
- r .loggerForTesting .logf ("\t Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
511
- }
591
+ logMsg := fmt .Sprintf ("Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
592
+ logMsgAndDump (logMsg , bufferDump )
512
593
break
513
594
}
514
595
if wireFormat == recyclableWireFormat || wireFormat == walSyncWireFormat {
515
596
if r .end + headerSize > r .n {
516
- if r .loggerForTesting != nil {
517
- r .loggerForTesting .logf ("\t Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
518
- }
597
+ logMsg := fmt .Sprintf ("Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
598
+ logMsgAndDump (logMsg , bufferDump )
519
599
break
520
600
}
521
601
logNum := binary .LittleEndian .Uint32 (r .buf [r .end + 7 : r .end + 11 ])
602
+ if r .visualLogger != nil {
603
+ r .visualLogger .chunkNode .Childf ("Log Num: %d" , logNum )
604
+ }
522
605
if logNum != r .logNum {
523
- if r .loggerForTesting != nil {
524
- r .loggerForTesting .logf ("\t Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
525
- }
606
+ logMsg := fmt .Sprintf ("Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
607
+ logMsgAndDump (logMsg , bufferDump )
526
608
break
527
609
}
528
610
}
529
611
530
612
r .begin = r .end + headerSize
531
613
r .end = r .begin + int (length )
614
+ bufferDump = getBufferDump (r .buf [:], r .begin , min (r .end , r .n ))
532
615
if r .end > r .n {
533
616
// The chunk straddles a 32KB boundary (or the end of file).
534
- if r .loggerForTesting != nil {
535
- r .loggerForTesting .logf ("\t Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
536
- }
617
+ logMsg := fmt .Sprintf ("Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
618
+ logMsgAndDump (logMsg , bufferDump )
537
619
break
538
620
}
539
621
if checksum != crc .New (r .buf [r .begin - headerSize + 6 :r .end ]).Value () {
540
- if r .loggerForTesting != nil {
541
- r .loggerForTesting .logf ("\t Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
542
- }
622
+ logMsg := fmt .Sprintf ("Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
623
+ logMsgAndDump (logMsg , bufferDump )
543
624
break
544
625
}
545
626
546
627
// Decode offset in header when chunk has the WAL Sync wire format.
547
628
if wireFormat == walSyncWireFormat {
548
629
syncedOffset := binary .LittleEndian .Uint64 (r .buf [r .begin - headerSize + 11 : r .begin - headerSize + 19 ])
549
- if r .loggerForTesting != nil {
550
- r .loggerForTesting .logf ("\t Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
630
+ if r .visualLogger != nil {
631
+ r .visualLogger .chunkNode .Childf ("Synced Offset: %d" , syncedOffset )
632
+ }
633
+ if r .logger != nil {
634
+ r .logger .logf ("Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
551
635
}
552
636
// If the encountered chunk offset promises durability beyond the invalid offset,
553
637
// the invalid offset must have been corruption.
554
638
if syncedOffset > r .invalidOffset {
555
- if r .loggerForTesting != nil {
556
- r .loggerForTesting .logf ("\t Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
557
- }
639
+ logMsg := fmt .Sprintf ("Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
640
+ logMsgAndDump (logMsg , bufferDump )
558
641
return r .err
559
642
}
560
643
}
561
644
}
562
645
}
563
646
}
564
647
648
+ func chunkEncodingStr (encoding byte ) string {
649
+ switch encoding {
650
+ case invalidChunkEncoding :
651
+ return "invalidInvalidChunk"
652
+ case fullChunkEncoding :
653
+ return "legacyFullChunk"
654
+ case firstChunkEncoding :
655
+ return "legacyFirstChunk"
656
+ case middleChunkEncoding :
657
+ return "legacyMiddleChunk"
658
+ case lastChunkEncoding :
659
+ return "legacyLastChunk"
660
+ case recyclableFullChunkEncoding :
661
+ return "recyclableFullChunk"
662
+ case recyclableFirstChunkEncoding :
663
+ return "recyclableFirstChunk"
664
+ case recyclableMiddleChunkEncoding :
665
+ return "recyclableMiddleChunk"
666
+ case recyclableLastChunkEncoding :
667
+ return "recyclableLastChunk"
668
+ case walSyncFullChunkEncoding :
669
+ return "walSyncFullChunk"
670
+ case walSyncFirstChunkEncoding :
671
+ return "walSyncFirstChunk"
672
+ case walSyncMiddleChunkEncoding :
673
+ return "walSyncMiddleChunk"
674
+ case walSyncLastChunkEncoding :
675
+ return "walSyncLastChunk"
676
+ default :
677
+ return "unknown encoding"
678
+ }
679
+ }
680
+
565
681
// Offset returns the current offset within the file. If called immediately
566
682
// before a call to Next(), Offset() will return the record offset.
567
683
func (r * Reader ) Offset () int64 {
0 commit comments