Skip to content

Commit 593db68

Browse files
author
jiasheng.yu
committed
support multiple configurable providers
- add ConfigurableFactory and Service.Register/Unregister APIs - build configurables list from builtin + registered factories - iterate configurables when resolving pipeline functions Signed-off-by: jiasheng.yu <[email protected]>
1 parent 5dd841c commit 593db68

File tree

4 files changed

+59
-5
lines changed

4 files changed

+59
-5
lines changed

internal/app/configupdates.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,13 @@ func (svc *Service) findMatchingFunction(configurable reflect.Value, functionNam
189189
functionType := functionValue.Type()
190190
return functionValue, functionType, nil
191191
}
192+
193+
func (svc *Service) findMatchingFunctionWrapper(configurables []reflect.Value, functionName string) (functionValue reflect.Value, functionType reflect.Type, err error) {
194+
for _, conconfigurable := range configurables {
195+
functionValue, functionType, err = svc.findMatchingFunction(conconfigurable, functionName)
196+
if err == nil {
197+
return
198+
}
199+
}
200+
return
201+
}

internal/app/service.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func NewService(serviceKey string, targetType interface{}, profileSuffixPlacehol
8080
serviceKey: serviceKey,
8181
targetType: targetType,
8282
profileSuffixPlaceholder: profileSuffixPlaceholder,
83+
configurableFactorMap: make(map[string]interfaces.ConfigurableFactory),
8384
}
8485
}
8586

@@ -104,6 +105,7 @@ type Service struct {
104105
flags *flags.Default
105106
configProcessor *config.Processor
106107
requestTimeout time.Duration
108+
configurableFactorMap map[string]interfaces.ConfigurableFactory
107109
}
108110

109111
type commandLineFlags struct {
@@ -281,7 +283,11 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
281283
return nil, fmt.Errorf("pipline TargetType of '%s' is not supported", svc.config.GetWritableInfo().Pipeline.TargetType)
282284
}
283285

284-
configurable := reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))
286+
configurables := []reflect.Value{reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))}
287+
for _, factory := range svc.configurableFactorMap {
288+
instance := factory(svc.lc, svc.SecretProvider())
289+
configurables = append(configurables, reflect.ValueOf(instance))
290+
}
285291
pipelineConfig := svc.config.GetWritableInfo().Pipeline
286292

287293
defaultExecutionOrder := strings.TrimSpace(pipelineConfig.ExecutionOrder)
@@ -294,7 +300,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
294300
svc.lc.Debugf("Default Function Pipeline Execution Order: [%s]", pipelineConfig.ExecutionOrder)
295301
functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(defaultExecutionOrder, util.SplitComma))
296302

297-
transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurable)
303+
transforms, err := svc.loadConfigurablePipelineTransforms(interfaces.DefaultPipelineId, functionNames, pipelineConfig.Functions, configurables)
298304
if err != nil {
299305
return nil, err
300306
}
@@ -312,7 +318,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
312318

313319
functionNames := util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.ExecutionOrder, util.SplitComma))
314320

315-
transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurable)
321+
transforms, err := svc.loadConfigurablePipelineTransforms(perTopicPipeline.Id, functionNames, pipelineConfig.Functions, configurables)
316322
if err != nil {
317323
return nil, err
318324
}
@@ -334,7 +340,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
334340
pipelineId string,
335341
executionOrder []string,
336342
functions map[string]common.PipelineFunction,
337-
configurable reflect.Value) ([]interfaces.AppFunction, error) {
343+
configurables []reflect.Value) ([]interfaces.AppFunction, error) {
338344
var transforms []interfaces.AppFunction
339345

340346
// set pipeline function parameter names to lowercase to avoid casing issues from what is in source configuration
@@ -347,7 +353,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
347353
return nil, fmt.Errorf("function '%s' configuration not found in Pipeline.Functions section for pipeline '%s'", functionName, pipelineId)
348354
}
349355

350-
functionValue, functionType, err := svc.findMatchingFunction(configurable, functionName)
356+
functionValue, functionType, err := svc.findMatchingFunctionWrapper(configurables, functionName)
351357
if err != nil {
352358
return nil, fmt.Errorf("%s for pipeline '%s'", err.Error(), pipelineId)
353359
}
@@ -766,3 +772,11 @@ func (svc *Service) PublishWithTopic(topic string, data any, contentType string)
766772

767773
return nil
768774
}
775+
776+
func (svc *Service) RegisterExternalConfigurable(name string, f interfaces.ConfigurableFactory) {
777+
svc.configurableFactorMap[name] = f
778+
}
779+
780+
func (svc *Service) UnregisterExternalConfigurable(name string) {
781+
delete(svc.configurableFactorMap, name)
782+
}

pkg/interfaces/mocks/ApplicationService.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/interfaces/service.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,27 @@ type ApplicationService interface {
193193
Publish(data any, contentType string) error
194194
// PublishWithTopic pushes data to the MessageBus using given topic
195195
PublishWithTopic(topic string, data any, contentType string) error
196+
// RegisterExternalConfigurable registers a named ConfigurableFactory.
197+
//
198+
// This allows runtime users to plug in their own "Configurable"-style provider,
199+
// i.e. a struct exposing methods such as:
200+
//
201+
// func (c *MyConfigurable) FilterByDeviceName(params map[string]string) interfaces.AppFunction
202+
//
203+
// Registration must happen before LoadConfigurableFunctionPipelines() is called,
204+
// typically in init() or early in main(). Once registered, the functions on the
205+
// provided configurable can be referenced in Writable.Pipeline configuration.
206+
RegisterExternalConfigurable(name string, f ConfigurableFactory)
207+
// UnregisterExternalConfigurable removes a previously registered factory by name.
208+
//
209+
// This can be used in tests or dynamic scenarios to clear or replace an external
210+
// configurable provider. Usually registration is done once at startup and does
211+
// not need to be removed in normal application flows.
212+
UnregisterExternalConfigurable(name string)
196213
}
214+
215+
// ConfigurableFactory creates a configurable instance given SDK logging/secret provider.
216+
// Returned value should be a pointer to a struct whose methods have signature
217+
//
218+
// FuncName(parameters map[string]string) interfaces.AppFunction
219+
type ConfigurableFactory func(lc logger.LoggingClient, sp bootstrapInterfaces.SecretProvider) interface{}

0 commit comments

Comments
 (0)