Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/sql/colexec/dispatch/sendfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (

"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
)

// receiverFailureMode defines how to handle receiver failures
Expand Down Expand Up @@ -397,6 +399,10 @@ func sendBatchToClientSession(
if failureMode == FailureModeStrict {
// Strict mode: receiver done indicates data loss
// This happens when remote CN crashes or cancels
logutil.Debug("sendBatchToClientSession: ReceiverDone=true in strict mode",
zap.String("receiverID", receiverID),
zap.Uint64("msgId", wcs.MsgId),
zap.String("uid", wcs.Uid.String()))
return true, moerr.NewInternalError(ctx, fmt.Sprintf(
"remote receiver %s is already done, data loss may occur. "+
"This usually indicates the remote CN has failed or been canceled",
Expand Down
60 changes: 47 additions & 13 deletions pkg/sql/colexec/types2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,48 @@ package colexec

import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
)

func (srv *Server) RecordDispatchPipeline(
session morpc.ClientSession, streamID uint64, dispatchReceiver *process.WrapCs) {

key := generateRecordKey(session, streamID)

logutil.Debug("RecordDispatchPipeline called",
zap.Uint64("streamID", streamID),
zap.String("receiverUid", dispatchReceiver.Uid.String()))

srv.receivedRunningPipeline.Lock()
defer srv.receivedRunningPipeline.Unlock()

// check if sender has sent a stop running message.
if v, ok := srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key]; ok && v.alreadyDone {
dispatchReceiver.Lock()
dispatchReceiver.ReceiverDone = true
dispatchReceiver.Unlock()
return
// Fix: Check if this is a stale record created by CancelPipelineSending
// before RecordDispatchPipeline was called (race condition).
// If receiver is nil, it means CancelPipelineSending created this record
// when the pipeline wasn't registered yet. We should clean it up and
// allow the normal registration to proceed.
if v.receiver == nil || v.receiver.Uid != dispatchReceiver.Uid {
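// Either no receiver was recorded (CancelPipelineSending created this
// record before RecordDispatchPipeline was called) or the recorded
// receiver's Uid differs from the one being registered now. In both
// cases the record is stale: clean it up and proceed with normal
// registration.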
logutil.Debug("RecordDispatchPipeline cleaning stale record",
zap.Uint64("streamID", streamID))
delete(srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline, key)
} else {
// This is a legitimate cancellation - the same receiver was already registered
// and then cancelled. Set ReceiverDone to true.
logutil.Debug("RecordDispatchPipeline setting ReceiverDone=true (legitimate cancellation)",
zap.Uint64("streamID", streamID),
zap.String("existingReceiverUid", v.receiver.Uid.String()))
dispatchReceiver.Lock()
dispatchReceiver.ReceiverDone = true
dispatchReceiver.Unlock()
return
}
}

value := runningPipelineInfo{
Expand All @@ -43,6 +68,9 @@ func (srv *Server) RecordDispatchPipeline(
}

srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key] = value
logutil.Debug("RecordDispatchPipeline registered successfully",
zap.Uint64("streamID", streamID),
zap.String("receiverUid", dispatchReceiver.Uid.String()))
}

func (srv *Server) RecordBuiltPipeline(
Expand Down Expand Up @@ -74,29 +102,35 @@ func (srv *Server) CancelPipelineSending(

key := generateRecordKey(session, streamID)

logutil.Debug("CancelPipelineSending called",
zap.Uint64("streamID", streamID))

srv.receivedRunningPipeline.Lock()
defer srv.receivedRunningPipeline.Unlock()

if v, ok := srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key]; ok {
v.cancelPipeline()
} else {
srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline[key] = generateCanceledRecord()
logutil.Debug("CancelPipelineSending found existing record",
zap.Uint64("streamID", streamID),
zap.Bool("alreadyDone", v.alreadyDone),
zap.Bool("hasReceiver", v.receiver != nil),
zap.Bool("isDispatch", v.isDispatch))

if !v.isDispatch {
// Only cancel non-dispatch pipelines (query execution pipelines)
logutil.Debug("CancelPipelineSending canceling non-dispatch pipeline",
zap.Uint64("streamID", streamID))
v.cancelPipeline()
}
}
}

// RemoveRelatedPipeline drops the record stored for the given
// (session, streamID) pair from the server's running-pipeline table.
// The table's lock is held for the duration of the removal; removing a
// key that is not present is a no-op.
func (srv *Server) RemoveRelatedPipeline(session morpc.ClientSession, streamID uint64) {
	srv.receivedRunningPipeline.Lock()
	defer srv.receivedRunningPipeline.Unlock()

	records := srv.receivedRunningPipeline.fromRpcClientToRelatedPipeline
	delete(records, generateRecordKey(session, streamID))
}

// generateCanceledRecord builds a runningPipelineInfo whose alreadyDone
// flag is set; every other field keeps its zero value.
func generateCanceledRecord() runningPipelineInfo {
	var record runningPipelineInfo
	record.alreadyDone = true
	return record
}

// generateRecordKey combines an RPC client session and a stream ID into
// the rpcClientItem value used as the key of the running-pipeline table.
func generateRecordKey(session morpc.ClientSession, streamID uint64) rpcClientItem {
	key := rpcClientItem{
		id:  streamID,
		tcp: session,
	}
	return key
}
Loading
Loading