@@ -80,6 +80,7 @@ func NewService(serviceKey string, targetType interface{}, profileSuffixPlacehol
80
80
serviceKey : serviceKey ,
81
81
targetType : targetType ,
82
82
profileSuffixPlaceholder : profileSuffixPlaceholder ,
83
+ configurableFactorMap : make (map [string ]interfaces.ConfigurableFactory ),
83
84
}
84
85
}
85
86
@@ -104,6 +105,7 @@ type Service struct {
104
105
flags * flags.Default
105
106
configProcessor * config.Processor
106
107
requestTimeout time.Duration
108
+ configurableFactorMap map [string ]interfaces.ConfigurableFactory
107
109
}
108
110
109
111
type commandLineFlags struct {
@@ -281,7 +283,11 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
281
283
return nil , fmt .Errorf ("pipline TargetType of '%s' is not supported" , svc .config .GetWritableInfo ().Pipeline .TargetType )
282
284
}
283
285
284
- configurable := reflect .ValueOf (NewConfigurable (svc .lc , svc .SecretProvider ()))
286
+ configurables := []reflect.Value {reflect .ValueOf (NewConfigurable (svc .lc , svc .SecretProvider ()))}
287
+ for _ , factory := range svc .configurableFactorMap {
288
+ instance := factory (svc .lc , svc .SecretProvider ())
289
+ configurables = append (configurables , reflect .ValueOf (instance ))
290
+ }
285
291
pipelineConfig := svc .config .GetWritableInfo ().Pipeline
286
292
287
293
defaultExecutionOrder := strings .TrimSpace (pipelineConfig .ExecutionOrder )
@@ -294,7 +300,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
294
300
svc .lc .Debugf ("Default Function Pipeline Execution Order: [%s]" , pipelineConfig .ExecutionOrder )
295
301
functionNames := util .DeleteEmptyAndTrim (strings .FieldsFunc (defaultExecutionOrder , util .SplitComma ))
296
302
297
- transforms , err := svc .loadConfigurablePipelineTransforms (interfaces .DefaultPipelineId , functionNames , pipelineConfig .Functions , configurable )
303
+ transforms , err := svc .loadConfigurablePipelineTransforms (interfaces .DefaultPipelineId , functionNames , pipelineConfig .Functions , configurables )
298
304
if err != nil {
299
305
return nil , err
300
306
}
@@ -312,7 +318,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
312
318
313
319
functionNames := util .DeleteEmptyAndTrim (strings .FieldsFunc (perTopicPipeline .ExecutionOrder , util .SplitComma ))
314
320
315
- transforms , err := svc .loadConfigurablePipelineTransforms (perTopicPipeline .Id , functionNames , pipelineConfig .Functions , configurable )
321
+ transforms , err := svc .loadConfigurablePipelineTransforms (perTopicPipeline .Id , functionNames , pipelineConfig .Functions , configurables )
316
322
if err != nil {
317
323
return nil , err
318
324
}
@@ -334,7 +340,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
334
340
pipelineId string ,
335
341
executionOrder []string ,
336
342
functions map [string ]common.PipelineFunction ,
337
- configurable reflect.Value ) ([]interfaces.AppFunction , error ) {
343
+ configurables [] reflect.Value ) ([]interfaces.AppFunction , error ) {
338
344
var transforms []interfaces.AppFunction
339
345
340
346
// set pipeline function parameter names to lowercase to avoid casing issues from what is in source configuration
@@ -347,7 +353,7 @@ func (svc *Service) loadConfigurablePipelineTransforms(
347
353
return nil , fmt .Errorf ("function '%s' configuration not found in Pipeline.Functions section for pipeline '%s'" , functionName , pipelineId )
348
354
}
349
355
350
- functionValue , functionType , err := svc .findMatchingFunction ( configurable , functionName )
356
+ functionValue , functionType , err := svc .findMatchingFunctionWrapper ( configurables , functionName )
351
357
if err != nil {
352
358
return nil , fmt .Errorf ("%s for pipeline '%s'" , err .Error (), pipelineId )
353
359
}
@@ -766,3 +772,11 @@ func (svc *Service) PublishWithTopic(topic string, data any, contentType string)
766
772
767
773
return nil
768
774
}
775
+
776
+ func (svc * Service ) RegisterExternalConfigurable (name string , f interfaces.ConfigurableFactory ) {
777
+ svc .configurableFactorMap [name ] = f
778
+ }
779
+
780
+ func (svc * Service ) UnregisterExternalConfigurable (name string ) {
781
+ delete (svc .configurableFactorMap , name )
782
+ }
0 commit comments