Skip to content

Commit f33f7ae

Browse files
authored
to 3.0: apply COW to indexes during ALTER TABLE if the primary key and indexed columns are not modified (#22385)
During ALTER TABLE, apply a Copy-On-Write to indexes if the primary key and indexed columns are not modified. Approved by: @daviszhen, @zhangxu19830126, @heni02, @aunjgr, @XuPeng-SH, @ouyuanning
1 parent 6bb57b1 commit f33f7ae

24 files changed

+2252
-1024
lines changed

pkg/defines/type.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ type SourceScanResKey struct{}
230230

231231
type IgnoreForeignKey struct{}
232232

233-
type AlterCopyDedupOpt struct{}
233+
type AlterCopyOpt struct{}
234234

235235
// Determine if now is a bg sql.
236236
type BgKey struct{}

pkg/pb/plan/plan.pb.go

Lines changed: 1628 additions & 855 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/colexec/multi_update/insert.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (update *MultiUpdate) insert_secondary_index_table(
109109
attrs = append(attrs, col.Name)
110110
}
111111
}
112+
112113
ctr.insertBuf[tableIndex] = batch.New(attrs)
113114
for insertIdx, inputIdx := range updateCtx.InsertCols {
114115
ctr.insertBuf[tableIndex].Vecs[insertIdx] = vector.NewVec(*inputBatch.Vecs[inputIdx].GetType())
@@ -131,6 +132,7 @@ func (update *MultiUpdate) insert_table(
131132
inputBatch *batch.Batch,
132133
insertBatch *batch.Batch,
133134
) (err error) {
135+
134136
insertBatch.CleanOnlyData()
135137
for insertIdx, inputIdx := range updateCtx.InsertCols {
136138
err = insertBatch.Vecs[insertIdx].UnionBatch(inputBatch.Vecs[inputIdx], 0, inputBatch.Vecs[inputIdx].Length(), nil, proc.GetMPool())

pkg/sql/colexec/table_clone/table_clone.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"context"
2020
"fmt"
21+
2122
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2223
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2324
"github.com/matrixorigin/matrixone/pkg/common/reuse"
@@ -137,9 +138,17 @@ func (tc *TableClone) Prepare(proc *process.Process) error {
137138
// the src table is a publication
138139
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, uint32(tc.Ctx.SrcObjDef.PubInfo.TenantId))
139140

140-
} else if tc.Ctx.ScanSnapshot != nil && tc.Ctx.ScanSnapshot.Tenant != nil {
141-
// the source data may be coming from a different account.
142-
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, tc.Ctx.ScanSnapshot.Tenant.TenantID)
141+
} else if tc.Ctx.ScanSnapshot != nil {
142+
if tc.Ctx.ScanSnapshot.Tenant != nil {
143+
// the source data may be coming from a different account.
144+
tc.Ctx.SrcCtx = defines.AttachAccountId(tc.Ctx.SrcCtx, tc.Ctx.ScanSnapshot.Tenant.TenantID)
145+
}
146+
147+
// without setting this scan ts, we could read the newly created table !!!
148+
if tc.Ctx.ScanSnapshot.TS != nil {
149+
txnOp = proc.GetTxnOperator().CloneSnapshotOp(*tc.Ctx.ScanSnapshot.TS)
150+
proc.SetCloneTxnOperator(txnOp)
151+
}
143152
}
144153

145154
txnOp = proc.GetCloneTxnOperator()

pkg/sql/compile/alter.go

Lines changed: 116 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ package compile
1717
import (
1818
"context"
1919
"fmt"
20+
"github.com/matrixorigin/matrixone/pkg/common/reuse"
21+
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_clone"
22+
"slices"
2023

2124
"github.com/matrixorigin/matrixone/pkg/catalog"
2225
"github.com/matrixorigin/matrixone/pkg/common/moerr"
@@ -141,8 +144,8 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
141144
return err
142145
}
143146
opt := executor.StatementOption{}
144-
if qry.DedupOpt.SkipPkDedup || len(qry.DedupOpt.SkipUniqueIdxDedup) > 0 {
145-
opt = opt.WithAlterCopyDedupOpt(qry.DedupOpt)
147+
if qry.Options.SkipPkDedup || len(qry.Options.SkipUniqueIdxDedup) > 0 {
148+
opt = opt.WithAlterCopyOpt(qry.Options)
146149
}
147150
// 4. copy the original table data to the temporary replica table
148151
err = c.runSqlWithOptions(qry.InsertTmpDataSql, opt)
@@ -156,7 +159,25 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
156159
return err
157160
}
158161

159-
// 5. drop original table
162+
//5. obtain relation for new tables
163+
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
164+
if err != nil {
165+
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
166+
zap.String("databaseName", dbName),
167+
zap.String("origin tableName", qry.GetTableDef().Name),
168+
zap.String("copy table name", qry.CopyTableDef.Name),
169+
zap.Error(err))
170+
return err
171+
}
172+
173+
//6. copy on writing unaffected index table
174+
if err = cowUnaffectedIndexes(
175+
c, dbName, qry.AffectedCols, newRel, qry.TableDef, nil,
176+
); err != nil {
177+
return err
178+
}
179+
180+
// 7. drop original table
160181
dropSql := fmt.Sprintf("drop table `%s`.`%s`", dbName, tblName)
161182
if err := c.runSqlWithOptions(
162183
dropSql,
@@ -170,7 +191,7 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
170191
return err
171192
}
172193

173-
// 5.1 delete all index objects of the table in mo_catalog.mo_indexes
194+
// 7.1 delete all index objects of the table in mo_catalog.mo_indexes
174195
if qry.Database != catalog.MO_CATALOG && qry.TableDef.Name != catalog.MO_INDEXES {
175196
if qry.GetTableDef().Pkey != nil || len(qry.GetTableDef().Indexes) > 0 {
176197
deleteSql := fmt.Sprintf(
@@ -190,20 +211,9 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
190211
}
191212
}
192213

193-
//6. obtain relation for new tables
194-
newRel, err := dbSource.Relation(c.proc.Ctx, qry.CopyTableDef.Name, nil)
195-
if err != nil {
196-
c.proc.Error(c.proc.Ctx, "obtain new relation for copy table for alter table",
197-
zap.String("databaseName", dbName),
198-
zap.String("origin tableName", qry.GetTableDef().Name),
199-
zap.String("copy table name", qry.CopyTableDef.Name),
200-
zap.Error(err))
201-
return err
202-
}
203-
204214
newId := newRel.GetTableID(c.proc.Ctx)
205215
//-------------------------------------------------------------------------
206-
// 7. rename temporary replica table into the original table(Table Id remains unchanged)
216+
// 8. rename temporary replica table into the original table(Table Id remains unchanged)
207217
copyTblName := qry.CopyTableDef.Name
208218
req := api.NewRenameTableReq(
209219
newRel.GetDBID(c.proc.Ctx),
@@ -226,7 +236,7 @@ func (s *Scope) AlterTableCopy(c *Compile) error {
226236
}
227237
//--------------------------------------------------------------------------------------------------------------
228238
{
229-
// 8. invoke reindex for the new table, if it contains ivf index.
239+
// 9. invoke reindex for the new table, if it contains ivf index.
230240
multiTableIndexes := make(map[string]*MultiTableIndex)
231241
newTableDef := newRel.CopyTableDef(c.proc.Ctx)
232242
extra := newRel.GetExtraInfo()
@@ -545,3 +555,92 @@ func notifyParentTableFkTableIdChange(c *Compile, fkey *plan.ForeignKeyDef, oldT
545555
}
546556
return fatherRelation.UpdateConstraint(c.proc.Ctx, oldCt)
547557
}
558+
559+
func cowUnaffectedIndexes(
560+
c *Compile,
561+
dbName string,
562+
affectedCols []string,
563+
newRel engine.Relation,
564+
oriTblDef *plan.TableDef,
565+
cloneSnapshot *plan.Snapshot,
566+
) (err error) {
567+
568+
var (
569+
clone *table_clone.TableClone
570+
571+
oriIdxTblDef *plan.TableDef
572+
oriIdxObjRef *plan.ObjectRef
573+
574+
newTblDef = newRel.GetTableDef(c.proc.Ctx)
575+
576+
oriIdxColNameToTblName = make(map[string]string)
577+
newIdxTColNameToTblName = make(map[string]string)
578+
)
579+
580+
releaseClone := func() {
581+
if clone != nil {
582+
clone.Free(c.proc, false, err)
583+
reuse.Free[table_clone.TableClone](clone, nil)
584+
clone = nil
585+
}
586+
}
587+
588+
defer func() {
589+
releaseClone()
590+
}()
591+
592+
for _, idxTbl := range oriTblDef.Indexes {
593+
if slices.Index(affectedCols, idxTbl.IndexName) != -1 {
594+
continue
595+
}
596+
597+
oriIdxColNameToTblName[idxTbl.IndexName] = idxTbl.IndexTableName
598+
}
599+
600+
for _, idxTbl := range newTblDef.Indexes {
601+
newIdxTColNameToTblName[idxTbl.IndexName] = idxTbl.IndexTableName
602+
}
603+
604+
cctx := compilerContext{
605+
ctx: c.proc.Ctx,
606+
defaultDB: dbName,
607+
engine: c.e,
608+
proc: c.proc,
609+
}
610+
611+
for colName, oriIdxTblName := range oriIdxColNameToTblName {
612+
newIdxTblName, ok := newIdxTColNameToTblName[colName]
613+
if !ok {
614+
continue
615+
}
616+
617+
oriIdxObjRef, oriIdxTblDef, err = cctx.Resolve(dbName, oriIdxTblName, cloneSnapshot)
618+
619+
clonePlan := plan.CloneTable{
620+
CreateTable: nil,
621+
ScanSnapshot: cloneSnapshot,
622+
SrcTableDef: oriIdxTblDef,
623+
SrcObjDef: oriIdxObjRef,
624+
DstDatabaseName: dbName,
625+
DstTableName: newIdxTblName,
626+
}
627+
628+
if clone, err = constructTableClone(c, &clonePlan); err != nil {
629+
return err
630+
}
631+
632+
if err = clone.Prepare(c.proc); err != nil {
633+
releaseClone()
634+
return err
635+
}
636+
637+
if _, err = clone.Call(c.proc); err != nil {
638+
releaseClone()
639+
return err
640+
}
641+
642+
releaseClone()
643+
}
644+
645+
return nil
646+
}

pkg/sql/compile/compile.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4926,26 +4926,20 @@ func (c *Compile) compileTableClone(
49264926
err error
49274927
s1 *Scope
49284928

4929-
nodes []engine.Node
4930-
cloneQry = pn.GetDdl().Query
4929+
node engine.Node
4930+
clonePlan = pn.GetDdl().GetCloneTable()
49314931
)
49324932

4933-
nodes, err = c.generateNodes(cloneQry.Nodes[0])
4934-
if err != nil {
4935-
return nil, err
4936-
}
4933+
node = getEngineNode(c)
49374934

4938-
copyOp, err := constructTableClone(c, cloneQry.Nodes[0])
4935+
copyOp, err := constructTableClone(c, clonePlan)
49394936
if err != nil {
49404937
return nil, err
49414938
}
49424939

49434940
s1 = newScope(TableClone)
4944-
s1.NodeInfo = nodes[0]
4941+
s1.NodeInfo = node
49454942
s1.TxnOffset = c.TxnOffset
4946-
s1.DataSource = &Source{
4947-
node: cloneQry.Nodes[0],
4948-
}
49494943
s1.Plan = pn
49504944

49514945
s1.Proc = c.proc.NewNoContextChildProc(0)

pkg/sql/compile/ddl.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2931,8 +2931,13 @@ func (s *Scope) TableClone(c *Compile) error {
29312931
err error
29322932
)
29332933

2934-
if err = s.CreateTable(c); err != nil {
2935-
return err
2934+
clonePlan := s.Plan.GetDdl().GetCloneTable()
2935+
2936+
if clonePlan.CreateTable != nil {
2937+
s.Plan = clonePlan.CreateTable
2938+
if err = s.CreateTable(c); err != nil {
2939+
return err
2940+
}
29362941
}
29372942

29382943
return s.Run(c)

pkg/sql/compile/operator.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2376,19 +2376,19 @@ func constructPostDml(n *plan.Node, eg engine.Engine) *postdml.PostDml {
23762376

23772377
func constructTableClone(
23782378
c *Compile,
2379-
n *plan.Node,
2379+
clonePlan *plan.CloneTable,
23802380
) (*table_clone.TableClone, error) {
23812381

23822382
metaCopy := table_clone.NewTableClone()
23832383

23842384
metaCopy.Ctx = &table_clone.TableCloneCtx{
23852385
Eng: c.e,
2386-
SrcTblDef: n.TableDef,
2387-
SrcObjDef: n.ObjRef,
2386+
SrcTblDef: clonePlan.SrcTableDef,
2387+
SrcObjDef: clonePlan.SrcObjDef,
23882388

2389-
ScanSnapshot: n.ScanSnapshot,
2390-
DstTblName: n.InsertCtx.TableDef.Name,
2391-
DstDatabaseName: n.InsertCtx.TableDef.DbName,
2389+
ScanSnapshot: clonePlan.ScanSnapshot,
2390+
DstTblName: clonePlan.DstTableName,
2391+
DstDatabaseName: clonePlan.DstDatabaseName,
23922392
}
23932393

23942394
var (
@@ -2401,7 +2401,7 @@ func constructTableClone(
24012401
hasAutoIncr bool
24022402
)
24032403

2404-
for _, colDef := range n.TableDef.Cols {
2404+
for _, colDef := range clonePlan.SrcTableDef.Cols {
24052405
if colDef.Typ.AutoIncr {
24062406
hasAutoIncr = true
24072407
break
@@ -2413,17 +2413,20 @@ func constructTableClone(
24132413
}
24142414

24152415
sql = fmt.Sprintf(
2416-
"select col_index, offset from mo_catalog.mo_increment_columns where table_id = %d", n.TableDef.TblId)
2416+
"select col_index, offset from mo_catalog.mo_increment_columns where table_id = %d",
2417+
clonePlan.SrcTableDef.TblId,
2418+
)
24172419

2418-
if n.ScanSnapshot != nil {
2419-
if n.ScanSnapshot.Tenant != nil {
2420-
account = n.ScanSnapshot.Tenant.TenantID
2420+
if clonePlan.ScanSnapshot != nil {
2421+
if clonePlan.ScanSnapshot.Tenant != nil {
2422+
account = clonePlan.ScanSnapshot.Tenant.TenantID
24212423
}
24222424

2423-
if n.ScanSnapshot.TS != nil {
2425+
if clonePlan.ScanSnapshot.TS != nil {
24242426
sql = fmt.Sprintf(
24252427
"select col_index, offset from mo_catalog.mo_increment_columns {MO_TS = %d} where table_id = %d",
2426-
n.ScanSnapshot.TS.PhysicalTime, n.TableDef.TblId)
2428+
clonePlan.ScanSnapshot.TS.PhysicalTime, clonePlan.SrcTableDef.TblId,
2429+
)
24272430
}
24282431
}
24292432

pkg/sql/compile/sql_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (exec *txnExecutor) Exec(
271271

272272
if v := statementOption.AlterCopyDedupOpt(); v != nil {
273273
exec.ctx = context.WithValue(exec.ctx,
274-
defines.AlterCopyDedupOpt{}, v)
274+
defines.AlterCopyOpt{}, v)
275275
}
276276

277277
receiveAt := time.Now()

pkg/sql/plan/bind_insert.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -279,16 +279,22 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
279279
}
280280
}
281281
} else {
282-
var skipPkDedup bool
283-
var skipUniqueIdxDedup map[string]bool
284-
if v := builder.compCtx.GetContext().Value(defines.AlterCopyDedupOpt{}); v != nil {
285-
dedupOpt := v.(*plan.AlterCopyDedupOpt)
286-
if dedupOpt.TargetTableName == tableDef.Name {
282+
283+
var (
284+
option *plan.AlterCopyOpt
285+
skipPkDedup bool
286+
skipUniqueIdxDedup map[string]bool
287+
)
288+
289+
if v := builder.compCtx.GetContext().Value(defines.AlterCopyOpt{}); v != nil {
290+
option = v.(*plan.AlterCopyOpt)
291+
if option.TargetTableName == tableDef.Name {
287292
logutil.Info("alter copy dedup exec",
288293
zap.String("tableDef", tableDef.Name),
289-
zap.Any("dedupOpt", dedupOpt))
290-
skipPkDedup = dedupOpt.SkipPkDedup
291-
skipUniqueIdxDedup = dedupOpt.SkipUniqueIdxDedup
294+
zap.Any("option", option),
295+
)
296+
skipPkDedup = option.SkipPkDedup
297+
skipUniqueIdxDedup = option.SkipUniqueIdxDedup
292298
}
293299
}
294300

@@ -382,6 +388,10 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
382388
continue
383389
}
384390

391+
if option != nil && option.SkipIndexesCopy[idxDef.IndexName] {
392+
continue
393+
}
394+
385395
idxObjRefs[i], idxTableDefs[i], err = builder.compCtx.ResolveIndexTableByRef(objRef, idxDef.IndexTableName, bindCtx.snapshot)
386396
if err != nil {
387397
return 0, err

0 commit comments

Comments
 (0)