Skip to content

Commit c8c8164

Browse files
authored
shutdownFlow: ignore cancel failing due to workflow missing (#3471)
this can happen when dropping mirror which completed or failed over 30 days ago
1 parent 11404aa commit c8c8164

File tree

4 files changed

+56
-22
lines changed

4 files changed

+56
-22
lines changed

flow/activities/flowable.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,12 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF
644644
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
645645
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName)
646646
if err != nil {
647+
var notFound *exceptions.NotFoundError
648+
if errors.As(err, &notFound) {
649+
logger := internal.LoggerFromCtx(ctx)
650+
logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName))
651+
return nil
652+
}
647653
return a.Alerter.LogFlowError(ctx, req.FlowJobName,
648654
exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)),
649655
)
@@ -668,6 +674,12 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.
668674
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
669675
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName)
670676
if err != nil {
677+
var notFound *exceptions.NotFoundError
678+
if errors.As(err, &notFound) {
679+
logger := internal.LoggerFromCtx(ctx)
680+
logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName))
681+
return nil
682+
}
671683
return a.Alerter.LogFlowError(ctx, req.FlowJobName,
672684
exceptions.NewDropFlowError(fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)),
673685
)

flow/cmd/handler.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"strings"
89
"time"
910

1011
"github.com/google/uuid"
@@ -326,8 +327,8 @@ func (h *FlowRequestHandler) FlowStateChange(
326327
"",
327328
req.FlowConfigUpdate.GetCdcFlowConfigUpdate(),
328329
); err != nil {
329-
slog.ErrorContext(ctx, "unable to signal workflow", logs, slog.Any("error", err))
330-
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to signal workflow: %w", err))
330+
slog.ErrorContext(ctx, "unable to signal workflow update", logs, slog.Any("error", err))
331+
return nil, exceptions.NewInternalApiError(fmt.Errorf("unable to signal workflow update: %w", err))
331332
}
332333
}
333334

@@ -405,10 +406,10 @@ func (h *FlowRequestHandler) handleCancelWorkflow(ctx context.Context, workflowI
405406

406407
select {
407408
case <-errLatch.Chan():
408-
if err := errLatch.Wait(); err != nil {
409-
slog.ErrorContext(ctx, fmt.Sprintf("unable to cancel PeerFlow workflow: %s. Attempting to terminate.", err.Error()))
410-
terminationReason := fmt.Sprintf("workflow %s did not cancel in time.", workflowID)
411-
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, terminationReason); err != nil {
409+
if err := errLatch.Wait(); err != nil &&
410+
err.Error() != "workflow execution already completed" && !strings.HasPrefix(err.Error(), "workflow not found for ID:") {
411+
slog.ErrorContext(ctx, "unable to cancel PeerFlow workflow. Attempting to terminate.", slog.Any("error", err))
412+
if err := h.temporalClient.TerminateWorkflow(ctx, workflowID, runID, "workflow did not cancel in time."); err != nil {
412413
return fmt.Errorf("unable to terminate PeerFlow workflow: %w", err)
413414
}
414415
}

flow/e2e/api/api_test.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,14 @@ func (s Suite) checkCatalogTableMapping(
120120
expectedSourceTableNames []string,
121121
) (bool, error) {
122122
var configBytes sql.RawBytes
123-
err := conn.QueryRow(ctx,
124-
"SELECT config_proto FROM flows WHERE name = $1", flowName).Scan(&configBytes)
125-
if err != nil {
123+
if err := conn.QueryRow(ctx,
124+
"SELECT config_proto FROM flows WHERE name = $1", flowName,
125+
).Scan(&configBytes); err != nil {
126126
return false, err
127127
}
128128

129129
var config protos.FlowConnectionConfigs
130-
err = proto.Unmarshal(configBytes, &config)
131-
if err != nil {
130+
if err := proto.Unmarshal(configBytes, &config); err != nil {
132131
return false, err
133132
}
134133

@@ -1302,3 +1301,22 @@ func (s Suite) TestTableAdditionWithoutInitialLoad() {
13021301
env.Cancel(s.t.Context())
13031302
e2e.RequireEnvCanceled(s.t, env)
13041303
}
1304+
1305+
func (s Suite) TestDropMissing() {
1306+
conn := s.pg.PostgresConnector.Conn()
1307+
peer := s.source.GeneratePeer(s.t)
1308+
var peerId int32
1309+
require.NoError(s.t, conn.QueryRow(s.t.Context(), "select id from peers where name = $1", peer.Name).Scan(&peerId))
1310+
1311+
_, err := conn.Exec(s.t.Context(),
1312+
"insert into flows (name,source_peer,destination_peer,workflow_id,status) values ('test-drop-missing',$1,$1,'drop-missing-wf-id',$2)",
1313+
peerId, protos.FlowStatus_STATUS_COMPLETED,
1314+
)
1315+
require.NoError(s.t, err)
1316+
1317+
_, err = s.FlowStateChange(s.t.Context(), &protos.FlowStateChangeRequest{
1318+
FlowJobName: "test-drop-missing",
1319+
RequestedFlowState: protos.FlowStatus_STATUS_TERMINATING,
1320+
})
1321+
require.NoError(s.t, err)
1322+
}

flow/otel_metrics/observables.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,23 +103,26 @@ func (a *Float64SyncGauge) Record(ctx context.Context, value float64, options ..
103103
}
104104

105105
func buildContextualAttributes(ctx context.Context) metric.MeasurementOption {
106-
attributes := make([]attribute.KeyValue, 0)
106+
attributes := make([]attribute.KeyValue, 0, 12)
107107
flowMetadata := internal.GetFlowMetadata(ctx)
108108
if flowMetadata != nil {
109+
attributes = append(attributes, attribute.String(FlowNameKey, flowMetadata.FlowName))
110+
if flowMetadata.Source != nil {
111+
attributes = append(attributes,
112+
attribute.Stringer(SourcePeerType, flowMetadata.Source.Type),
113+
attribute.String(SourcePeerName, flowMetadata.Source.Name))
114+
}
115+
if flowMetadata.Destination != nil {
116+
attributes = append(attributes,
117+
attribute.Stringer(DestinationPeerType, flowMetadata.Destination.Type),
118+
attribute.String(DestinationPeerName, flowMetadata.Destination.Name))
119+
}
109120
attributes = append(attributes,
110-
attribute.String(FlowNameKey, flowMetadata.FlowName),
111-
attribute.Stringer(SourcePeerType, flowMetadata.Source.Type),
112-
attribute.Stringer(DestinationPeerType, flowMetadata.Destination.Type),
113-
attribute.String(SourcePeerName, flowMetadata.Source.Name),
114-
attribute.String(DestinationPeerName, flowMetadata.Destination.Name),
115121
attribute.Stringer(FlowStatusKey, flowMetadata.Status),
116-
attribute.Bool(IsFlowResyncKey, flowMetadata.IsResync),
117-
)
122+
attribute.Bool(IsFlowResyncKey, flowMetadata.IsResync))
118123
}
119124
additionalMetadata := internal.GetAdditionalMetadata(ctx)
120-
attributes = append(attributes,
121-
attribute.Stringer(FlowOperationKey, additionalMetadata.Operation),
122-
)
125+
attributes = append(attributes, attribute.Stringer(FlowOperationKey, additionalMetadata.Operation))
123126

124127
if activity.IsActivity(ctx) {
125128
activityInfo := activity.GetInfo(ctx)

0 commit comments

Comments
 (0)