Skip to content

Commit 8b0dff9

Browse files
Expose node component management
1 parent 4c8ac17 commit 8b0dff9

File tree

4 files changed

+133
-87
lines changed

4 files changed

+133
-87
lines changed

cmd/node.go

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,53 @@ type Node interface {
2929
// The Run function starts all the components, and is blocked until either a termination
3030
// signal is received or a irrecoverable error is encountered.
3131
type FlowNodeImp struct {
32-
component.Component
32+
NodeImp
3333
*NodeConfig
34+
}
35+
36+
// NodeImp can be used to create a node instance from:
37+
// - a logger: to be used during startup and shutdown
38+
// - a component: that will be started with Run
39+
// - a cleanup function: that will be called after the component has been stopped
40+
// - a fatal error handler: to handle any error received from the component
41+
type NodeImp struct {
42+
component.Component
3443
logger zerolog.Logger
3544
postShutdown func() error
3645
fatalHandler func(error)
3746
}
3847

3948
// NewNode returns a new node instance
40-
func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node {
49+
func NewNode(
50+
component component.Component,
51+
cfg *NodeConfig,
52+
logger zerolog.Logger,
53+
cleanup func() error,
54+
handleFatal func(error),
55+
) Node {
4156
return &FlowNodeImp{
57+
NodeConfig: cfg,
58+
NodeImp: NewBaseNode(
59+
component,
60+
logger.With().
61+
Str("node_role", cfg.BaseConfig.NodeRole).
62+
Hex("spork_id", logging.ID(cfg.SporkID)).
63+
Logger(),
64+
cleanup,
65+
handleFatal,
66+
),
67+
}
68+
}
69+
70+
// NewBaseNode returns a new base node instance
71+
func NewBaseNode(
72+
component component.Component,
73+
logger zerolog.Logger,
74+
cleanup func() error,
75+
handleFatal func(error),
76+
) NodeImp {
77+
return NodeImp{
4278
Component: component,
43-
NodeConfig: cfg,
4479
logger: logger,
4580
postShutdown: cleanup,
4681
fatalHandler: handleFatal,
@@ -51,13 +86,11 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg
5186
// which point it gracefully shuts down.
5287
// Any unhandled irrecoverable errors thrown in child components will propagate up to here and
5388
// result in a fatal error.
54-
func (node *FlowNodeImp) Run() {
55-
// Cancelling this context notifies all child components that it's time to shutdown
56-
ctx, cancel := context.WithCancel(context.Background())
57-
defer cancel()
89+
func (node *NodeImp) Run() {
90+
ctx := context.Background()
5891

5992
// Block until node is shutting down
60-
err := node.run(ctx, cancel)
93+
err := node.run(ctx)
6194

6295
// Any error received is considered fatal.
6396
if err != nil {
@@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() {
73106
node.logger.Error().Err(err).Msg("error encountered during cleanup")
74107
}
75108

76-
node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole)
109+
node.logger.Info().Msg("node shutdown complete")
77110
}
78111

79112
// run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered.
80113
// It returns:
81114
// - nil if a termination signal is received, and all components have been gracefully stopped.
82-
// - error if a irrecoverable error is received
83-
func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error {
115+
// - error if an irrecoverable error is received
116+
func (node *NodeImp) run(ctx context.Context) error {
117+
// Cancelling this context notifies all child components that it's time to shut down
118+
ctx, shutdown := context.WithCancel(ctx)
119+
defer shutdown()
120+
84121
// Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a
85122
// child context). Any errors received on this channel should halt the node.
86123
signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
@@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e
97134
select {
98135
case <-node.Ready():
99136
node.logger.Info().
100-
Hex("spork_id", logging.ID(node.SporkID)).
101-
Msgf("%s node startup complete", node.BaseConfig.NodeRole)
137+
Msg("node startup complete")
102138
case <-ctx.Done():
103139
}
104140
}()
@@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e
118154

119155
// 3: Shut down
120156
// Send shutdown signal to components
121-
node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole)
157+
node.logger.Info().Msg("node shutting down")
122158
shutdown()
123159

124160
// Block here until all components have stopped or an irrecoverable error is received.

cmd/node_builder.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ import (
3333
const NotSet = "not set"
3434

3535
type BuilderFunc func(nodeConfig *NodeConfig) error
36-
type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error)
36+
37+
// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if
38+
// the factory cannot create the component
39+
type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error)
3740

3841
// NodeBuilder declares the initialization methods needed to bootstrap up a Flow node
3942
type NodeBuilder interface {
@@ -73,7 +76,7 @@ type NodeBuilder interface {
7376
// The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance.
7477
// In both cases, the object is started according to its interface when the node is run,
7578
// and the node will wait for the component to exit gracefully.
76-
Component(name string, f ReadyDoneFactory) NodeBuilder
79+
Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder
7780

7881
// DependableComponent adds a new component to the node that conforms to the ReadyDoneAware
7982
// interface. The builder will wait until all of the components in the dependencies list are ready
@@ -86,15 +89,15 @@ type NodeBuilder interface {
8689
// IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all
8790
// dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method
8891
// MUST be idempotent.
89-
DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder
92+
DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder
9093

9194
// RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware
9295
// interface, and calls the provided error handler when an irrecoverable error is encountered.
9396
// Use RestartableComponent if the component is not critical to the node's safe operation and
9497
// can/should be independently restarted when an irrecoverable error is encountered.
9598
//
9699
// Any irrecoverable errors thrown by the component will be passed to the provided error handler.
97-
RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder
100+
RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder
98101

99102
// ShutdownFunc adds a callback function that is called after all components have exited.
100103
// All shutdown functions are called regardless of errors returned by previous callbacks. Any
@@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig {
299302
// DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent
300303
// to define the list of dependencies that must be ready before starting the component.
301304
type DependencyList struct {
302-
components []module.ReadyDoneAware
305+
Components []module.ReadyDoneAware
303306
}
304307

305308
func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList {
306309
return &DependencyList{
307-
components: components,
310+
Components: components,
308311
}
309312
}
310313

311314
// Add adds a new ReadyDoneAware implementation to the list of dependencies.
312315
func (d *DependencyList) Add(component module.ReadyDoneAware) {
313-
d.components = append(d.components, component)
316+
d.Components = append(d.Components, component)
314317
}

0 commit comments

Comments
 (0)