Skip to content

Commit 332b7a6

Browse files
authored
Remove redundant lock (#469)
* Remove redundant lock Signed-off-by: Liran Funaro <[email protected]>
1 parent b4e9071 commit 332b7a6

File tree

8 files changed

+118
-100
lines changed

8 files changed

+118
-100
lines changed

internal/bcdb/transaction_processor.go

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
// Copyright IBM Corp. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
3+
34
package bcdb
45

56
import (
67
"encoding/json"
7-
"fmt"
8-
"sync"
98
"time"
109

1110
"github.com/google/uuid"
@@ -45,7 +44,6 @@ type transactionProcessor struct {
4544
blockStore *blockstore.Store
4645
pendingTxs *queue.PendingTxs
4746
logger *logger.SugarLogger
48-
sync.Mutex
4947
}
5048

5149
type txProcessorConfig struct {
@@ -266,37 +264,47 @@ func (t *transactionProcessor) SubmitTransaction(tx interface{}, timeout time.Du
266264
return nil, err
267265
}
268266

269-
t.Lock()
270-
duplicate, err := t.isTxIDDuplicate(txID)
271-
if err != nil {
272-
t.Unlock()
273-
return nil, err
274-
}
275-
if duplicate {
276-
t.Unlock()
267+
// We attempt to insert the txID atomically.
268+
// If we succeed, then future TX fill fail at this point.
269+
// However, if the TX already exists in the block store, then we will fail the subsequent check.
270+
// Since a TX will be removed from the pending queue only after it is inserted to the block store,
271+
// then it is guaranteed that we won't use the same txID twice.
272+
// TODO: add limit on the number of pending sync tx
273+
promise := queue.NewCompletionPromise(timeout)
274+
if existed := t.pendingTxs.Add(txID, promise); existed {
277275
return nil, &internalerror.DuplicateTxIDError{TxID: txID}
278276
}
279277

280-
if t.txQueue.IsFull() {
281-
t.Unlock()
282-
return nil, fmt.Errorf("transaction queue is full. It means the server load is high. Try after sometime")
278+
duplicate, err := t.blockStore.DoesTxIDExist(txID)
279+
if err != nil || duplicate {
280+
t.pendingTxs.DeleteWithNoAction(txID)
281+
if err == nil {
282+
err = &internalerror.DuplicateTxIDError{TxID: txID}
283+
}
284+
return nil, err
283285
}
284286

285-
jsonBytes, err := json.MarshalIndent(tx, "", "\t")
286-
if err != nil {
287-
t.Unlock()
288-
return nil, fmt.Errorf("failed to marshal transaction: %v", err)
287+
// Avoids marshaling the TX in production mode
288+
if t.logger.IsDebug() {
289+
if jsonBytes, err := json.MarshalIndent(tx, "", "\t"); err != nil {
290+
t.logger.Debugf("failed to marshal transaction: %v", err)
291+
} else {
292+
t.logger.Debugf("enqueuing transaction %s\n", jsonBytes)
293+
}
289294
}
290-
t.logger.Debugf("enqueuing transaction %s\n", string(jsonBytes))
291295

292-
t.txQueue.Enqueue(tx)
296+
if timeout <= 0 {
297+
// Enqueue will block until the queue is not full
298+
t.txQueue.Enqueue(tx)
299+
} else {
300+
// EnqueueWithTimeout will block until the queue is not full or timeout occurs
301+
if success := t.txQueue.EnqueueWithTimeout(tx, timeout); !success {
302+
t.pendingTxs.DeleteWithNoAction(txID)
303+
return nil, &internalerror.TimeoutErr{ErrMsg: "timeout has occurred while inserting the transaction to the queue"}
304+
}
305+
}
293306
t.logger.Debug("transaction is enqueued for re-ordering")
294307

295-
promise := queue.NewCompletionPromise(timeout)
296-
// TODO: add limit on the number of pending sync tx
297-
t.pendingTxs.Add(txID, promise)
298-
t.Unlock()
299-
300308
receipt, err := promise.Wait()
301309

302310
if err != nil {
@@ -336,49 +344,31 @@ func (t *transactionProcessor) PostBlockCommitProcessing(block *types.Block) err
336344
return errors.Errorf("unexpected transaction envelope in the block")
337345
}
338346

339-
t.pendingTxs.DoneWithReceipt(txIDs, block.Header)
347+
go t.pendingTxs.DoneWithReceipt(txIDs, block.Header)
340348

341349
return nil
342350
}
343351

344-
func (t *transactionProcessor) isTxIDDuplicate(txID string) (bool, error) {
345-
if t.pendingTxs.Has(txID) {
346-
return true, nil
347-
}
348-
349-
isTxIDAlreadyCommitted, err := t.blockStore.DoesTxIDExist(txID)
350-
if err != nil {
351-
return false, err
352-
}
353-
return isTxIDAlreadyCommitted, nil
354-
}
355-
356352
func (t *transactionProcessor) Close() error {
357-
t.Lock()
358-
defer t.Unlock()
359-
353+
// It is safe to use without locks because all the following calls are protected internally.
360354
t.txReorderer.Stop()
361355
t.blockCreator.Stop()
362-
t.blockReplicator.Close()
356+
_ = t.blockReplicator.Close()
363357
t.peerTransport.Close()
364358
t.blockProcessor.Stop()
365359

366360
return nil
367361
}
368362

369363
func (t *transactionProcessor) IsLeader() *internalerror.NotLeaderError {
370-
t.Lock()
371-
defer t.Unlock()
372-
364+
// It is safe to use without locks because the following call is protected internally.
373365
return t.blockReplicator.IsLeader()
374366
}
375367

376368
// ClusterStatus returns the leader NodeID, and the active nodes NodeIDs.
377369
// Note: leader is always in active.
378370
func (t *transactionProcessor) ClusterStatus() (leader string, active []string) {
379-
t.Lock()
380-
defer t.Unlock()
381-
371+
// It is safe to use without locks because the following call is protected internally.
382372
leaderID, activePeers := t.blockReplicator.GetClusterStatus()
383373
for _, peer := range activePeers {
384374
active = append(active, peer.NodeId)

internal/blockcreator/blockcreator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func TestBlockCreator_ReleaseAsync(t *testing.T) {
332332
})
333333

334334
for i := 1; i < 6; i++ {
335-
testEnv.pendingTxs.Add(fmt.Sprintf("txid:%d", i), nil)
335+
require.False(t, testEnv.pendingTxs.Add(fmt.Sprintf("txid:%d", i), nil))
336336
}
337337

338338
for _, txBatch := range txBatches {
@@ -358,7 +358,7 @@ func TestBlockCreator_ReleaseSync(t *testing.T) {
358358
wg.Add(5)
359359
for i := 1; i < 6; i++ {
360360
promise := queue.NewCompletionPromise(5 * time.Second)
361-
testEnv.pendingTxs.Add(fmt.Sprintf("txid:%d", i), promise)
361+
require.False(t, testEnv.pendingTxs.Add(fmt.Sprintf("txid:%d", i), promise))
362362
go func() {
363363
receipt, err := promise.Wait()
364364
require.Nil(t, receipt)

internal/blockprocessor/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
package blockprocessor
55

66
import (
7-
"github.com/hyperledger-labs/orion-server/config"
87
"sync"
98

9+
"github.com/hyperledger-labs/orion-server/config"
1010
"github.com/hyperledger-labs/orion-server/internal/blockstore"
1111
"github.com/hyperledger-labs/orion-server/internal/mptrie"
1212
"github.com/hyperledger-labs/orion-server/internal/mtree"

internal/blockstore/commit_and_query.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,7 @@ func (s *Store) GetHeaderByHash(blockHash []byte) (*types.BlockHeader, error) {
477477
// DoesTxIDExist returns true if any of the committed block has a transaction with
478478
// the given txID. Otherwise, it returns false
479479
func (s *Store) DoesTxIDExist(txID string) (bool, error) {
480-
s.mu.RLock()
481-
defer s.mu.RUnlock()
482-
480+
// It is safe to use without locks because the following call is protected internally.
483481
return s.txValidationInfoDB.Has([]byte(txID), &opt.ReadOptions{})
484482
}
485483

internal/queue/pending_txs.go

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ import (
1111
)
1212

1313
type PendingTxs struct {
14-
sync.RWMutex
15-
txs map[string]*CompletionPromise
16-
14+
txs map[string]*CompletionPromise
15+
lock sync.Mutex // We use a simple mutex because we only access the pending TXs to add or delete TXs
1716
logger *logger.SugarLogger
1817
}
1918

@@ -24,30 +23,45 @@ func NewPendingTxs(logger *logger.SugarLogger) *PendingTxs {
2423
}
2524
}
2625

27-
func (p *PendingTxs) Add(txID string, promise *CompletionPromise) {
28-
p.Lock()
29-
defer p.Unlock()
26+
// Add returns true if the txId was already taken
27+
func (p *PendingTxs) Add(txID string, promise *CompletionPromise) bool {
28+
p.lock.Lock()
29+
defer p.lock.Unlock()
30+
_, loaded := p.txs[txID]
31+
if !loaded {
32+
p.txs[txID] = promise
33+
}
34+
return loaded
35+
}
36+
37+
func (p *PendingTxs) DeleteWithNoAction(txID string) {
38+
p.lock.Lock()
39+
defer p.lock.Unlock()
40+
delete(p.txs, txID)
41+
}
3042

31-
p.txs[txID] = promise
43+
func (p *PendingTxs) loadAndDelete(txID string) (*CompletionPromise, bool) {
44+
p.lock.Lock()
45+
defer p.lock.Unlock()
46+
promise, loaded := p.txs[txID]
47+
delete(p.txs, txID)
48+
return promise, loaded
3249
}
3350

3451
// DoneWithReceipt is called after the commit of a block.
3552
// The `txIDs` slice must be in the same order that transactions appear in the block.
3653
func (p *PendingTxs) DoneWithReceipt(txIDs []string, blockHeader *types.BlockHeader) {
3754
p.logger.Debugf("Done with receipt, block number: %d; txIDs: %v", blockHeader.GetBaseHeader().GetNumber(), txIDs)
3855

39-
p.Lock()
40-
defer p.Unlock()
41-
4256
for txIndex, txID := range txIDs {
43-
p.txs[txID].done(
44-
&types.TxReceipt{
45-
Header: blockHeader,
46-
TxIndex: uint64(txIndex),
47-
},
48-
)
49-
50-
delete(p.txs, txID)
57+
if promise, loaded := p.loadAndDelete(txID); loaded {
58+
promise.done(
59+
&types.TxReceipt{
60+
Header: blockHeader,
61+
TxIndex: uint64(txIndex),
62+
},
63+
)
64+
}
5165
}
5266
}
5367

@@ -57,27 +71,24 @@ func (p *PendingTxs) DoneWithReceipt(txIDs []string, blockHeader *types.BlockHea
5771
func (p *PendingTxs) ReleaseWithError(txIDs []string, err error) {
5872
p.logger.Debugf("Release with error: %s; txIDs: %v", err, txIDs)
5973

60-
p.Lock()
61-
defer p.Unlock()
62-
6374
for _, txID := range txIDs {
64-
p.txs[txID].error(err)
65-
66-
delete(p.txs, txID)
75+
if promise, loaded := p.loadAndDelete(txID); loaded {
76+
promise.error(err)
77+
}
6778
}
6879
}
6980

81+
// Has is used only for testing.
7082
func (p *PendingTxs) Has(txID string) bool {
71-
p.RLock()
72-
defer p.RUnlock()
73-
83+
p.lock.Lock()
84+
defer p.lock.Unlock()
7485
_, ok := p.txs[txID]
7586
return ok
7687
}
7788

89+
// Empty is used only for testing.
7890
func (p *PendingTxs) Empty() bool {
79-
p.RLock()
80-
defer p.RUnlock()
81-
91+
p.lock.Lock()
92+
defer p.lock.Unlock()
8293
return len(p.txs) == 0
8394
}

internal/queue/pending_txs_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
package queue_test
55

66
import (
7-
ierrors "github.com/hyperledger-labs/orion-server/internal/errors"
87
"sync"
98
"testing"
109
"time"
1110

11+
"github.com/golang/protobuf/proto"
12+
ierrors "github.com/hyperledger-labs/orion-server/internal/errors"
1213
"github.com/hyperledger-labs/orion-server/internal/queue"
1314
"github.com/hyperledger-labs/orion-server/pkg/types"
14-
"github.com/golang/protobuf/proto"
1515
"github.com/stretchr/testify/require"
1616
)
1717

@@ -20,10 +20,10 @@ func TestPendingTxs_Async(t *testing.T) {
2020

2121
var p *queue.CompletionPromise
2222
require.True(t, pendingTxs.Empty())
23-
pendingTxs.Add("tx1", p)
23+
require.False(t, pendingTxs.Add("tx1", p))
2424
require.True(t, pendingTxs.Has("tx1"))
2525
require.False(t, pendingTxs.Has("tx2"))
26-
pendingTxs.Add("tx2", p)
26+
require.False(t, pendingTxs.Add("tx2", p))
2727
require.True(t, pendingTxs.Has("tx2"))
2828
pendingTxs.DoneWithReceipt([]string{"tx1", "tx2"}, nil)
2929
require.True(t, pendingTxs.Empty())
@@ -45,7 +45,7 @@ func TestPendingTxs_Sync(t *testing.T) {
4545

4646
t.Run("Wait before Done", func(t *testing.T) {
4747
p := queue.NewCompletionPromise(time.Hour)
48-
pendingTxs.Add("tx3", p)
48+
require.False(t, pendingTxs.Add("tx3", p))
4949

5050
go func() {
5151
time.Sleep(10 * time.Millisecond)
@@ -59,7 +59,7 @@ func TestPendingTxs_Sync(t *testing.T) {
5959

6060
t.Run("Done before Wait", func(t *testing.T) {
6161
p := queue.NewCompletionPromise(time.Hour)
62-
pendingTxs.Add("tx3", p)
62+
require.False(t, pendingTxs.Add("tx3", p))
6363
pendingTxs.DoneWithReceipt([]string{"tx3"}, blockHeader)
6464
actualReceipt, err := p.Wait()
6565
require.NoError(t, err)
@@ -68,7 +68,7 @@ func TestPendingTxs_Sync(t *testing.T) {
6868

6969
t.Run("Wait before Release with Error", func(t *testing.T) {
7070
p := queue.NewCompletionPromise(time.Hour)
71-
pendingTxs.Add("tx3", p)
71+
require.False(t, pendingTxs.Add("tx3", p))
7272

7373
go func() {
7474
time.Sleep(10 * time.Millisecond)
@@ -82,7 +82,7 @@ func TestPendingTxs_Sync(t *testing.T) {
8282

8383
t.Run("Release with Error before Wait", func(t *testing.T) {
8484
p := queue.NewCompletionPromise(time.Hour)
85-
pendingTxs.Add("tx3", p)
85+
require.False(t, pendingTxs.Add("tx3", p))
8686
pendingTxs.ReleaseWithError([]string{"tx3"}, &ierrors.NotLeaderError{LeaderID: 1, LeaderHostPort: "10.10.10.10:666"})
8787
actualReceipt, err := p.Wait()
8888
require.EqualError(t, err, "not a leader, leader is RaftID: 1, with HostPort: 10.10.10.10:666")
@@ -94,7 +94,7 @@ func TestPendingTxs_Timeout(t *testing.T) {
9494
pendingTxs := queue.NewPendingTxs(testLogger(t, "debug"))
9595

9696
p := queue.NewCompletionPromise(1 * time.Millisecond)
97-
pendingTxs.Add("tx3", p)
97+
require.False(t, pendingTxs.Add("tx3", p))
9898

9999
var wg sync.WaitGroup
100100
wg.Add(1)

0 commit comments

Comments
 (0)