Skip to content

Commit 6bb57b1

Browse files
authored
to 3.0: clone with snapshot should match the snapshot level and owner (#22429)
A clone with a snapshot should match the snapshot's level and owner. Approved by: @heni02, @XuPeng-SH, @aunjgr, @daviszhen
1 parent c0d2cd2 commit 6bb57b1

File tree

10 files changed

+1414
-933
lines changed

10 files changed

+1414
-933
lines changed

pkg/frontend/authenticate.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,10 +1297,7 @@ const (
12971297

12981298
checkDatabaseWithOwnerFormat = `select dat_id, owner from mo_catalog.mo_database where datname = "%s" and account_id = %d;`
12991299

1300-
checkDatabaseTableFormat = `select t.rel_id from mo_catalog.mo_database d, mo_catalog.mo_tables t
1301-
where d.dat_id = t.reldatabase_id
1302-
and d.datname = "%s"
1303-
and t.relname = "%s";`
1300+
checkDatabaseTableFormat = `select rel_id from mo_catalog.mo_tables where relname = "%s" and reldatabase = "%s" and account_id = %d;`
13041301

13051302
//TODO:fix privilege_level string and obj_type string
13061303
//For object_type : table, privilege_level : *.*
@@ -1941,12 +1938,30 @@ func getSqlForCheckDatabaseWithOwner(ctx context.Context, dbName string, account
19411938
return fmt.Sprintf(checkDatabaseWithOwnerFormat, dbName, accountId), nil
19421939
}
19431940

1944-
func getSqlForCheckDatabaseTable(ctx context.Context, dbName, tableName string) (string, error) {
1941+
func getSqlForCheckDatabaseTable(
1942+
ctx context.Context,
1943+
dbName string,
1944+
tableName string,
1945+
) (string, error) {
1946+
19451947
err := inputNameIsInvalid(ctx, dbName, tableName)
19461948
if err != nil {
19471949
return "", err
19481950
}
1949-
return fmt.Sprintf(checkDatabaseTableFormat, dbName, tableName), nil
1951+
1952+
var (
1953+
account uint32
1954+
)
1955+
1956+
if v := ctx.Value(defines.TenantIDKey{}); v != nil {
1957+
account = v.(uint32)
1958+
} else {
1959+
return "", moerr.NewInternalErrorNoCtx("no account id found in the ctx")
1960+
}
1961+
1962+
// we need the account id here to filter out the same dbName and tableName that exist in the
1963+
// different accounts.
1964+
return fmt.Sprintf(checkDatabaseTableFormat, tableName, dbName, account), nil
19501965
}
19511966

19521967
func getSqlForDeleteRole(roleId int64) []string {
@@ -6714,6 +6729,8 @@ func authenticateUserCanExecuteStatementWithObjectTypeAccountAndDatabase(ctx con
67146729
}
67156730
tbName := string(st.Names[0].ObjectName)
67166731
return checkRoleWhetherTableOwner(ctx, ses, dbName, tbName, ok)
6732+
case *tree.CloneTable, *tree.CloneDatabase:
6733+
return true, stats, nil
67176734
}
67186735
}
67196736
return ok, stats, nil

pkg/frontend/authenticate_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5134,6 +5134,8 @@ func Test_doGrantPrivilege(t *testing.T) {
51345134
},
51355135
}
51365136

5137+
ctx := context.WithValue(context.TODO(), defines.TenantIDKey{}, uint32(sysAccountID))
5138+
51375139
for _, stmt := range stmts {
51385140
priv := determinePrivilegeSetOfStatement(stmt)
51395141
ses := newSes(priv, ctrl)
@@ -5165,14 +5167,14 @@ func Test_doGrantPrivilege(t *testing.T) {
51655167
bh.sql2result[sql] = mrs
51665168
} else if stmt.Level.Level == tree.PRIVILEGE_LEVEL_TYPE_TABLE ||
51675169
stmt.Level.Level == tree.PRIVILEGE_LEVEL_TYPE_DATABASE_TABLE {
5168-
sql, _ := getSqlForCheckDatabaseTable(context.TODO(), dbName, tableName)
5170+
sql, _ := getSqlForCheckDatabaseTable(ctx, dbName, tableName)
51695171
mrs := newMrsForCheckDatabaseTable([][]interface{}{
51705172
{0},
51715173
})
51725174
bh.sql2result[sql] = mrs
51735175
}
51745176

5175-
_, objId, err := checkPrivilegeObjectTypeAndPrivilegeLevel(context.TODO(), ses, bh, stmt.ObjType, *stmt.Level)
5177+
_, objId, err := checkPrivilegeObjectTypeAndPrivilegeLevel(ctx, ses, bh, stmt.ObjType, *stmt.Level)
51765178
convey.So(err, convey.ShouldBeNil)
51775179

51785180
for _, p := range stmt.Privileges {
@@ -5436,6 +5438,8 @@ func Test_doRevokePrivilege(t *testing.T) {
54365438
},
54375439
}
54385440

5441+
ctx := context.WithValue(context.TODO(), defines.TenantIDKey{}, uint32(sysAccountID))
5442+
54395443
for _, stmt := range stmts {
54405444
priv := determinePrivilegeSetOfStatement(stmt)
54415445
ses := newSes(priv, ctrl)
@@ -5467,14 +5471,14 @@ func Test_doRevokePrivilege(t *testing.T) {
54675471
bh.sql2result[sql] = mrs
54685472
} else if stmt.Level.Level == tree.PRIVILEGE_LEVEL_TYPE_TABLE ||
54695473
stmt.Level.Level == tree.PRIVILEGE_LEVEL_TYPE_DATABASE_TABLE {
5470-
sql, _ := getSqlForCheckDatabaseTable(context.TODO(), dbName, tableName)
5474+
sql, _ := getSqlForCheckDatabaseTable(ctx, dbName, tableName)
54715475
mrs := newMrsForCheckDatabaseTable([][]interface{}{
54725476
{0},
54735477
})
54745478
bh.sql2result[sql] = mrs
54755479
}
54765480

5477-
_, objId, err := checkPrivilegeObjectTypeAndPrivilegeLevel(context.TODO(), ses, bh, stmt.ObjType, *stmt.Level)
5481+
_, objId, err := checkPrivilegeObjectTypeAndPrivilegeLevel(ctx, ses, bh, stmt.ObjType, *stmt.Level)
54785482
convey.So(err, convey.ShouldBeNil)
54795483

54805484
for _, p := range stmt.Privileges {

pkg/frontend/snapshot.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,6 +1627,11 @@ func doResolveSnapshotWithSnapshotName(ctx context.Context, ses FeSession, snaps
16271627
TenantName: record.accountName,
16281628
TenantID: accountId,
16291629
},
1630+
ExtraInfo: &pbplan.SnapshotExtraInfo{
1631+
Level: record.level,
1632+
ObjId: record.objId,
1633+
Name: record.snapshotName,
1634+
},
16301635
}, nil
16311636
}
16321637

0 commit comments

Comments
 (0)