Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 62 additions & 34 deletions blockchain/blockdao/grpcblockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"crypto/tls"
"encoding/hex"
"fmt"
"sync/atomic"

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
Expand All @@ -27,14 +26,14 @@ import (
"github.com/iotexproject/iotex-core/v2/pkg/log"
)

type GrpcBlockDAO struct {
type grpcBlockDAO struct {
url string
insecure bool
conn *grpc.ClientConn
client blockdaopb.BlockDAOServiceClient
containsTransactionLog bool
deserializer *block.Deserializer
localHeight atomic.Uint64
cache *memoryDao
}

var (
Expand All @@ -44,19 +43,22 @@ var (
ErrAlreadyExist = fmt.Errorf("block already exists")
)

// NewGrpcBlockDAO returns a GrpcBlockDAO instance
func NewGrpcBlockDAO(
url string,
insecure bool,
deserializer *block.Deserializer,
) *GrpcBlockDAO {
return &GrpcBlockDAO{
cacheSize uint64,
) BlockStore {
return &grpcBlockDAO{
url: url,
insecure: insecure,
deserializer: deserializer,
cache: newMemoryDao(cacheSize),
}
}

func (gbd *GrpcBlockDAO) Start(ctx context.Context) error {
func (gbd *grpcBlockDAO) Start(ctx context.Context) error {
log.L().Debug("Starting gRPC block DAO...", zap.String("url", gbd.url))
var err error
opts := []grpc.DialOption{}
Expand All @@ -81,41 +83,54 @@ func (gbd *GrpcBlockDAO) Start(ctx context.Context) error {
if err != nil {
return err
}
gbd.localHeight.Store(height)
return nil
blk, err := gbd.blockByHeight(height)
if err != nil {
return err
}

// NOTE: it won't work correctly if block height is zero
return gbd.cache.PutBlock(blk)
}

func (gbd *GrpcBlockDAO) Stop(ctx context.Context) error {
func (gbd *grpcBlockDAO) Stop(ctx context.Context) error {
return gbd.conn.Close()
}

func (gbd *GrpcBlockDAO) Height() (uint64, error) {
return gbd.localHeight.Load(), nil
func (gbd *grpcBlockDAO) Height() (uint64, error) {
return gbd.cache.TipHeight(), nil
}

func (gbd *GrpcBlockDAO) rpcHeight() (uint64, error) {
func (gbd *grpcBlockDAO) rpcHeight() (uint64, error) {
response, err := gbd.client.Height(context.Background(), &emptypb.Empty{})
if err != nil {
return 0, err
}
return response.Height, nil
}

func (gbd *GrpcBlockDAO) GetBlockHash(height uint64) (hash.Hash256, error) {
func (gbd *grpcBlockDAO) GetBlockHash(height uint64) (hash.Hash256, error) {
h, ok := gbd.cache.BlockHash(height)
if ok {
return h, nil
}
response, err := gbd.client.GetBlockHash(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
if err != nil {
return hash.ZeroHash256, err
}
h, err := hash.HexStringToHash256(response.Hash)
h, err = hash.HexStringToHash256(response.Hash)
if err != nil {
return hash.ZeroHash256, err
}
return h, nil
}

func (gbd *GrpcBlockDAO) GetBlockHeight(h hash.Hash256) (uint64, error) {
func (gbd *grpcBlockDAO) GetBlockHeight(h hash.Hash256) (uint64, error) {
height, ok := gbd.cache.BlockHeight(h)
if ok {
return height, nil
}
response, err := gbd.client.GetBlockHeight(context.Background(), &blockdaopb.BlockHashRequest{
Hash: hex.EncodeToString(h[:]),
})
Expand All @@ -126,7 +141,11 @@ func (gbd *GrpcBlockDAO) GetBlockHeight(h hash.Hash256) (uint64, error) {
return response.Height, nil
}

func (gbd *GrpcBlockDAO) GetBlock(h hash.Hash256) (*block.Block, error) {
func (gbd *grpcBlockDAO) GetBlock(h hash.Hash256) (*block.Block, error) {
blk, ok := gbd.cache.BlockByHash(h)
if ok {
return blk, nil
}
response, err := gbd.client.GetBlock(context.Background(), &blockdaopb.BlockHashRequest{
Hash: hex.EncodeToString(h[:]),
})
Expand All @@ -137,10 +156,19 @@ func (gbd *GrpcBlockDAO) GetBlock(h hash.Hash256) (*block.Block, error) {
return gbd.deserializer.FromBlockProto(response.Block)
}

func (gbd *GrpcBlockDAO) GetBlockByHeight(height uint64) (*block.Block, error) {
func (gbd *grpcBlockDAO) GetBlockByHeight(height uint64) (*block.Block, error) {
if height == 0 {
return block.GenesisBlock(), nil
}
blk, ok := gbd.cache.BlockByHeight(height)
if ok {
return blk, nil
}

return gbd.blockByHeight(height)
}

func (gbd *grpcBlockDAO) blockByHeight(height uint64) (*block.Block, error) {
response, err := gbd.client.GetBlockByHeight(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
Expand All @@ -151,7 +179,7 @@ func (gbd *GrpcBlockDAO) GetBlockByHeight(height uint64) (*block.Block, error) {
return gbd.deserializer.FromBlockProto(response.Block)
}

func (gbd *GrpcBlockDAO) GetReceipts(height uint64) ([]*action.Receipt, error) {
func (gbd *grpcBlockDAO) GetReceipts(height uint64) ([]*action.Receipt, error) {
response, err := gbd.client.GetReceipts(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
Expand All @@ -169,11 +197,11 @@ func (gbd *GrpcBlockDAO) GetReceipts(height uint64) ([]*action.Receipt, error) {
return receipts, nil
}

func (gbd *GrpcBlockDAO) ContainsTransactionLog() bool {
func (gbd *grpcBlockDAO) ContainsTransactionLog() bool {
return gbd.containsTransactionLog
}

func (gbd *GrpcBlockDAO) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, error) {
func (gbd *grpcBlockDAO) TransactionLogs(height uint64) (*iotextypes.TransactionLogs, error) {
response, err := gbd.client.TransactionLogs(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
Expand All @@ -184,28 +212,22 @@ func (gbd *GrpcBlockDAO) TransactionLogs(height uint64) (*iotextypes.Transaction
return response.TransactionLogs, nil
}

func (gbd *GrpcBlockDAO) PutBlock(ctx context.Context, blk *block.Block) error {
localHeight := gbd.localHeight.Load()
switch {
case blk.Height() <= localHeight:
return errors.Wrapf(ErrAlreadyExist, "block height %d, local height %d", blk.Height(), localHeight)
case blk.Height() > localHeight+1:
return errors.Errorf("block height %d is larger than local height %d + 1", blk.Height(), localHeight)
}

func (gbd *grpcBlockDAO) PutBlock(ctx context.Context, blk *block.Block) error {
remoteHeight, err := gbd.rpcHeight()
if err != nil {
return err
}
if blk.Height() <= remoteHeight {
gbd.localHeight.Store(blk.Height())
// remote block is already exist
return nil
return gbd.cache.PutBlock(blk)
}
return errors.Wrapf(ErrRemoteHeightTooLow, "block height %d, remote height %d", blk.Height(), remoteHeight)
}

func (gbd *GrpcBlockDAO) Header(h hash.Hash256) (*block.Header, error) {
func (gbd *grpcBlockDAO) Header(h hash.Hash256) (*block.Header, error) {
if header, ok := gbd.cache.HeaderByHash(h); ok {
return header, nil
}
response, err := gbd.client.Header(context.Background(), &blockdaopb.BlockHashRequest{
Hash: hex.EncodeToString(h[:]),
})
Expand All @@ -219,7 +241,10 @@ func (gbd *GrpcBlockDAO) Header(h hash.Hash256) (*block.Header, error) {
return header, nil
}

func (gbd *GrpcBlockDAO) HeaderByHeight(height uint64) (*block.Header, error) {
func (gbd *grpcBlockDAO) HeaderByHeight(height uint64) (*block.Header, error) {
if header, ok := gbd.cache.HeaderByHeight(height); ok {
return header, nil
}
response, err := gbd.client.HeaderByHeight(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
Expand All @@ -233,7 +258,10 @@ func (gbd *GrpcBlockDAO) HeaderByHeight(height uint64) (*block.Header, error) {
return header, nil
}

func (gbd *GrpcBlockDAO) FooterByHeight(height uint64) (*block.Footer, error) {
func (gbd *grpcBlockDAO) FooterByHeight(height uint64) (*block.Footer, error) {
if footer, ok := gbd.cache.FooterByHeight(height); ok {
return footer, nil
}
response, err := gbd.client.FooterByHeight(context.Background(), &blockdaopb.BlockHeightRequest{
Height: height,
})
Expand Down
135 changes: 135 additions & 0 deletions blockchain/blockdao/memorydao.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2024 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package blockdao

import (
"sync"

"github.com/iotexproject/go-pkgs/hash"
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/blockchain/block"
)

type (
memoryDao struct {
mu sync.RWMutex
tipHeight uint64
blocks map[uint64]*block.Block
hashesToBlockHeight map[hash.Hash256]uint64
blockHashes map[uint64]hash.Hash256
cap uint64
}
)

func newMemoryDao(cap uint64) *memoryDao {
return &memoryDao{
mu: sync.RWMutex{},
blocks: make(map[uint64]*block.Block, cap),
hashesToBlockHeight: make(map[hash.Hash256]uint64, cap),
blockHashes: make(map[uint64]hash.Hash256, cap),
cap: cap,
}
}

func (md *memoryDao) TipHeight() uint64 {
md.mu.RLock()
defer md.mu.RUnlock()
return md.tipHeight
}

func (md *memoryDao) PutBlock(blk *block.Block) error {
md.mu.Lock()
defer md.mu.Unlock()
blkHeight := blk.Height()
if blk.Height() <= md.tipHeight {
return errors.Wrapf(ErrAlreadyExist, "block height %d, local height %d", blk.Height(), md.tipHeight)
}
if blk.Height() > md.tipHeight+1 {
return errors.Errorf("block height %d is larger than local height %d + 1", blk.Height(), md.tipHeight)
}
blkHash := blk.HashBlock()
if uint64(len(md.blocks)) >= md.cap && blkHeight > md.cap {
toDelete, ok := md.blockHashes[blkHeight-md.cap]
if ok {
delete(md.blocks, blkHeight-md.cap)
delete(md.blockHashes, blkHeight-md.cap)
delete(md.hashesToBlockHeight, toDelete)
}
}
md.blocks[blkHeight] = blk
md.hashesToBlockHeight[blkHash] = blkHeight
md.blockHashes[blkHeight] = blkHash
md.tipHeight = blkHeight

return nil
}

func (md *memoryDao) BlockByHeight(height uint64) (*block.Block, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
blk, ok := md.blocks[height]
return blk, ok
}

func (md *memoryDao) HeaderByHeight(height uint64) (*block.Header, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
blk, ok := md.blocks[height]
if !ok {
return nil, false
}
return &blk.Header, true
}

func (md *memoryDao) FooterByHeight(height uint64) (*block.Footer, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
blk, ok := md.blocks[height]
if !ok {
return nil, false
}
return &blk.Footer, true
}

func (md *memoryDao) BlockHash(height uint64) (hash.Hash256, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
hash, ok := md.blockHashes[height]
return hash, ok
}

func (md *memoryDao) BlockHeight(h hash.Hash256) (uint64, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
height, ok := md.hashesToBlockHeight[h]
return height, ok
}

func (md *memoryDao) BlockByHash(h hash.Hash256) (*block.Block, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
height, ok := md.hashesToBlockHeight[h]
if !ok {
return nil, false
}
blk, ok := md.blocks[height]
return blk, ok
}

func (md *memoryDao) HeaderByHash(h hash.Hash256) (*block.Header, bool) {
md.mu.RLock()
defer md.mu.RUnlock()
height, ok := md.hashesToBlockHeight[h]
if !ok {
return nil, false
}
blk, ok := md.blocks[height]
if !ok {
return nil, false
}
return &blk.Header, true
}
14 changes: 12 additions & 2 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,23 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
if err != nil {
return errors.Wrapf(err, "failed to parse chain db path %s", path)
}
serializer := block.NewDeserializer(builder.cfg.Chain.EVMNetworkID)
switch uri.Scheme {
case "grpc":
store = blockdao.NewGrpcBlockDAO(uri.Host, uri.Query().Get("insecure") == "true", block.NewDeserializer(builder.cfg.Chain.EVMNetworkID))
cacheSize := uint64(256)
if cfg.Chain.HistoryBlockRetention > 0 {
cacheSize = cfg.Chain.HistoryBlockRetention
}
store = blockdao.NewGrpcBlockDAO(
uri.Host,
uri.Query().Get("insecure") == "true",
serializer,
cacheSize,
)
case "file", "":
dbConfig := cfg.DB
dbConfig.DbPath = uri.Path
store, err = filedao.NewFileDAO(dbConfig, block.NewDeserializer(builder.cfg.Chain.EVMNetworkID))
store, err = filedao.NewFileDAO(dbConfig, serializer)
default:
return errors.Errorf("unsupported blockdao scheme %s", uri.Scheme)
}
Expand Down