Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 152 additions & 1 deletion internal/txvalidation/data_tx_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (v *dataTxValidator) parallelValidation(txsEnv []*types.DataTxEnvelope, use
}

func (v *dataTxValidator) validateSignatures(txEnv *types.DataTxEnvelope) ([]string, *types.ValidationInfo, error) {
var userIDsWithValidSign []string
userIDsWithValidSign := make([]string, 0, len(txEnv.Signatures))
for userID, signature := range txEnv.Signatures {
valRes, err := v.sigValidator.validate(userID, signature, txEnv.Payload)
if err != nil {
Expand Down Expand Up @@ -484,6 +484,122 @@ func (v *dataTxValidator) validateACLForWriteOrDelete(userIDs []string, dbName,
}, nil
}

type readCache struct {
dbName string
key string
ver *types.Version
err error
wg sync.WaitGroup
}

func (v *dataTxValidator) parallelReadMvccValidation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplicity of code flow is very important for a longer term code maintenance. Here, the flow looks complicated due to go-routine dependencies. Why not make first set of goroutines finish before starting the second. Are there significant performance between these two flows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first set of goroutines are strictly IO-bounded and the seconds are strictly compute-bounded.
I believe that a good portion of the benefit here comes from pipelining IO and compute-bounded tasks to utilize both the storage and CPU in parallel, rather than one after the other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part in the second set of goroutine is CPU intensive? If we are verifying signatures, I would agree but we are doing simple checks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not intensive, but it is CPU bound. A single version comparison might not be much, but we may have 1K to 1M comparisons. All of them together are nonnegligible, but hiding them by pipelining them with the IO-bound tasks reduces their apparent latency to zero.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you feel strongly about using this flow, go ahead. I will anyway share my view.

When we use cache for the read (when the client reads), there is a high possibility that the read content might be already available in the cache and disk reads may not be needed. Hence, it may not be a pure IO bounded. Further, active goroutines are dependent on the number of vCPUs. If we are taking about 1M goroutine, I doubt the typical server to handle that many number of goroutines and many might be waiting for the CPU thread. For every read and every version comparison, we are using a goroutine. Also, there is interdependencies between goroutines. Maybe I am wrong but it looks to me like over optimisation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This improvement alone yielded a 25%-50% read-write-TXs throughput (TPS) increase.
Even if the pipelining amount to 10% of that increase, that would be a ±5% increase in TPS.
Let's postpone this discussion until after we'll add all the metrics to the main branch, then we could compare both flows.

image

Copy link
Contributor

@cendhu cendhu Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I will push a PR where version check for each tx is executed in parallel but no parallelism within a single transaction. This would simplify the code too and cache at the world state would anyway help with duplicate reads. In the end to end performance, my hypothesis is that this flow would be sufficient and simple enough. Let's check.

valInfoArray []*types.ValidationInfo, dataTxEnvs []*types.DataTxEnvelope,
) error {
reads := make(map[string]map[string]*readCache)
errorChan := make(chan error)
defer close(errorChan)

// Submit a "get-version" Go routine for each read key in the envelope.
// We avoid reading the same key twice.
for txNum, txEnv := range dataTxEnvs {
if valInfoArray[txNum].Flag != types.Flag_VALID {
continue
}
for _, txOps := range txEnv.Payload.DbOperations {
if len(txOps.DataReads) == 0 {
continue
}

dbName := txOps.DbName
dbReads, ok := reads[dbName]
if !ok {
dbReads = make(map[string]*readCache)
reads[dbName] = dbReads
}

for _, r := range txOps.DataReads {
if _, ok := dbReads[r.Key]; ok {
continue
}

c := &readCache{
dbName: dbName,
key: r.Key,
}
c.wg.Add(1)
dbReads[r.Key] = c
go func(txNum int, c *readCache) {
defer c.wg.Done()
c.ver, c.err = v.db.GetVersion(c.dbName, c.key)
if c.err != nil {
v.logger.Errorf("error validating signatures in tx number %d, error: %s", txNum, c.err)
defer func() {
recover() // Ignore panic when errorChan is closed
}()
errorChan <- c.err
}
}(txNum, c)
}
}
}

// Submit a "validation" Go routine for read operation in the envelope.
wg := sync.WaitGroup{}
for txNum, txEnv := range dataTxEnvs {
if valInfoArray[txNum].Flag != types.Flag_VALID {
continue
}
for _, txOps := range txEnv.Payload.DbOperations {
if len(txOps.DataReads) == 0 {
continue
}
dbReads, ok := reads[txOps.DbName]
if !ok {
panic("all read DBs should be in the map")
}
for _, r := range txOps.DataReads {
keyRead, ok := dbReads[r.Key]
if !ok {
panic("all read keys should be in the map")
}

wg.Add(1)
go func(txNum int, c *readCache, expectedVer *types.Version) {
defer wg.Done()
// Stop early in case another validation routine already invalidated this TX.
if valInfoArray[txNum].Flag != types.Flag_VALID {
return
}

c.wg.Wait()

// We check the flag again after waiting for the read version.
// The version comparison is last to avoid redundant comparison (short circuit evaluation).
if c.err == nil && valInfoArray[txNum].Flag == types.Flag_VALID && !proto.Equal(expectedVer, c.ver) {
valInfoArray[txNum] = &types.ValidationInfo{
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE,
ReasonIfInvalid: "mvcc conflict has occurred as the committed state for the key [" + c.key + "] in database [" + c.dbName + "] changed",
}
}
}(txNum, keyRead, r.Version)
}
}
}

// Wait in the background for all the validation routines to end, then inject nil to make sure we have data
// to read from the channel if no error occurred.
go func() {
wg.Wait()
errorChan <- nil
}()

// Wait for all the validation routines to end or for the first error.
select {
case err := <-errorChan:
return err
}
}

func (v *dataTxValidator) mvccValidation(dbName string, txOps *types.DBOperation, pendingOps *pendingOperations) (*types.ValidationInfo, error) {
for _, r := range txOps.DataReads {
if pendingOps.exist(dbName, r.Key) {
Expand Down Expand Up @@ -532,3 +648,38 @@ func (v *dataTxValidator) mvccValidation(dbName string, txOps *types.DBOperation
Flag: types.Flag_VALID,
}, nil
}

func (v *dataTxValidator) mvccValidationPending(dbName string, txOps *types.DBOperation, pendingOps *pendingOperations) (*types.ValidationInfo, error) {
for _, r := range txOps.DataReads {
if pendingOps.exist(dbName, r.Key) {
return &types.ValidationInfo{
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + r.Key + "] in database [" + dbName + "]",
}, nil
}
}
// as state trie generation work at the boundary of block, we cannot allow more than one write per key. This is because, the state trie
// generation considers only the final updates and not intermediate updates within a block boundary. As a result, we would have intermediate
// entries in the provenance store but cannot generate proof of existence for the same using the state trie. As blind writes/deletes are quite
// rare, we allow only one write per key within a block. In general, user reads the key before writing to it.
for _, w := range txOps.DataWrites {
if pendingOps.exist(dbName, w.Key) {
return &types.ValidationInfo{
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + w.Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once",
}, nil
}
}
for _, d := range txOps.DataDeletes {
if pendingOps.exist(dbName, d.Key) {
return &types.ValidationInfo{
Flag: types.Flag_INVALID_MVCC_CONFLICT_WITHIN_BLOCK,
ReasonIfInvalid: "mvcc conflict has occurred within the block for the key [" + d.Key + "] in database [" + dbName + "]. Within a block, a key can be modified only once",
}, nil
}
}

return &types.ValidationInfo{
Flag: types.Flag_VALID,
}, nil
}
24 changes: 21 additions & 3 deletions internal/txvalidation/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,26 @@ func (v *Validator) ValidateBlock(block *types.Block) ([]*types.ValidationInfo,
return nil, err
}

if err = v.dataTxValidator.parallelValidation(dataTxEnvs, usersWithValidSigPerTX, valInfoArray); err != nil {
return nil, errors.WithMessage(err, "error while validating data transaction")
var wg sync.WaitGroup
var parallelValidErr error
var parallelMvccReadErr error
wg.Add(2)
go func() {
defer wg.Done()
parallelValidErr = v.dataTxValidator.parallelValidation(dataTxEnvs, usersWithValidSigPerTX, valInfoArray)
}()
go func() {
defer wg.Done()
parallelMvccReadErr = v.dataTxValidator.parallelReadMvccValidation(valInfoArray, dataTxEnvs)
}()
wg.Wait()

if parallelValidErr != nil {
return nil, errors.WithMessage(parallelValidErr, "error while validating data transaction")
}

if parallelMvccReadErr != nil {
return nil, errors.WithMessage(parallelMvccReadErr, "error while validating data transaction's read set")
}

pendingOps := newPendingOperations()
Expand All @@ -116,7 +134,7 @@ func (v *Validator) ValidateBlock(block *types.Block) ([]*types.ValidationInfo,

go func() {
defer wg.Done()
mvccValResult, mvccValError = v.dataTxValidator.mvccValidation(txOps.DbName, txOps, pendingOps)
mvccValResult, mvccValError = v.dataTxValidator.mvccValidationPending(txOps.DbName, txOps, pendingOps)
}()

wg.Wait()
Expand Down
5 changes: 4 additions & 1 deletion internal/txvalidation/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger-labs/orion-server/pkg/logger"
"github.com/hyperledger-labs/orion-server/pkg/server/testutils"
"github.com/hyperledger-labs/orion-server/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -657,7 +658,9 @@ func TestValidateDataBlock(t *testing.T) {

results, err := env.validator.ValidateBlock(tt.block)
require.NoError(t, err)
require.Equal(t, tt.expectedResults, results)
for i := range tt.expectedResults {
assert.Equal(t, tt.expectedResults[i], results[i], "index: %d", i)
}
})
}
}
Expand Down