diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index dbade706a780..7836c481fe1d 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -320,9 +320,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 
 	spans, err := ca.setupSpansAndFrontier()
 	if err != nil {
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error setting up spans and frontier: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -348,9 +346,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	scope, _ := opts.GetMetricScope()
 	ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
 	if err != nil {
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -360,21 +356,17 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	recorder := metricsRecorder(ca.sliMetrics)
 	recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
 	if err != nil {
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error wrapping metrics recorder: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
 
 	ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
 		ca.spec.User(), ca.spec.JobID, recorder)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sink: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -406,9 +398,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
 	ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
 	if err != nil {
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error starting kv feed: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -418,9 +408,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
 		ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
 	if err != nil {
-		if log.V(2) {
-			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
-		}
+		log.Dev.Warningf(ca.Ctx(), "moving to draining due to error creating event consumer: %v", err)
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
@@ -635,7 +623,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
 	// Checkpointed spans are spans that were above the highwater mark, and we
 	// must preserve that information in the frontier for future checkpointing.
 	if err := checkpoint.Restore(ca.frontier, ca.spec.SpanLevelCheckpoint); err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
 	}
 
 	return spans, nil
@@ -751,9 +739,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			// NB: we do not invoke ca.cancel here -- just merely moving
 			// to drain state so that the trailing metadata callback
 			// has a chance to produce shutdown checkpoint.
-			if log.V(2) {
-				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
-			}
+			log.Dev.Warningf(ca.Ctx(), "moving to draining due to error while checking for node drain: %v", err)
 			ca.MoveToDraining(err)
 			break
 		}
@@ -775,9 +761,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
 			}
 			// Shut down the poller if it wasn't already.
 			ca.cancel()
-			if log.V(2) {
-				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
-			}
+			log.Dev.Warningf(ca.Ctx(), "moving to draining due to error from tick: %v", err)
 			ca.MoveToDraining(err)
 			break
 		}
@@ -1315,9 +1299,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
 	sli, err := cf.metrics.getSLIMetrics(scope)
 	if err != nil {
-		if log.V(2) {
-			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
-		}
+		log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1327,9 +1309,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 		cf.spec.User(), cf.spec.JobID, sli)
 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
-		if log.V(2) {
-			log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
-		}
+		log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sink: %v", err)
 		cf.MoveToDraining(err)
 		return
 	}
@@ -1342,9 +1322,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	cf.highWaterAtStart = cf.spec.Feed.StatementTime
 
 	if cf.evalCtx.ChangefeedState == nil {
-		if log.V(2) {
-			log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
-		}
+		log.Dev.Warningf(cf.Ctx(), "moving to draining due to missing changefeed state")
 		cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
 		return
 	}
@@ -1356,9 +1334,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	if cf.spec.JobID != 0 {
 		job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
 		if err != nil {
-			if log.V(2) {
-				log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
-			}
+			log.Dev.Warningf(cf.Ctx(), "moving to draining due to error loading claimed job: %v", err)
 			cf.MoveToDraining(err)
 			return
 		}
@@ -1403,15 +1379,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	// Set up the resolved span frontier.
 	cf.frontier, err = resolvedspan.NewCoordinatorFrontier(cf.spec.Feed.StatementTime, initialHighwater, cf.spec.TrackedSpans...)
 	if err != nil {
-		log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
+		log.Dev.Warningf(cf.Ctx(), "moving to draining due to error setting up frontier: %v", err)
 		cf.MoveToDraining(err)
 		return
 	}
 
 	if err := checkpoint.Restore(cf.frontier, cf.spec.SpanLevelCheckpoint); err != nil {
-		if log.V(2) {
-			log.Infof(cf.Ctx(), "change frontier encountered error on checkpoint restore: %v", err)
-		}
+		log.Dev.Warningf(cf.Ctx(),
+			"moving to draining due to error restoring span-level checkpoint: %v", err)
+		cf.MoveToDraining(err)
+		return
 	}
 
 	if cf.knobs.AfterCoordinatorFrontierRestore != nil {
@@ -1560,11 +1537,9 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				}
 			}
 
-			if log.V(2) {
-				log.Infof(cf.Ctx(),
-					"change frontier moving to draining after reaching resolved span boundary (%s): %v",
-					boundaryType, err)
-			}
+			log.Dev.Warningf(cf.Ctx(),
+				"moving to draining after reaching resolved span boundary (%s): %v",
+				boundaryType, err)
 			cf.MoveToDraining(err)
 			break
 		}
@@ -1572,9 +1547,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		row, meta := cf.input.Next()
 		if meta != nil {
 			if meta.Err != nil {
-				if log.V(2) {
-					log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
-				}
+				log.Dev.Warningf(cf.Ctx(), "moving to draining after getting error from aggregator: %v", meta.Err)
 				cf.MoveToDraining(nil /* err */)
 			}
 			if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
@@ -1582,17 +1555,13 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 				// that the aggregator exited due to node shutdown. Transition to
 				// draining so that the remaining aggregators will shut down and
 				// transmit their up-to-date frontier.
-				if log.V(2) {
-					log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
-				}
+				log.Dev.Warningf(cf.Ctx(), "moving to draining due to aggregator shutdown: %s", meta.Changefeed)
 				cf.MoveToDraining(changefeedbase.ErrNodeDraining)
 			}
 			return nil, meta
 		}
 		if row == nil {
-			if log.V(2) {
-				log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
-			}
+			log.Dev.Warningf(cf.Ctx(), "moving to draining after getting nil row from aggregator")
 			cf.MoveToDraining(nil /* err */)
 			break
 		}
@@ -1607,9 +1576,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 		}
 
 		if err := cf.noteAggregatorProgress(row[0]); err != nil {
-			if log.V(2) {
-				log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
-			}
+			log.Dev.Warningf(cf.Ctx(), "moving to draining after error while processing aggregator progress: %v", err)
			cf.MoveToDraining(err)
			break
		}
diff --git a/pkg/ccl/changefeedccl/checkpoint/checkpoint.go b/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
index 93fce498496f..93b5476d60c6 100644
--- a/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
+++ b/pkg/ccl/changefeedccl/checkpoint/checkpoint.go
@@ -78,11 +78,11 @@ type SpanForwarder interface {
 func Restore(sf SpanForwarder, checkpoint *jobspb.TimestampSpansMap) error {
 	for ts, spans := range checkpoint.All() {
 		if ts.IsEmpty() {
-			return errors.New("checkpoint timestamp is empty")
+			return errors.AssertionFailedf("checkpoint timestamp is empty")
 		}
 		for _, sp := range spans {
 			if _, err := sf.Forward(sp, ts); err != nil {
-				return err
+				return errors.Wrapf(err, "forwarding span %v to %v", sp, ts)
 			}
 		}
 	}
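
For illustration, here is a minimal, self-contained Go sketch of the Restore pattern that checkpoint.go settles on above. The toy timestamp, span, failingForwarder, and restore names are stand-ins invented for this sketch (not CockroachDB APIs) in place of hlc.Timestamp, roachpb.Span, and a real span frontier, and the stdlib %w verb stands in for errors.Wrapf; only the control flow mirrors checkpoint.Restore.

package main

import (
	"errors"
	"fmt"
)

// timestamp and span are toy stand-ins for hlc.Timestamp and roachpb.Span.
type timestamp int64

func (t timestamp) IsEmpty() bool { return t == 0 }

type span struct{ start, end string }

// spanForwarder mirrors the SpanForwarder interface above: Forward advances
// the frontier timestamp of the given span.
type spanForwarder interface {
	Forward(sp span, ts timestamp) (bool, error)
}

// restore follows the same control flow as checkpoint.Restore after this
// change: an empty timestamp is treated as corrupt checkpoint state, and a
// Forward failure is wrapped with the offending span and timestamp.
func restore(sf spanForwarder, cp map[timestamp][]span) error {
	for ts, spans := range cp {
		if ts.IsEmpty() {
			return errors.New("checkpoint timestamp is empty")
		}
		for _, sp := range spans {
			if _, err := sf.Forward(sp, ts); err != nil {
				return fmt.Errorf("forwarding span %v to %v: %w", sp, ts, err)
			}
		}
	}
	return nil
}

// failingForwarder rejects one chosen span so the wrapped error is visible.
type failingForwarder struct{ failOn span }

func (f *failingForwarder) Forward(sp span, ts timestamp) (bool, error) {
	if sp == f.failOn {
		return false, errors.New("span not tracked by frontier")
	}
	return true, nil
}

func main() {
	cp := map[timestamp][]span{
		10: {{"a", "c"}, {"c", "f"}},
	}
	// Prints: forwarding span {c f} to 10: span not tracked by frontier
	fmt.Println(restore(&failingForwarder{failOn: span{"c", "f"}}, cp))
}

The point of the wrapping is visible in the output: the returned error now identifies exactly which checkpoint entry failed to forward, which is what the changeFrontier draining log added above ends up surfacing.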