Skip to content

Commit 2a716a1

Browse files
author
jiasheng.yu
committed
Fully replace pipelines on Writable.Pipeline update
Previously only updated topics/transforms per pipeline ID. Now perform full replacement: remove pipelines not in config (unregister metrics), create/register new pipelines, and build full topics with base prefix. Signed-off-by: jiasheng.yu <[email protected]>
1 parent 5dd841c commit 2a716a1

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

internal/app/configupdates.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,15 @@ func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() {
151151
sdk.runtime.TargetType = sdk.targetType
152152

153153
// Update the pipelines with their new transforms
154-
for _, pipeline := range pipelines {
155-
// TODO: Look at better way to apply pipeline updates
156-
sdk.runtime.SetFunctionsPipelineTransforms(pipeline.Id, pipeline.Transforms)
157-
sdk.runtime.SetFunctionsPipelineTopics(pipeline.Id, pipeline.Topics)
154+
for i, pipline := range pipelines {
155+
var fullTopics []string
156+
for _, topic := range pipline.Topics {
157+
fullTopics = append(fullTopics, coreCommon.BuildTopic(sdk.config.MessageBus.GetBaseTopicPrefix(), topic))
158+
}
159+
pipline.Topics = fullTopics
160+
pipelines[i] = pipline
158161
}
162+
sdk.runtime.UpdateFunctionsPipelines(pipelines)
159163

160164
sdk.LoggingClient().Info("Configurable Pipeline successfully reloaded from new configuration")
161165
}

internal/runtime/runtime.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,28 @@ func (fpr *FunctionsPipelineRuntime) RemoveAllFunctionPipelines() {
162162
fpr.isBusyCopying.Unlock()
163163
}
164164

165+
func (fpr *FunctionsPipelineRuntime) UpdateFunctionsPipelines(piplines map[string]interfaces.FunctionPipeline) {
166+
fpr.isBusyCopying.Lock()
167+
defer fpr.isBusyCopying.Unlock()
168+
169+
metricManager := bootstrapContainer.MetricsManagerFrom(fpr.dic.Get)
170+
for _, curPipline := range fpr.pipelines {
171+
if _, ok := piplines[curPipline.Id]; !ok {
172+
fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, curPipline.Id)
173+
fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, curPipline.Id)
174+
fpr.unregisterPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, curPipline.Id)
175+
delete(fpr.pipelines, curPipline.Id)
176+
}
177+
}
178+
for _, toAdd := range piplines {
179+
pipeline := NewFunctionPipeline(toAdd.Id, toAdd.Topics, toAdd.Transforms)
180+
fpr.pipelines[toAdd.Id] = &pipeline
181+
fpr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed)
182+
fpr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime)
183+
fpr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors)
184+
}
185+
}
186+
165187
// AddFunctionsPipeline is thread safe to set transforms
166188
func (fpr *FunctionsPipelineRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error {
167189
_, exists := fpr.pipelines[id]

0 commit comments

Comments
 (0)