Skip to content

Commit 37b28ae

Browse files
authored
Fixed an issue where the StopSending message was incorrectly used to cancel a dispatch receiver (#23163)
Fixed an issue where the StopSending message was incorrectly used to cancel a dispatch receiver Approved by: @ouyuanning
1 parent 393ea81 commit 37b28ae

File tree

3 files changed

+426
-13
lines changed

3 files changed

+426
-13
lines changed

pkg/sql/colexec/dispatch/sendfunc.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525

2626
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
2727
"github.com/matrixorigin/matrixone/pkg/container/batch"
28+
"github.com/matrixorigin/matrixone/pkg/logutil"
2829
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
2930
"github.com/matrixorigin/matrixone/pkg/vm/process"
31+
"go.uber.org/zap"
3032
)
3133

3234
// receiverFailureMode defines how to handle receiver failures
@@ -397,6 +399,10 @@ func sendBatchToClientSession(
397399
if failureMode == FailureModeStrict {
398400
// Strict mode: receiver done indicates data loss
399401
// This happens when remote CN crashes or cancels
402+
logutil.Debug("sendBatchToClientSession: ReceiverDone=true in strict mode",
403+
zap.String("receiverID", receiverID),
404+
zap.Uint64("msgId", wcs.MsgId),
405+
zap.String("uid", wcs.Uid.String()))
400406
return true, moerr.NewInternalError(ctx, fmt.Sprintf(
401407
"remote receiver %s is already done, data loss may occur. "+
402408
"This usually indicates the remote CN has failed or been canceled",

pkg/sql/colexec/types2.go

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,48 @@ package colexec
1616

1717
import (
1818
"github.com/matrixorigin/matrixone/pkg/common/morpc"
19+
"github.com/matrixorigin/matrixone/pkg/logutil"
1920
"github.com/matrixorigin/matrixone/pkg/vm/process"
21+
"go.uber.org/zap"
2022
)
2123

2224
func (srv *Server) RecordDispatchPipeline(
2325
session morpc.ClientSession, streamID uint64, dispatchReceiver *process.WrapCs) {
2426

2527
key := generateRecordKey(session, streamID)
2628

29+
logutil.Debug("RecordDispatchPipeline called",
30+
zap.Uint64("streamID", streamID),
31+
zap.String("receiverUid", dispatchReceiver.Uid.String()))
32+
2733
srv.receivedRunningPipeline.Lock()
2834
defer srv.receivedRunningPipeline.Unlock()
2935

3036
// check if sender has sent a stop running message.
3137
if v, ok := srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key]; ok && v.alreadyDone {
32-
dispatchReceiver.Lock()
33-
dispatchReceiver.ReceiverDone = true
34-
dispatchReceiver.Unlock()
35-
return
38+
// Fix: Check if this is a stale record created by CancelPipelineSending
39+
// before RecordDispatchPipeline was called (race condition).
40+
// If receiver is nil, it means CancelPipelineSending created this record
41+
// when the pipeline wasn't registered yet. We should clean it up and
42+
// allow the normal registration to proceed.
43+
if v.receiver == nil || v.receiver.Uid != dispatchReceiver.Uid {
44+
// This is a stale record created by CancelPipelineSending before
45+
// RecordDispatchPipeline was called. Clean it up and proceed with
46+
// normal registration.
47+
logutil.Debug("RecordDispatchPipeline cleaning stale record",
48+
zap.Uint64("streamID", streamID))
49+
delete(srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline, key)
50+
} else {
51+
// This is a legitimate cancellation - the same receiver was already registered
52+
// and then cancelled. Set ReceiverDone to true.
53+
logutil.Debug("RecordDispatchPipeline setting ReceiverDone=true (legitimate cancellation)",
54+
zap.Uint64("streamID", streamID),
55+
zap.String("existingReceiverUid", v.receiver.Uid.String()))
56+
dispatchReceiver.Lock()
57+
dispatchReceiver.ReceiverDone = true
58+
dispatchReceiver.Unlock()
59+
return
60+
}
3661
}
3762

3863
value := runningPipelineInfo{
@@ -43,6 +68,9 @@ func (srv *Server) RecordDispatchPipeline(
4368
}
4469

4570
srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key] = value
71+
logutil.Debug("RecordDispatchPipeline registered successfully",
72+
zap.Uint64("streamID", streamID),
73+
zap.String("receiverUid", dispatchReceiver.Uid.String()))
4674
}
4775

4876
func (srv *Server) RecordBuiltPipeline(
@@ -74,29 +102,35 @@ func (srv *Server) CancelPipelineSending(
74102

75103
key := generateRecordKey(session, streamID)
76104

105+
logutil.Debug("CancelPipelineSending called",
106+
zap.Uint64("streamID", streamID))
107+
77108
srv.receivedRunningPipeline.Lock()
78109
defer srv.receivedRunningPipeline.Unlock()
79110

80111
if v, ok := srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key]; ok {
81-
v.cancelPipeline()
82-
} else {
83-
srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key] = generateCanceledRecord()
112+
logutil.Debug("CancelPipelineSending found existing record",
113+
zap.Uint64("streamID", streamID),
114+
zap.Bool("alreadyDone", v.alreadyDone),
115+
zap.Bool("hasReceiver", v.receiver != nil),
116+
zap.Bool("isDispatch", v.isDispatch))
117+
118+
if !v.isDispatch {
119+
// Only cancel non-dispatch pipelines (query execution pipelines)
120+
logutil.Debug("CancelPipelineSending canceling non-dispatch pipeline",
121+
zap.Uint64("streamID", streamID))
122+
v.cancelPipeline()
123+
}
84124
}
85125
}
86126

87127
func (srv *Server) RemoveRelatedPipeline(session morpc.ClientSession, streamID uint64) {
88128
key := generateRecordKey(session, streamID)
89-
90129
srv.receivedRunningPipeline.Lock()
91130
defer srv.receivedRunningPipeline.Unlock()
92-
93131
delete(srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline, key)
94132
}
95133

96-
func generateCanceledRecord() runningPipelineInfo {
97-
return runningPipelineInfo{alreadyDone: true}
98-
}
99-
100134
func generateRecordKey(session morpc.ClientSession, streamID uint64) rpcClientItem {
101135
return rpcClientItem{tcp: session, id: streamID}
102136
}

0 commit comments

Comments
 (0)