Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.

Commit 7a94fbc

Browse files
authored
Added support for fromBlock when generating eventservice cache keys (#184)
* Added support for seek type and fromBlock parameters when generating event service cache keys Signed-off-by: Jim Zhang <[email protected]> * Add chaincode ID to the cache key calculation Signed-off-by: Jim Zhang <[email protected]> * Fix lint errors Signed-off-by: Jim Zhang <[email protected]> * revert local temporary changes checked in by accident Signed-off-by: Jim Zhang <[email protected]> * Unit test coverage Signed-off-by: Jim Zhang <[email protected]> * Add unit test to cachekey.go Signed-off-by: Jim Zhang <[email protected]>
1 parent e866365 commit 7a94fbc

File tree

10 files changed

+99
-5
lines changed

10 files changed

+99
-5
lines changed

pkg/client/channel/chclientrun_std.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build !pprof
12
// +build !pprof
23

34
/*

pkg/client/event/event.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ type Client struct {
3333
permitBlockEvents bool
3434
fromBlock uint64
3535
seekType seek.Type
36+
chaincodeID string
3637
eventConsumerTimeout *time.Duration
3738
}
3839

3940
// New returns a Client instance. Client receives events such as block, filtered block,
4041
// chaincode, and transaction status events.
42+
// nolint: gocyclo
4143
func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client, error) {
4244

4345
channelContext, err := channelProvider()
@@ -68,6 +70,9 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client
6870
opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock))
6971
}
7072
}
73+
if eventClient.chaincodeID != "" {
74+
opts = append(opts, deliverclient.WithChaincodeID(eventClient.chaincodeID))
75+
}
7176
if eventClient.eventConsumerTimeout != nil {
7277
opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout))
7378
}

pkg/client/event/event_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import (
2020
"github.com/pkg/errors"
2121
"github.com/stretchr/testify/assert"
2222

23+
pb "github.com/hyperledger/fabric-protos-go/peer"
2324
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
2425
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service"
2526
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher"
2627
servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks"
27-
pb "github.com/hyperledger/fabric-protos-go/peer"
2828
)
2929

3030
var (
@@ -43,7 +43,7 @@ func TestNewEventClient(t *testing.T) {
4343
t.Fatalf("Failed to create new event client: %s", err)
4444
}
4545

46-
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500 * time.Millisecond))
46+
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.Newest), WithBlockNum(math.MaxUint64), WithEventConsumerTimeout(500*time.Millisecond), WithChaincodeID("testChaincode"))
4747
if err != nil {
4848
t.Fatalf("Failed to create new event client: %s", err)
4949
}
@@ -55,6 +55,22 @@ func TestNewEventClient(t *testing.T) {
5555
}
5656
}
5757

58+
func TestNewEventClientWithFromBlock(t *testing.T) {
59+
60+
fabCtx := setupCustomTestContext(t, nil)
61+
ctx := createChannelContext(fabCtx, channelID)
62+
63+
_, err := New(ctx)
64+
if err != nil {
65+
t.Fatalf("Failed to create new event client: %s", err)
66+
}
67+
68+
_, err = New(ctx, WithBlockEvents(), WithSeekType(seek.FromBlock), WithBlockNum(100), WithChaincodeID("testChaincode"))
69+
if err != nil {
70+
t.Fatalf("Failed to create new event client: %s", err)
71+
}
72+
}
73+
5874
func TestBlockEvents(t *testing.T) {
5975

6076
eventService, eventProducer, err := newServiceWithMockProducer(defaultOpts, withBlockLedger(sourceURL))

pkg/client/event/example_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ func ExampleClient_RegisterChaincodeEvent() {
7777

7878
}
7979

80+
func ExampleClient_RegisterChaincodeEvent_NewService() {
81+
82+
ec, err := New(mockChannelProvider("mychannel"), WithChaincodeID("examplecc"))
83+
if err != nil {
84+
fmt.Println("failed to create client")
85+
}
86+
87+
registration, _, err := ec.RegisterChaincodeEvent("examplecc", "event123")
88+
if err != nil {
89+
fmt.Println("failed to register chaincode event")
90+
}
91+
defer ec.Unregister(registration)
92+
93+
fmt.Println("chaincode event registered successfully")
94+
95+
// Output: chaincode event registered successfully
96+
97+
}
98+
8099
func ExampleClient_RegisterChaincodeEvent_withPayload() {
81100

82101
// If you require payload for chaincode events you have to use WithBlockEvents() option

pkg/client/event/opts.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ func WithSeekType(seek seek.Type) ClientOption {
4242
}
4343
}
4444

45+
// WithChaincodeID indicates the target chaincode
46+
// Only deliverclient supports this
47+
func WithChaincodeID(id string) ClientOption {
48+
return func(c *Client) error {
49+
c.chaincodeID = id
50+
return nil
51+
}
52+
}
53+
4554
// WithEventConsumerTimeout is the timeout when sending events to a registered consumer.
4655
// If < 0, if buffer full, unblocks immediately and does not send.
4756
// If 0, if buffer full, will block and guarantee the event will be sent out.

pkg/fab/events/deliverclient/deliverclient_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func TestClientConnect(t *testing.T) {
7272
),
7373
WithSeekType(seek.FromBlock),
7474
WithBlockNum(0),
75+
WithChaincodeID("testChaincode"),
7576
client.WithResponseTimeout(3*time.Second),
7677
)
7778
if err != nil {

pkg/fab/events/deliverclient/opts.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type params struct {
1919
connProvider api.ConnectionProvider
2020
seekType seek.Type
2121
fromBlock uint64
22+
chaincodeID string
2223
respTimeout time.Duration
2324
}
2425

@@ -48,6 +49,15 @@ func WithBlockNum(value uint64) options.Opt {
4849
}
4950
}
5051

52+
// WithChaincodeID specifies the chaincode from which events are to be received.
53+
func WithChaincodeID(value string) options.Opt {
54+
return func(p options.Params) {
55+
if setter, ok := p.(chaincodeIDSetter); ok {
56+
setter.SetChaincodeID(value)
57+
}
58+
}
59+
}
60+
5161
type seekTypeSetter interface {
5262
SetSeekType(value seek.Type)
5363
}
@@ -56,6 +66,10 @@ type fromBlockSetter interface {
5666
SetFromBlock(value uint64)
5767
}
5868

69+
type chaincodeIDSetter interface {
70+
SetChaincodeID(value string)
71+
}
72+
5973
func (p *params) PermitBlockEvents() {
6074
logger.Debug("PermitBlockEvents")
6175
p.connProvider = deliverProvider
@@ -79,6 +93,11 @@ func (p *params) SetSeekType(value seek.Type) {
7993
}
8094
}
8195

96+
func (p *params) SetChaincodeID(value string) {
97+
logger.Debugf("ChaincodId: %d", value)
98+
p.chaincodeID = value
99+
}
100+
82101
func (p *params) SetResponseTimeout(value time.Duration) {
83102
logger.Debugf("ResponseTimeout: %s", value)
84103
p.respTimeout = value

pkg/fabsdk/fabsdk_std.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build !pprof
12
// +build !pprof
23

34
/*

pkg/fabsdk/provider/chpvdr/cachekey.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ package chpvdr
88

99
import (
1010
"crypto/sha256"
11-
"strconv"
11+
"fmt"
1212

1313
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
1414
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
15+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient/seek"
1516
)
1617

1718
// ctxtCacheKey is a lazy cache key for the context cache
@@ -99,6 +100,9 @@ func (k *eventCacheKey) String() string {
99100

100101
type params struct {
101102
permitBlockEvents bool
103+
seekType seek.Type
104+
fromBlock uint64
105+
chaincodeID string
102106
}
103107

104108
func defaultParams() *params {
@@ -109,8 +113,24 @@ func (p *params) PermitBlockEvents() {
109113
p.permitBlockEvents = true
110114
}
111115

116+
func (p *params) SetFromBlock(value uint64) {
117+
p.fromBlock = value
118+
}
119+
120+
func (p *params) SetSeekType(value seek.Type) {
121+
if value != "" {
122+
p.seekType = value
123+
}
124+
}
125+
126+
func (p *params) SetChaincodeID(value string) {
127+
if value != "" {
128+
p.chaincodeID = value
129+
}
130+
}
131+
112132
func (p *params) getOptKey() string {
113133
// Construct opts portion
114-
optKey := "blockEvents:" + strconv.FormatBool(p.permitBlockEvents)
134+
optKey := fmt.Sprintf("blockEvents:%t,seekType:%s,fromBlock:%d,chaincodeId:%s", p.permitBlockEvents, p.seekType, p.fromBlock, p.chaincodeID)
115135
return optKey
116136
}

pkg/fabsdk/provider/chpvdr/chprovider_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build testing
12
// +build testing
23

34
/*
@@ -24,6 +25,8 @@ import (
2425
"github.com/hyperledger/fabric-sdk-go/pkg/fab/chconfig"
2526
"github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
2627
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
28+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client"
29+
"github.com/hyperledger/fabric-sdk-go/pkg/fab/events/deliverclient"
2730
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
2831
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
2932
"github.com/pkg/errors"
@@ -83,7 +86,7 @@ func TestBasicValidChannel(t *testing.T) {
8386
assert.NotNil(t, channelConfig)
8487
assert.NotEmptyf(t, channelConfig.ID(), "Got empty channel ID from channel config")
8588

86-
eventService, err := channelService.EventService()
89+
eventService, err := channelService.EventService(client.WithBlockEvents(), deliverclient.WithSeekType("from"), deliverclient.WithBlockNum(10), deliverclient.WithChaincodeID("testChaincode"))
8790
require.NoError(t, err)
8891
require.NotNil(t, eventService)
8992

0 commit comments

Comments
 (0)