84 changes: 25 additions & 59 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -320,9 +320,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {

spans, err := ca.setupSpansAndFrontier()
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error setting up spans and frontier: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error setting up spans and frontier: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
@@ -348,9 +346,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
scope, _ := opts.GetMetricScope()
ca.sliMetrics, err = ca.metrics.getSLIMetrics(scope)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sli metrics: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
@@ -360,21 +356,16 @@ func (ca *changeAggregator) Start(ctx context.Context) {
recorder := metricsRecorder(ca.sliMetrics)
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error wrapping metrics controller: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
}

ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
ca.spec.User(), ca.spec.JobID, recorder)
if err != nil {
err = changefeedbase.MarkRetryableError(err)
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting sink: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error getting sink: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
@@ -406,9 +397,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.FlowCtx.Cfg.Settings.SV)
ca.eventProducer, ca.kvFeedDoneCh, ca.errCh, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, feed, pool, limit, opts)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error starting kv feed: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error starting kv feed: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
@@ -418,9 +407,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ctx, ca.FlowCtx.Cfg, ca.spec, feed, ca.frontier, kvFeedHighWater,
ca.sink, ca.metrics, ca.sliMetrics, ca.knobs)
if err != nil {
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error creating event consumer: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error creating event consumer: %v", err)
ca.MoveToDraining(err)
ca.cancel()
return
@@ -635,7 +622,7 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
// Checkpointed spans are spans that were above the highwater mark, and we
// must preserve that information in the frontier for future checkpointing.
if err := checkpoint.Restore(ca.frontier, ca.spec.SpanLevelCheckpoint); err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to restore span-level checkpoint")
}

return spans, nil
@@ -751,9 +738,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
// NB: we do not invoke ca.cancel here -- just merely moving
// to drain state so that the trailing metadata callback
// has a chance to produce shutdown checkpoint.
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error while checking for node drain: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error while checking for node drain: %v", err)
ca.MoveToDraining(err)
break
}
@@ -775,9 +760,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
}
// Shut down the poller if it wasn't already.
ca.cancel()
if log.V(2) {
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error from tick: %v", err)
}
log.Dev.Warningf(ca.Ctx(), "moving to draining due to error from tick: %v", err)
ca.MoveToDraining(err)
break
}
@@ -1315,9 +1298,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
sli, err := cf.metrics.getSLIMetrics(scope)
if err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sli metrics: %v", err)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sli metrics: %v", err)
cf.MoveToDraining(err)
return
}
@@ -1327,9 +1308,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.spec.User(), cf.spec.JobID, sli)
if err != nil {
err = changefeedbase.MarkRetryableError(err)
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error getting sink: %v", err)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error getting sink: %v", err)
cf.MoveToDraining(err)
return
}
@@ -1342,9 +1321,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.evalCtx.ChangefeedState == nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to missing changefeed state")
}
log.Dev.Warningf(cf.Ctx(), "moving to draining due to missing changefeed state")
cf.MoveToDraining(errors.AssertionFailedf("expected initialized local state"))
return
}
@@ -1356,9 +1333,7 @@ func (cf *changeFrontier) Start(ctx context.Context) {
if cf.spec.JobID != 0 {
job, err := cf.FlowCtx.Cfg.JobRegistry.LoadClaimedJob(ctx, cf.spec.JobID)
if err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error loading claimed job: %v", err)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error loading claimed job: %v", err)
cf.MoveToDraining(err)
return
}
@@ -1403,15 +1378,16 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// Set up the resolved span frontier.
cf.frontier, err = resolvedspan.NewCoordinatorFrontier(cf.spec.Feed.StatementTime, initialHighwater, cf.spec.TrackedSpans...)
if err != nil {
log.Infof(cf.Ctx(), "change frontier moving to draining due to error setting up frontier: %v", err)
log.Dev.Warningf(cf.Ctx(), "moving to draining due to error setting up frontier: %v", err)
cf.MoveToDraining(err)
return
}

if err := checkpoint.Restore(cf.frontier, cf.spec.SpanLevelCheckpoint); err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier encountered error on checkpoint restore: %v", err)
}
log.Dev.Warningf(cf.Ctx(),
"moving to draining due to error restoring span-level checkpoint: %v", err)
cf.MoveToDraining(err)
return
}

if cf.knobs.AfterCoordinatorFrontierRestore != nil {
@@ -1560,39 +1536,31 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
}
}

if log.V(2) {
log.Infof(cf.Ctx(),
"change frontier moving to draining after reaching resolved span boundary (%s): %v",
boundaryType, err)
}
log.Dev.Warningf(cf.Ctx(),
"moving to draining after reaching resolved span boundary (%s): %v",
boundaryType, err)
cf.MoveToDraining(err)
break
}

row, meta := cf.input.Next()
if meta != nil {
if meta.Err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after getting error from aggregator: %v", meta.Err)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting error from aggregator: %v", meta.Err)
cf.MoveToDraining(nil /* err */)
}
if meta.Changefeed != nil && meta.Changefeed.DrainInfo != nil {
// Seeing changefeed drain info metadata from the aggregator means
// that the aggregator exited due to node shutdown. Transition to
// draining so that the remaining aggregators will shut down and
// transmit their up-to-date frontier.
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining due to aggregator shutdown: %s", meta.Changefeed)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining due to aggregator shutdown: %s", meta.Changefeed)
cf.MoveToDraining(changefeedbase.ErrNodeDraining)
}
return nil, meta
}
if row == nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after getting nil row from aggregator")
}
log.Dev.Warningf(cf.Ctx(), "moving to draining after getting nil row from aggregator")
cf.MoveToDraining(nil /* err */)
break
}
Expand All @@ -1607,9 +1575,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
}

if err := cf.noteAggregatorProgress(row[0]); err != nil {
if log.V(2) {
log.Infof(cf.Ctx(), "change frontier moving to draining after error while processing aggregator progress: %v", err)
}
log.Dev.Warningf(cf.Ctx(), "moving to draining after error while processing aggregator progress: %v", err)
cf.MoveToDraining(err)
break
}
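
For reference, the pattern adopted throughout changefeed_processors.go in this diff looks roughly like the sketch below (illustrative only, not code from the PR): the reason for moving to draining is always emitted as a warning on the DEV log channel, rather than as an Info message gated behind log.V(2). The helper name and the moveToDraining/cancel callbacks are hypothetical stand-ins for the processor methods; the sketch assumes CockroachDB's util/log package.

package changefeedccl

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/util/log"
)

// drainWithWarning is a hypothetical helper illustrating the new logging
// shape: the reason for draining is always visible as a DEV-channel warning,
// not only when verbosity is raised to 2 or higher.
func drainWithWarning(
    ctx context.Context, err error, moveToDraining func(error), cancel func(),
) {
    log.Dev.Warningf(ctx, "moving to draining due to error: %v", err)
    moveToDraining(err)
    cancel()
}

The trade-off is slightly more log volume in exchange for draining reasons that are visible without rerunning the node at higher verbosity.
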
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint.go
@@ -78,11 +78,11 @@ type SpanForwarder interface {
func Restore(sf SpanForwarder, checkpoint *jobspb.TimestampSpansMap) error {
for ts, spans := range checkpoint.All() {
if ts.IsEmpty() {
return errors.New("checkpoint timestamp is empty")
return errors.AssertionFailedf("checkpoint timestamp is empty")
}
for _, sp := range spans {
if _, err := sf.Forward(sp, ts); err != nil {
return err
return errors.Wrapf(err, "forwarding span %v to %v", sp, ts)
}
}
}
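
The Restore changes keep the underlying cause inspectable while adding context, and switch the empty-timestamp check to errors.AssertionFailedf so the condition is treated as an internal invariant violation rather than an ordinary error. A minimal standalone sketch of the wrapping behavior with the cockroachdb/errors package (the span and timestamp strings are made-up values, not output from this code):

package main

import (
    "fmt"

    "github.com/cockroachdb/errors"
)

func main() {
    // Stand-in for an error returned by SpanForwarder.Forward.
    cause := errors.New("span not tracked by frontier")

    // Restore now annotates the failure with the span and timestamp being
    // forwarded, mirroring the errors.Wrapf call in the diff.
    wrapped := errors.Wrapf(cause, "forwarding span %v to %v",
        "/Table/101/{1-2}", "1700000000.000000000,0")

    fmt.Println(wrapped)
    // forwarding span /Table/101/{1-2} to 1700000000.000000000,0: span not tracked by frontier

    fmt.Println(errors.Is(wrapped, cause))
    // true: the original cause is still reachable via errors.Is/As.
}
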