diff --git a/pkg/client/event/event.go b/pkg/client/event/event.go index 0062d99206..9bc3a87c44 100644 --- a/pkg/client/event/event.go +++ b/pkg/client/event/event.go @@ -34,6 +34,7 @@ type Client struct { fromBlock uint64 seekType seek.Type eventConsumerTimeout *time.Duration + noCacheInit bool } // New returns a Client instance. Client receives events such as block, filtered block, @@ -58,24 +59,7 @@ func New(channelProvider context.ChannelProvider, opts ...ClientOption) (*Client return nil, errors.New("channel service not initialized") } - var es fab.EventService - if eventClient.permitBlockEvents { - var opts []options.Opt - opts = append(opts, client.WithBlockEvents()) - if eventClient.seekType != "" { - opts = append(opts, deliverclient.WithSeekType(eventClient.seekType)) - if eventClient.seekType == seek.FromBlock { - opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock)) - } - } - if eventClient.eventConsumerTimeout != nil { - opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout)) - } - es, err = channelContext.ChannelService().EventService(opts...) - } else { - es, err = channelContext.ChannelService().EventService() - } - + es, err := eventService(channelContext, &eventClient) if err != nil { return nil, errors.WithMessage(err, "event service creation failed") } @@ -130,3 +114,31 @@ func (c *Client) RegisterTxStatusEvent(txID string) (fab.Registration, <-chan *f func (c *Client) Unregister(reg fab.Registration) { c.eventService.Unregister(reg) } + +func blockEventOpts(eventClient *Client) (opts []options.Opt) { + opts = append(opts, client.WithBlockEvents()) + if eventClient.seekType != "" { + opts = append(opts, deliverclient.WithSeekType(eventClient.seekType)) + if eventClient.seekType == seek.FromBlock { + opts = append(opts, deliverclient.WithBlockNum(eventClient.fromBlock)) + } + } + if eventClient.eventConsumerTimeout != nil { + opts = append(opts, dispatcher.WithEventConsumerTimeout(*eventClient.eventConsumerTimeout)) + } + return +} + +func eventService(channelContext context.Channel, eventClient *Client) (es fab.EventService, err error) { + if eventClient.permitBlockEvents { + opts := blockEventOpts(eventClient) + if eventClient.noCacheInit { + es, err = channelContext.ChannelService().EventServiceNoCache(opts...) + } else { + es, err = channelContext.ChannelService().EventService(opts...) + } + } else { + es, err = channelContext.ChannelService().EventService() + } + return +} diff --git a/pkg/client/event/opts.go b/pkg/client/event/opts.go index 5803fabc56..3868daf631 100644 --- a/pkg/client/event/opts.go +++ b/pkg/client/event/opts.go @@ -15,6 +15,14 @@ import ( // ClientOption describes a functional parameter for the New constructor type ClientOption func(*Client) error +// WithNoCache indicates that event service must be initialized without cache. +func WithNoCache() ClientOption { + return func(c *Client) error { + c.noCacheInit = true + return nil + } +} + // WithBlockEvents indicates that block events are to be received. // Note that the caller must have sufficient privileges for this option. func WithBlockEvents() ClientOption { diff --git a/pkg/client/resmgmt/mockchannelservice.gen.go b/pkg/client/resmgmt/mockchannelservice.gen.go index 3958f3b6e7..2cbe66bae2 100644 --- a/pkg/client/resmgmt/mockchannelservice.gen.go +++ b/pkg/client/resmgmt/mockchannelservice.gen.go @@ -155,6 +155,23 @@ func (fake *MockChannelService) EventService(opts ...options.Opt) (fab.EventServ return fake.eventServiceReturns.result1, fake.eventServiceReturns.result2 } +func (fake *MockChannelService) EventServiceNoCache(opts ...options.Opt) (fab.EventService, error) { + fake.eventServiceMutex.Lock() + ret, specificReturn := fake.eventServiceReturnsOnCall[len(fake.eventServiceArgsForCall)] + fake.eventServiceArgsForCall = append(fake.eventServiceArgsForCall, struct { + opts []options.Opt + }{opts}) + fake.recordInvocation("EventService", []interface{}{opts}) + fake.eventServiceMutex.Unlock() + if fake.EventServiceStub != nil { + return fake.EventServiceStub(opts...) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fake.eventServiceReturns.result1, fake.eventServiceReturns.result2 +} + func (fake *MockChannelService) EventServiceCallCount() int { fake.eventServiceMutex.RLock() defer fake.eventServiceMutex.RUnlock() diff --git a/pkg/common/providers/fab/context.go b/pkg/common/providers/fab/context.go index 0f87fda5e7..91038c9073 100644 --- a/pkg/common/providers/fab/context.go +++ b/pkg/common/providers/fab/context.go @@ -16,6 +16,7 @@ import ( type ChannelService interface { Config() (ChannelConfig, error) EventService(opts ...options.Opt) (EventService, error) + EventServiceNoCache(opts ...options.Opt) (EventService, error) Membership() (ChannelMembership, error) ChannelConfig() (ChannelCfg, error) Transactor(reqCtx reqContext.Context) (Transactor, error) diff --git a/pkg/fab/mocks/mockchprovider.go b/pkg/fab/mocks/mockchprovider.go index 5bb1a4a314..b5312801d8 100644 --- a/pkg/fab/mocks/mockchprovider.go +++ b/pkg/fab/mocks/mockchprovider.go @@ -72,6 +72,11 @@ func (cs *MockChannelService) EventService(opts ...options.Opt) (fab.EventServic return NewMockEventService(), nil } +// EventServiceNoCache returns a mock event service +func (cs *MockChannelService) EventServiceNoCache(opts ...options.Opt) (fab.EventService, error) { + return NewMockEventService(), nil +} + // SetTransactor changes the return value of Transactor func (cs *MockChannelService) SetTransactor(t fab.Transactor) { cs.transactor = t diff --git a/pkg/fabsdk/provider/chpvdr/chprovider.go b/pkg/fabsdk/provider/chpvdr/chprovider.go index 0962fe77aa..9fe9e9aca3 100644 --- a/pkg/fabsdk/provider/chpvdr/chprovider.go +++ b/pkg/fabsdk/provider/chpvdr/chprovider.go @@ -104,6 +104,12 @@ func (cs *ChannelService) EventService(opts ...options.Opt) (fab.EventService, e return cs.ctxtCache.GetEventService(cs.channelID, opts...) } +// EventServiceNoCache creates EventService and returns it (without using cache). +func (cs *ChannelService) EventServiceNoCache(opts ...options.Opt) (fab.EventService, error) { + cs.ctxtCache.Drain(DrainEventSevice) + return cs.ctxtCache.GetEventService(cs.channelID, opts...) +} + // Membership returns and caches a channel member identifier // A membership reference is returned that refreshes with the configured interval func (cs *ChannelService) Membership() (fab.ChannelMembership, error) { diff --git a/pkg/fabsdk/provider/chpvdr/contextcache.go b/pkg/fabsdk/provider/chpvdr/contextcache.go index ec97ace47a..47c00f23c4 100644 --- a/pkg/fabsdk/provider/chpvdr/contextcache.go +++ b/pkg/fabsdk/provider/chpvdr/contextcache.go @@ -20,9 +20,25 @@ import ( "github.com/pkg/errors" ) +type drainType int + +const ( + // DrainEventSevice used to drain event service cache + DrainEventSevice drainType = iota + 1 + // DrainMembershipCache used to drain membership service cache + DrainMembershipCache + // DrainChCfgCache used to drain channel config cache + DrainChCfgCache + // DrainSelectionServiceCache used to drain selection service cache + DrainSelectionServiceCache + // DrainDiscoveryServiceCache used to drain discovery service cache + DrainDiscoveryServiceCache +) + type cache interface { Get(lazycache.Key, ...interface{}) (interface{}, error) Close() + DeleteAll() } type contextCache struct { @@ -82,6 +98,22 @@ func newContextCache(ctx fab.ClientContext, opts []options.Opt) *contextCache { return c } +// Drain deletes all entries from the specified cache. +func (c *contextCache) Drain(whatToDrain drainType) { + switch whatToDrain { + case DrainEventSevice: + c.eventServiceCache.DeleteAll() + case DrainMembershipCache: + c.membershipCache.DeleteAll() + case DrainChCfgCache: + c.chCfgCache.DeleteAll() + case DrainSelectionServiceCache: + c.selectionServiceCache.DeleteAll() + case DrainDiscoveryServiceCache: + c.discoveryServiceCache.DeleteAll() + } +} + func (c *contextCache) Close() { logger.Debug("Closing event service cache...") c.eventServiceCache.Close() @@ -170,6 +202,7 @@ func (c *contextCache) GetEventService(channelID string, opts ...options.Opt) (f if err != nil { return nil, err } + eventService, err := c.eventServiceCache.Get(key) if err != nil { return nil, err