@@ -497,14 +497,19 @@ func (v *dataTxValidator) parallelReadMvccValidation(
497
497
) error {
498
498
reads := make (map [string ]map [string ]* readCache )
499
499
errorChan := make (chan error )
500
+ defer close (errorChan )
500
501
501
- // Submit a "get-version" Go routine for each key in the envelope.
502
+ // Submit a "get-version" Go routine for each read key in the envelope.
502
503
// We avoid reading the same key twice.
503
504
for txNum , txEnv := range dataTxEnvs {
504
505
if valInfoArray [txNum ].Flag != types .Flag_VALID {
505
506
continue
506
507
}
507
508
for _ , txOps := range txEnv .Payload .DbOperations {
509
+ if len (txOps .DataReads ) == 0 {
510
+ continue
511
+ }
512
+
508
513
dbName := txOps .DbName
509
514
dbReads , ok := reads [dbName ]
510
515
if ! ok {
@@ -513,25 +518,23 @@ func (v *dataTxValidator) parallelReadMvccValidation(
513
518
}
514
519
515
520
for _ , r := range txOps .DataReads {
516
- key := r .Key
517
- if _ , ok := dbReads [key ]; ok {
521
+ if _ , ok := dbReads [r .Key ]; ok {
518
522
continue
519
523
}
520
524
521
525
c := & readCache {
522
526
dbName : dbName ,
523
- key : key ,
527
+ key : r . Key ,
524
528
}
525
529
c .wg .Add (1 )
526
- dbReads [key ] = c
530
+ dbReads [r . Key ] = c
527
531
go func (txNum int , c * readCache ) {
528
532
defer c .wg .Done ()
529
533
c .ver , c .err = v .db .GetVersion (c .dbName , c .key )
530
534
if c .err != nil {
531
535
v .logger .Errorf ("error validating signatures in tx number %d, error: %s" , txNum , c .err )
532
536
defer func () {
533
- // Ignore panic when errorChan is closed
534
- recover ()
537
+ recover () // Ignore panic when errorChan is closed
535
538
}()
536
539
errorChan <- c .err
537
540
}
@@ -543,45 +546,56 @@ func (v *dataTxValidator) parallelReadMvccValidation(
543
546
// Submit a "validation" Go routine for read operation in the envelope.
544
547
wg := sync.WaitGroup {}
545
548
for txNum , txEnv := range dataTxEnvs {
549
+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
550
+ continue
551
+ }
546
552
for _ , txOps := range txEnv .Payload .DbOperations {
553
+ if len (txOps .DataReads ) == 0 {
554
+ continue
555
+ }
556
+ dbReads , ok := reads [txOps .DbName ]
557
+ if ! ok {
558
+ panic ("all read DBs should be in the map" )
559
+ }
547
560
for _ , r := range txOps .DataReads {
548
- if valInfoArray [txNum ].Flag != types .Flag_VALID {
549
- continue
561
+ keyRead , ok := dbReads [r .Key ]
562
+ if ! ok {
563
+ panic ("all read keys should be in the map" )
550
564
}
551
565
552
566
wg .Add (1 )
553
567
go func (txNum int , c * readCache , expectedVer * types.Version ) {
554
568
defer wg .Done ()
555
- if c == nil {
556
- panic ("all reads keys should be in the map" )
569
+ // Stop early in case another validation routine already invalidated this TX.
570
+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
571
+ return
557
572
}
558
573
559
574
c .wg .Wait ()
560
- if valInfoArray [txNum ].Flag != types .Flag_VALID || c .err != nil {
561
- return
562
- }
563
- if proto .Equal (expectedVer , c .ver ) {
564
- return
565
- }
566
- valInfoArray [txNum ] = & types.ValidationInfo {
567
- Flag : types .Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE ,
568
- ReasonIfInvalid : "mvcc conflict has occurred as the committed state for the key [" + c .key + "] in database [" + c .dbName + "] changed" ,
575
+
576
+ // We check the flag again after waiting for the read version.
577
+ // The version comparison is last to avoid redundant comparison (short circuit evaluation).
578
+ if c .err == nil && valInfoArray [txNum ].Flag == types .Flag_VALID && ! proto .Equal (expectedVer , c .ver ) {
579
+ valInfoArray [txNum ] = & types.ValidationInfo {
580
+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE ,
581
+ ReasonIfInvalid : "mvcc conflict has occurred as the committed state for the key [" + c .key + "] in database [" + c .dbName + "] changed" ,
582
+ }
569
583
}
570
- }(txNum , reads [txOps. DbName ][r. Key ] , r .Version )
584
+ }(txNum , keyRead , r .Version )
571
585
}
572
586
}
573
587
}
574
588
575
- // Wait for all the validation routines to end.
589
+ // Wait in the background for all the validation routines to end, then inject nil to make sure we have data
590
+ // to read from the channel if no error occurred.
576
591
go func () {
577
592
wg .Wait ()
578
- // Inject nil to make sure we have data to read from the channel if no error occurred.
579
593
errorChan <- nil
580
594
}()
581
595
596
+ // Wait for all the validation routines to end or for the first error.
582
597
select {
583
598
case err := <- errorChan :
584
- close (errorChan )
585
599
return err
586
600
}
587
601
}
0 commit comments