Skip to content

Commit c2695c3

Browse files
committed
Perform mvcc read validation in parallel
Signed-off-by: Liran Funaro <[email protected]>
1 parent 50ec7c0 commit c2695c3

File tree

3 files changed

+163
-5
lines changed

3 files changed

+163
-5
lines changed

internal/txvalidation/data_tx_validator.go

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (v *dataTxValidator) parallelValidation(txsEnv []*types.DataTxEnvelope, use
113113
}
114114

115115
func (v *dataTxValidator) validateSignatures(txEnv *types.DataTxEnvelope) ([]string, *types.ValidationInfo, error) {
116-
var userIDsWithValidSign []string
116+
userIDsWithValidSign := make([]string, 0, len(txEnv.Signatures))
117117
for userID, signature := range txEnv.Signatures {
118118
valRes, err := v.sigValidator.validate(userID, signature, txEnv.Payload)
119119
if err != nil {
@@ -484,6 +484,108 @@ func (v *dataTxValidator) validateACLForWriteOrDelete(userIDs []string, dbName,
484484
}, nil
485485
}
486486

487+
type readCache struct {
488+
dbName string
489+
key string
490+
ver *types.Version
491+
err error
492+
wg sync.WaitGroup
493+
}
494+
495+
func (v *dataTxValidator) parallelReadMvccValidation(
496+
valInfoArray []*types.ValidationInfo, dataTxEnvs []*types.DataTxEnvelope,
497+
) error {
498+
reads := make(map[string]map[string]*readCache)
499+
errorChan := make(chan error)
500+
501+
// Submit a "get-version" Go routine for each key in the envelope.
502+
// We avoid reading the same key twice.
503+
for txNum, txEnv := range dataTxEnvs {
504+
if valInfoArray[txNum].Flag != types.Flag_VALID {
505+
continue
506+
}
507+
for _, txOps := range txEnv.Payload.DbOperations {
508+
dbName := txOps.DbName
509+
dbReads, ok := reads[dbName]
510+
if !ok {
511+
dbReads = make(map[string]*readCache)
512+
reads[dbName] = dbReads
513+
}
514+
515+
for _, r := range txOps.DataReads {
516+
key := r.Key
517+
if _, ok := dbReads[key]; ok {
518+
continue
519+
}
520+
521+
c := &readCache{
522+
dbName: dbName,
523+
key: key,
524+
}
525+
c.wg.Add(1)
526+
dbReads[key] = c
527+
go func(txNum int, c *readCache) {
528+
defer c.wg.Done()
529+
c.ver, c.err = v.db.GetVersion(c.dbName, c.key)
530+
if c.err != nil {
531+
v.logger.Errorf("error validating signatures in tx number %d, error: %s", txNum, c.err)
532+
defer func() {
533+
// Ignore panic when errorChan is closed
534+
recover()
535+
}()
536+
errorChan <- c.err
537+
}
538+
}(txNum, c)
539+
}
540+
}
541+
}
542+
543+
// Submit a "validation" Go routine for read operation in the envelope.
544+
wg := sync.WaitGroup{}
545+
for txNum, txEnv := range dataTxEnvs {
546+
for _, txOps := range txEnv.Payload.DbOperations {
547+
for _, r := range txOps.DataReads {
548+
if valInfoArray[txNum].Flag != types.Flag_VALID {
549+
continue
550+
}
551+
552+
wg.Add(1)
553+
go func(txNum int, c *readCache, expectedVer *types.Version) {
554+
defer wg.Done()
555+
if c == nil {
556+
panic("all reads keys should be in the map")
557+
}
558+
559+
c.wg.Wait()
560+
if valInfoArray[txNum].Flag != types.Flag_VALID || c.err != nil {
561+
return
562+
}
563+
if proto.Equal(expectedVer, c.ver) {
564+
return
565+
}
566+
valInfoArray[txNum] = &types.ValidationInfo{
567+
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE,
568+
ReasonIfInvalid: "mvcc conflict has occurred as the committed state for the key [" + c.key + "] in database [" + c.dbName + "] changed",
569+
}
570+
}(txNum, reads[txOps.DbName][r.Key], r.Version)
571+
}
572+
}
573+
}
574+
575+
// Wait for all the validation routines to end.
576+
go func() {
577+
wg.Wait()
578+
// Inject nil to make sure we have data to read from the channel if no error occurred.
579+
errorChan <- nil
580+
}()
581+
582+
select {
583+
case err := <-errorChan:
584+
close(errorChan)
585+
return err
586+
}
587+
}
588+
487589
func (v *dataTxValidator) mvccValidation(dbName string, txOps *types.DBOperation, pendingOps *pendingOperations) (*types.ValidationInfo, error) {
488590
for _, r := range txOps.DataReads {
489591
if pendingOps.exist(dbName, r.Key) {
@@ -532,3 +634,38 @@ func (v *dataTxValidator) mvccValidation(dbName string, txOps *types.DBOperation
532634
Flag: types.Flag_VALID,
533635
}, nil
534636
}
637+
638+
func (v *dataTxValidator) mvccValidationPending(dbName string, txOps *types.DBOperation, pendingOps *pendingOperations) (*types.ValidationInfo, error) {
639+
for _, r := range txOps.DataReads {
640+
if pendingOps.exist(dbName, r.Key) {
641+
return &types.ValidationInfo{
642+
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
643+
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + r.Key + "] in database [" + dbName + "]",
644+
}, nil
645+
}
646+
}
647+
// as state trie generation work at the boundary of block, we cannot allow more than one write per key. This is because, the state trie
648+
// generation considers only the final updates and not intermediate updates within a block boundary. As a result, we would have intermediate
649+
// entries in the provenance store but cannot generate proof of existence for the same using the state trie. As blind writes/deletes are quite
650+
// rare, we allow only one write per key within a block. In general, user reads the key before writing to it.
651+
for _, w := range txOps.DataWrites {
652+
if pendingOps.exist(dbName, w.Key) {
653+
return &types.ValidationInfo{
654+
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
655+
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + w.Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once",
656+
}, nil
657+
}
658+
}
659+
for _, d := range txOps.DataDeletes {
660+
if pendingOps.exist(dbName, d.Key) {
661+
return &types.ValidationInfo{
662+
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
663+
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + d.Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once",
664+
}, nil
665+
}
666+
}
667+
668+
return &types.ValidationInfo{
669+
Flag: types.Flag_VALID,
670+
}, nil
671+
}

internal/txvalidation/validator.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,26 @@ func (v *Validator) ValidateBlock(block *types.Block) ([]*types.ValidationInfo,
9191
return nil, err
9292
}
9393

94-
if err = v.dataTxValidator.parallelValidation(dataTxEnvs, usersWithValidSigPerTX, valInfoArray); err != nil {
95-
return nil, errors.WithMessage(err, "error while validating data transaction")
94+
var wg sync.WaitGroup
95+
var parallelValidErr error
96+
var parallelMvccReadErr error
97+
wg.Add(2)
98+
go func() {
99+
defer wg.Done()
100+
parallelValidErr = v.dataTxValidator.parallelValidation(dataTxEnvs, usersWithValidSigPerTX, valInfoArray)
101+
}()
102+
go func() {
103+
defer wg.Done()
104+
parallelMvccReadErr = v.dataTxValidator.parallelReadMvccValidation(valInfoArray, dataTxEnvs)
105+
}()
106+
wg.Wait()
107+
108+
if parallelValidErr != nil {
109+
return nil, errors.WithMessage(parallelValidErr, "error while validating data transaction")
110+
}
111+
112+
if parallelMvccReadErr != nil {
113+
return nil, errors.WithMessage(parallelMvccReadErr, "error while validating data transaction's read set")
96114
}
97115

98116
pendingOps := newPendingOperations()
@@ -116,7 +134,7 @@ func (v *Validator) ValidateBlock(block *types.Block) ([]*types.ValidationInfo,
116134

117135
go func() {
118136
defer wg.Done()
119-
mvccValResult, mvccValError = v.dataTxValidator.mvccValidation(txOps.DbName, txOps, pendingOps)
137+
mvccValResult, mvccValError = v.dataTxValidator.mvccValidationPending(txOps.DbName, txOps, pendingOps)
120138
}()
121139

122140
wg.Wait()

internal/txvalidation/validator_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hyperledger-labs/orion-server/pkg/logger"
1616
"github.com/hyperledger-labs/orion-server/pkg/server/testutils"
1717
"github.com/hyperledger-labs/orion-server/pkg/types"
18+
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
)
2021

@@ -657,7 +658,9 @@ func TestValidateDataBlock(t *testing.T) {
657658

658659
results, err := env.validator.ValidateBlock(tt.block)
659660
require.NoError(t, err)
660-
require.Equal(t, tt.expectedResults, results)
661+
for i := range tt.expectedResults {
662+
assert.Equal(t, tt.expectedResults[i], results[i], "index: %d", i)
663+
}
661664
})
662665
}
663666
}

0 commit comments

Comments
 (0)