Skip to content
Merged
17 changes: 9 additions & 8 deletions client/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"fmt"

rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
Expand Down Expand Up @@ -33,39 +34,39 @@ func (c *Client) CreateBatch() clientTypes.Batch {
}
}

func (c *Client) GetLatestBlockNumber() (uint64, error) {
status, err := c.client.Status()
func (c *Client) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
status, err := c.client.Status(ctx, nil)
if err != nil {
return 0, fmt.Errorf("unable to get chain status, %w", err)
}

return uint64(status.SyncInfo.LatestBlockHeight), nil
}

func (c *Client) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
func (c *Client) GetBlock(ctx context.Context, blockNum uint64) (*core_types.ResultBlock, error) {
bn := int64(blockNum)

block, err := c.client.Block(&bn)
block, err := c.client.Block(ctx, &bn)
if err != nil {
return nil, fmt.Errorf("unable to get block, %w", err)
}

return block, nil
}

func (c *Client) GetGenesis() (*core_types.ResultGenesis, error) {
genesis, err := c.client.Genesis()
func (c *Client) GetGenesis(ctx context.Context) (*core_types.ResultGenesis, error) {
genesis, err := c.client.Genesis(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get genesis block, %w", err)
}

return genesis, nil
}

func (c *Client) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
func (c *Client) GetBlockResults(ctx context.Context, blockNum uint64) (*core_types.ResultBlockResults, error) {
bn := int64(blockNum)

results, err := c.client.BlockResults(&bn)
results, err := c.client.BlockResults(ctx, &bn)
if err != nil {
return nil, fmt.Errorf("unable to get block results, %w", err)
}
Expand Down
18 changes: 9 additions & 9 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func New(
return f
}

func (f *Fetcher) fetchGenesisData() error {
func (f *Fetcher) fetchGenesisData(ctx context.Context) error {
_, err := f.storage.GetLatestHeight()
// Possible cases:
// - err is ErrNotFound: the storage is empty, we execute the rest of the routine and fetch+write genesis data
Expand All @@ -86,12 +86,12 @@ func (f *Fetcher) fetchGenesisData() error {

f.logger.Info("Fetching genesis")

block, err := getGenesisBlock(f.client)
block, err := getGenesisBlock(ctx, f.client)
if err != nil {
return fmt.Errorf("failed to fetch genesis block: %w", err)
}

results, err := f.client.GetBlockResults(0)
results, err := f.client.GetBlockResults(ctx, 0)
if err != nil {
return fmt.Errorf("failed to fetch genesis results: %w", err)
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func (f *Fetcher) fetchGenesisData() error {
// blockchain data
func (f *Fetcher) FetchChainData(ctx context.Context) error {
// Attempt to fetch the genesis data
if err := f.fetchGenesisData(); err != nil {
if err := f.fetchGenesisData(ctx); err != nil {
// We treat this error as soft, to ease migration, since
// some versions of gno networks don't support this.
// In the future, we should hard fail if genesis is not fetch-able
Expand All @@ -156,7 +156,7 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error {
}

// Fetch the latest block from the chain
latestRemote, latestErr := f.client.GetLatestBlockNumber()
latestRemote, latestErr := f.client.GetLatestBlockNumber(ctx)
if latestErr != nil {
f.logger.Error("unable to fetch latest block number", zap.Error(latestErr))

Expand Down Expand Up @@ -322,22 +322,22 @@ func (f *Fetcher) writeSlot(s *slot) error {
return nil
}

func (f *Fetcher) IsReady() (bool, error) {
func (f *Fetcher) IsReady(ctx context.Context) (bool, error) {
if f.latestChunkSize == int(f.maxChunkSize) {
return false, fmt.Errorf("the data synchronization process is still in progress and hasn't "+
"caught up with the current blockchain state. Chunk size: %d", f.latestChunkSize)
}

_, err := f.client.GetLatestBlockNumber()
_, err := f.client.GetLatestBlockNumber(ctx)
if err != nil {
return false, fmt.Errorf("node RPC method is not reachable: %w", err)
}

return true, nil
}

func getGenesisBlock(client Client) (*bft_types.Block, error) {
gblock, err := client.GetGenesis()
func getGenesisBlock(ctx context.Context, client Client) (*bft_types.Block, error) {
gblock, err := client.GetGenesis(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get genesis block: %w", err)
}
Expand Down
14 changes: 7 additions & 7 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ func TestFetcher_Genesis(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.NoError(t, f.fetchGenesisData())
require.NoError(t, f.fetchGenesisData(context.Background()))

require.Len(t, capturedEvents, 1)

Expand Down Expand Up @@ -1140,7 +1140,7 @@ func TestFetcher_GenesisAlreadyFetched(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.NoError(t, f.fetchGenesisData())
require.NoError(t, f.fetchGenesisData(context.Background()))
}

func TestFetcher_GenesisFetchError(t *testing.T) {
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func TestFetcher_GenesisFetchError(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.ErrorIs(t, f.fetchGenesisData(), remoteErr)
require.ErrorIs(t, f.fetchGenesisData(context.Background()), remoteErr)
}

func TestFetcher_GenesisInvalidState(t *testing.T) {
Expand Down Expand Up @@ -1224,7 +1224,7 @@ func TestFetcher_GenesisInvalidState(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.ErrorContains(t, f.fetchGenesisData(), "unknown genesis state kind 'int'")
require.ErrorContains(t, f.fetchGenesisData(context.Background()), "unknown genesis state kind 'int'")
}

func TestFetcher_GenesisFetchResultsError(t *testing.T) {
Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestFetcher_GenesisFetchResultsError(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.ErrorIs(t, f.fetchGenesisData(), remoteErr)
require.ErrorIs(t, f.fetchGenesisData(context.Background()), remoteErr)
}

func TestFetcher_GenesisNilGenesisDoc(t *testing.T) {
Expand Down Expand Up @@ -1306,7 +1306,7 @@ func TestFetcher_GenesisNilGenesisDoc(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.Error(t, f.fetchGenesisData())
require.Error(t, f.fetchGenesisData(context.Background()))
}

func TestFetcher_GenesisNilResults(t *testing.T) {
Expand Down Expand Up @@ -1347,7 +1347,7 @@ func TestFetcher_GenesisNilResults(t *testing.T) {

f := New(mockStorage, mockClient, mockEvents)

require.Error(t, f.fetchGenesisData())
require.Error(t, f.fetchGenesisData(context.Background()))
}

// generateTransactions generates dummy transactions
Expand Down
8 changes: 4 additions & 4 deletions fetch/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@ type mockClient struct {
createBatchFn createBatchDelegate
}

func (m *mockClient) GetLatestBlockNumber() (uint64, error) {
func (m *mockClient) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
if m.getLatestBlockNumberFn != nil {
return m.getLatestBlockNumberFn()
}

return 0, nil
}

func (m *mockClient) GetBlock(blockNum uint64) (*core_types.ResultBlock, error) {
func (m *mockClient) GetBlock(ctx context.Context, blockNum uint64) (*core_types.ResultBlock, error) {
if m.getBlockFn != nil {
return m.getBlockFn(blockNum)
}

return nil, nil
}

func (m *mockClient) GetGenesis() (*core_types.ResultGenesis, error) {
func (m *mockClient) GetGenesis(ctx context.Context) (*core_types.ResultGenesis, error) {
if m.getGenesisFn != nil {
return m.getGenesisFn()
}

return nil, nil
}

func (m *mockClient) GetBlockResults(blockNum uint64) (*core_types.ResultBlockResults, error) {
func (m *mockClient) GetBlockResults(ctx context.Context, blockNum uint64) (*core_types.ResultBlockResults, error) {
if m.getBlockResultsFn != nil {
return m.getBlockResultsFn(blockNum)
}
Expand Down
10 changes: 6 additions & 4 deletions fetch/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fetch

import (
"context"

core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"

clientTypes "github.com/gnolang/tx-indexer/client/types"
Expand All @@ -10,17 +12,17 @@ import (
// Client defines the interface for the node (client) communication
type Client interface {
// GetLatestBlockNumber returns the latest block height from the chain
GetLatestBlockNumber() (uint64, error)
GetLatestBlockNumber(context.Context) (uint64, error)

// GetBlock returns specified block
GetBlock(uint64) (*core_types.ResultBlock, error)
GetBlock(context.Context, uint64) (*core_types.ResultBlock, error)

// GetGenesis returns the genesis block
GetGenesis() (*core_types.ResultGenesis, error)
GetGenesis(context.Context) (*core_types.ResultGenesis, error)

// GetBlockResults returns the results of executing the transactions
// for the specified block
GetBlockResults(uint64) (*core_types.ResultBlockResults, error)
GetBlockResults(context.Context, uint64) (*core_types.ResultBlockResults, error)

// CreateBatch creates a new client batch
CreateBatch() clientTypes.Batch
Expand Down
20 changes: 10 additions & 10 deletions fetch/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ func handleChunk(
errs := make([]error, 0)

// Get block data from the node
blocks, err := getBlocksFromBatch(info.chunkRange, client)
blocks, err := getBlocksFromBatch(ctx, info.chunkRange, client)
errs = append(errs, err)

results, err := getTxResultFromBatch(blocks, client)
results, err := getTxResultFromBatch(ctx, blocks, client)
errs = append(errs, err)

return &chunk{
Expand All @@ -61,7 +61,7 @@ func handleChunk(
// getBlocksFromBatch gets the blocks using batch requests.
// In case of encountering an error during fetching (remote temporarily closed, batch error...),
// the fetch is attempted again using sequential block fetches
func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, error) {
func getBlocksFromBatch(ctx context.Context, chunkRange chunkRange, client Client) ([]*types.Block, error) {
var (
batch = client.CreateBatch()
fetchedBlocks = make([]*types.Block, 0)
Expand All @@ -82,7 +82,7 @@ func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, e
blocksRaw, err := batch.Execute(context.Background())
if err != nil {
// Try to fetch sequentially
return getBlocksSequentially(chunkRange, client)
return getBlocksSequentially(ctx, chunkRange, client)
}

// Extract the blocks
Expand All @@ -100,15 +100,15 @@ func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, e
}

// getBlocksSequentially attempts to fetch blocks from the client, using sequential requests
func getBlocksSequentially(chunkRange chunkRange, client Client) ([]*types.Block, error) {
func getBlocksSequentially(ctx context.Context, chunkRange chunkRange, client Client) ([]*types.Block, error) {
var (
errs = make([]error, 0)
blocks = make([]*types.Block, 0)
)

for blockNum := chunkRange.from; blockNum <= chunkRange.to; blockNum++ {
// Get block info from the chain
block, err := client.GetBlock(blockNum)
block, err := client.GetBlock(ctx, blockNum)
if err != nil {
errs = append(errs, fmt.Errorf("unable to get block %d, %w", blockNum, err))

Expand All @@ -124,7 +124,7 @@ func getBlocksSequentially(chunkRange chunkRange, client Client) ([]*types.Block
// getTxResultFromBatch gets the tx results using batch requests.
// In case of encountering an error during fetching (remote temporarily closed, batch error...),
// the fetch is attempted again using sequential tx result fetches
func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
func getTxResultFromBatch(ctx context.Context, blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
var (
batch = client.CreateBatch()
fetchedResults = make([][]*types.TxResult, len(blocks))
Expand Down Expand Up @@ -158,7 +158,7 @@ func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxRe
blockResultsRaw, err := batch.Execute(context.Background())
if err != nil {
// Try to fetch sequentially
return getTxResultsSequentially(blocks, client)
return getTxResultsSequentially(ctx, blocks, client)
}

indexOfBlockHeight := make(map[int64]int, len(blocks))
Expand Down Expand Up @@ -198,7 +198,7 @@ func getTxResultFromBatch(blocks []*types.Block, client Client) ([][]*types.TxRe
}

// getTxResultsSequentially attempts to fetch tx results from the client, using sequential requests
func getTxResultsSequentially(blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
func getTxResultsSequentially(ctx context.Context, blocks []*types.Block, client Client) ([][]*types.TxResult, error) {
var (
errs = make([]error, 0)
results = make([][]*types.TxResult, len(blocks))
Expand All @@ -210,7 +210,7 @@ func getTxResultsSequentially(blocks []*types.Block, client Client) ([][]*types.
}

// Get the transaction execution results
blockResults, err := client.GetBlockResults(uint64(block.Height))
blockResults, err := client.GetBlockResults(ctx, uint64(block.Height))
if err != nil {
errs = append(
errs,
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/99designs/gqlgen v0.17.56
github.com/ajnavarro/gqlfiltergen v0.1.2
github.com/cockroachdb/pebble v1.1.5
github.com/gnolang/gno v0.0.0-20250716085632-95d5f5e743c9
github.com/gnolang/gno v0.0.0-20250903085916-8441a1e81345
github.com/go-chi/chi/v5 v5.2.1
github.com/go-chi/httprate v0.15.0
github.com/google/uuid v1.6.0
Expand All @@ -24,6 +24,7 @@ require (
require (
github.com/ajg/form v1.5.1 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gofrs/flock v0.12.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/sig-0/insertion-queue v0.0.0-20241004125609-6b3ca841346b // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand Down Expand Up @@ -81,13 +82,13 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/tools v0.32.0 // indirect
golang.org/x/mod v0.26.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/tools v0.35.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.69.4 // indirect
Expand Down
Loading
Loading