Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2689,7 +2689,7 @@ func executeStmtWithWorkspace(ses FeSession,
defer ses.ExitFPrint(FPExecStmtWithWorkspaceBeforeStart)
//!!!NOTE!!!: statement management
//2. start statement on workspace
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(execCtx.stmt)
//3. end statement on workspace
// defer Start/End Statement management, called after finishTxnFunc()
defer func() {
Expand Down
9 changes: 5 additions & 4 deletions pkg/frontend/test/txn_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 17 additions & 13 deletions pkg/frontend/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ func newTestWorkspace() *testWorkspace {
return &testWorkspace{}
}

func (txn *testWorkspace) StartStatement() {
func (txn *testWorkspace) StartStatement(stmt tree.Statement) {
if txn.start {
panic("BUG: StartStatement called twice")
var sql string
if stmt != nil {
sql = stmt.String()
}
panic(fmt.Sprintf("BUG: StartStatement called twice, sql: %s", sql))
}
txn.start = true
txn.incr = false
Expand Down Expand Up @@ -177,7 +181,7 @@ func TestWorkspace(t *testing.T) {
convey.So(
func() {
wsp := newTestWorkspace()
wsp.StartStatement()
wsp.StartStatement(nil)
wsp.EndStatement()
},
convey.ShouldNotPanic,
Expand All @@ -196,8 +200,8 @@ func TestWorkspace(t *testing.T) {
convey.So(
func() {
wsp := newTestWorkspace()
wsp.StartStatement()
wsp.StartStatement()
wsp.StartStatement(nil)
wsp.StartStatement(nil)
},
convey.ShouldPanic,
)
Expand All @@ -217,7 +221,7 @@ func TestWorkspace(t *testing.T) {
convey.So(
func() {
wsp := newTestWorkspace()
wsp.StartStatement()
wsp.StartStatement(nil)
err := wsp.IncrStatementID(context.TODO(), false)
convey.So(err, convey.ShouldBeNil)
//incr twice
Expand All @@ -231,7 +235,7 @@ func TestWorkspace(t *testing.T) {
convey.So(
func() {
wsp := newTestWorkspace()
wsp.StartStatement()
wsp.StartStatement(nil)
err := wsp.RollbackLastStatement(context.TODO())
convey.So(err, convey.ShouldBeNil)
},
Expand All @@ -242,7 +246,7 @@ func TestWorkspace(t *testing.T) {
convey.So(
func() {
wsp := newTestWorkspace()
wsp.StartStatement()
wsp.StartStatement(nil)
err := wsp.IncrStatementID(context.TODO(), false)
convey.So(err, convey.ShouldBeNil)
err = wsp.RollbackLastStatement(context.TODO())
Expand Down Expand Up @@ -484,7 +488,7 @@ func Test_rollbackStatement(t *testing.T) {
NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse)
convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue)
//called incrStatement
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
err = txnOp.GetWorkspace().IncrStatementID(ctx, false)
convey.So(err, convey.ShouldBeNil)
ec.stmt = &tree.Insert{}
Expand Down Expand Up @@ -514,7 +518,7 @@ func Test_rollbackStatement(t *testing.T) {
NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse)
convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue)
//called incrStatement
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
err = txnOp.GetWorkspace().IncrStatementID(ctx, false)
convey.So(err, convey.ShouldBeNil)
ec.stmt = &tree.Insert{}
Expand Down Expand Up @@ -671,7 +675,7 @@ func Test_rollbackStatement5(t *testing.T) {
NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse)
convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue)
//called incrStatement
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
err = txnOp.GetWorkspace().IncrStatementID(ctx, false)
convey.So(err, convey.ShouldBeNil)
ec.stmt = &tree.Insert{}
Expand Down Expand Up @@ -710,7 +714,7 @@ func Test_rollbackStatement6(t *testing.T) {
NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse)
convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue)
//called incrStatement
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
err = txnOp.GetWorkspace().IncrStatementID(ctx, false)
convey.So(err, convey.ShouldBeNil)
ec.stmt = &tree.Insert{}
Expand Down Expand Up @@ -745,7 +749,7 @@ func Test_rollbackStatement6(t *testing.T) {
NeedToBeCommittedInActiveTransaction(&tree.Insert{}), convey.ShouldBeFalse)
convey.So(txnOp != nil && !ses.IsDerivedStmt(), convey.ShouldBeTrue)
//called incrStatement
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
err = txnOp.GetWorkspace().IncrStatementID(ctx, false)
convey.So(err, convey.ShouldBeNil)
ec.stmt = &tree.Insert{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func TestGetExprValue(t *testing.T) {
ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ws.EXPECT().IncrSQLCount().AnyTimes()
ws.EXPECT().GetSQLCount().AnyTimes()
ws.EXPECT().StartStatement().AnyTimes()
ws.EXPECT().StartStatement(nil).AnyTimes()
ws.EXPECT().EndStatement().AnyTimes()
ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes()
ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes()
Expand Down Expand Up @@ -729,7 +729,7 @@ func TestGetExprValue(t *testing.T) {

ws := mock_frontend.NewMockWorkspace(ctrl)
ws.EXPECT().IncrStatementID(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ws.EXPECT().StartStatement().AnyTimes()
ws.EXPECT().StartStatement(nil).AnyTimes()
ws.EXPECT().EndStatement().AnyTimes()
ws.EXPECT().GetSnapshotWriteOffset().Return(0).AnyTimes()
ws.EXPECT().UpdateSnapshotWriteOffset().AnyTimes()
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ func (w *Ws) Adjust(_ uint64) error {
return nil
}

func (w *Ws) StartStatement() {}
func (w *Ws) EndStatement() {}
func (w *Ws) IncrSQLCount() {}
func (w *Ws) GetSQLCount() uint64 { return 0 }
func (w *Ws) StartStatement(statement tree.Statement) {}
func (w *Ws) EndStatement() {}
func (w *Ws) IncrSQLCount() {}
func (w *Ws) GetSQLCount() uint64 { return 0 }

func (w *Ws) CloneSnapshotWS() client.Workspace {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/compile/sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *sqlExecutor) NewTxnOperator(ctx context.Context) client.TxnOperator {
return nil
}
}
opts.Txn().GetWorkspace().StartStatement()
opts.Txn().GetWorkspace().StartStatement(nil)
opts.Txn().GetWorkspace().IncrStatementID(ctx, false)
return opts.Txn()
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (exec *txnExecutor) Exec(
// maybe we should fix it.
txnOp := exec.opts.Txn()
if txnOp != nil && !exec.opts.DisableIncrStatement() {
txnOp.GetWorkspace().StartStatement()
txnOp.GetWorkspace().StartStatement(nil)
defer func() {
txnOp.GetWorkspace().EndStatement()
}()
Expand Down
3 changes: 2 additions & 1 deletion pkg/txn/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
)

Expand Down Expand Up @@ -250,7 +251,7 @@ type Workspace interface {
Readonly() bool

// StartStatement tag a statement is running
StartStatement()
StartStatement(tree.Statement)
// EndStatement tag end a statement is completed
EndStatement()

Expand Down
17 changes: 17 additions & 0 deletions pkg/vm/engine/disttae/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ package disttae

import (
"bytes"
"context"
"sync"
"testing"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -62,3 +66,16 @@ func Test_GetUncommittedS3Tombstone(t *testing.T) {
return true
})
}

func TestTransaction_StartStatement(t *testing.T) {
var txn Transaction
var fn func()
ctx := context.Background()
txn.op, fn = client.NewTestTxnOperator(ctx)
defer fn()
txn.StartStatement(nil)
stmts, err := parsers.Parse(ctx, dialect.MYSQL, "show databases", 0)
require.NoError(t, err)
require.Equal(t, len(stmts), 1)
txn.StartStatement(stmts[0])
}
20 changes: 18 additions & 2 deletions pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/hex"
"flag"
"fmt"
"math"
"strconv"
Expand All @@ -44,6 +45,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
qclient "github.com/matrixorigin/matrixone/pkg/queryservice/client"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/trace"
"github.com/matrixorigin/matrixone/pkg/udf"
Expand Down Expand Up @@ -525,9 +528,22 @@ func (txn *Transaction) PPString() string {
stringifySlice(txn.transfer.timestamps, func(a any) string { t := a.(timestamp.Timestamp); return t.DebugString() }))
}

func (txn *Transaction) StartStatement() {
func (txn *Transaction) StartStatement(stmt tree.Statement) {
if txn.startStatementCalled {
logutil.Fatal("BUG: StartStatement called twice", zap.String("txn", hex.EncodeToString(txn.op.Txn().ID)))
var sql string
if stmt != nil {
fmtCtx := tree.NewFmtCtx(dialect.MYSQL, tree.WithQuoteString(true))
stmt.Format(fmtCtx)
sql = fmtCtx.String()
}
log := logutil.Fatal
if flag.Lookup("test.v") != nil {
log = logutil.Error
}
log("BUG: StartStatement called twice",
zap.String("txn", hex.EncodeToString(txn.op.Txn().ID)),
zap.String("SQL", sql),
)
}
txn.startStatementCalled = true
txn.incrStatementCalled = false
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/test/disttae_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestSystemDB1(t *testing.T) {
txnop = p.StartCNTxn()
dbs, err = p.D.Engine.Databases(p.Ctx, txnop)
require.NoError(t, err)
txnop.GetWorkspace().StartStatement()
txnop.GetWorkspace().StartStatement(nil)
require.Equal(t, 2+1, len(dbs))

txn, err := p.T.StartTxn()
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestColumnsTransfer(t *testing.T) {
require.NoError(t, txnop.Commit(p.Ctx))

txnop = p.StartCNTxn()
txnop.GetWorkspace().StartStatement()
txnop.GetWorkspace().StartStatement(nil)
p.DeleteTableInDB(txnop, "db", schema2.Name)

txn, _ := tae.StartTxn(nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/test/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func WriteToRelation(
bat *batch.Batch,
isDelete, toEndStatement bool,
) (err error) {
txn.GetWorkspace().StartStatement()
txn.GetWorkspace().StartStatement(nil)
if isDelete {
err = relation.Delete(ctx, bat, catalog2.Row_ID)
} else {
Expand Down
12 changes: 6 additions & 6 deletions pkg/vm/engine/test/workspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func Test_BasicRollbackStatement(t *testing.T) {

require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true))

txn.GetWorkspace().StartStatement()
txn.GetWorkspace().StartStatement(nil)
require.NoError(t, relation.Write(ctx, bat2))
require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx))
require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false))
Expand Down Expand Up @@ -921,7 +921,7 @@ func Test_BasicRollbackStatementS3(t *testing.T) {

require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat1, false, true))

txn.GetWorkspace().StartStatement()
txn.GetWorkspace().StartStatement(nil)
require.NoError(t, relation.Write(ctx, bat2))
require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx))
require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false))
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func Test_RollbackDeleteAndDrop(t *testing.T) {
txnop = p.StartCNTxn()
exec := v.(executor.SQLExecutor)
execopts := executor.Options{}.WithTxn(txnop).WithDisableIncrStatement()
txnop.GetWorkspace().StartStatement()
txnop.GetWorkspace().StartStatement(nil)
txnop.GetWorkspace().IncrStatementID(p.Ctx, false)
dropTable := func() {
_, err := exec.Exec(p.Ctx, "delete from db.test3 where mock_1 = 0", execopts)
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func Test_MultiTxnRollbackStatement(t *testing.T) {
{
require.NoError(t, testutil.WriteToRelation(ctx, txn, relation, bat2, false, true))

txn.GetWorkspace().StartStatement()
txn.GetWorkspace().StartStatement(nil)
require.NoError(t, relation.Write(ctx, bat2))
require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx))
require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false))
Expand Down Expand Up @@ -1357,7 +1357,7 @@ func Test_MultiTxnRollbackStatementS3(t *testing.T) {

// txn2 delete 5-15
{
txn.GetWorkspace().StartStatement()
txn.GetWorkspace().StartStatement(nil)
require.NoError(t, relation.Write(ctx, bat2))
require.NoError(t, txn.GetWorkspace().RollbackLastStatement(ctx))
require.NoError(t, txn.GetWorkspace().IncrStatementID(ctx, false))
Expand Down Expand Up @@ -1662,7 +1662,7 @@ func Test_CNTransferTombstoneObjects(t *testing.T) {
_, _, cnTxnOp, err = p.D.GetTable(ctx, databaseName, tableName)
require.NoError(t, err)

cnTxnOp.GetWorkspace().StartStatement()
cnTxnOp.GetWorkspace().StartStatement(nil)
err = cnTxnOp.GetWorkspace().IncrStatementID(ctx, false)
require.NoError(t, err)
}
Expand Down
Loading