Skip to content

Commit 4ce75c2

Browse files
authored
Local topic for transfer (ydb-platform#18926) (ydb-platform#20029)
2 parents d982c13 + 89cca86 commit 4ce75c2

File tree

120 files changed

+4470
-996
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

120 files changed

+4470
-996
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class TLocalPartitionReader
7777
}
7878
Y_VERIFY_S(record.GetErrorCode() == NPersQueue::NErrorCode::OK, "Unimplemented!");
7979
Y_VERIFY_S(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdGetClientOffsetResult(), "Unimplemented!");
80-
auto resp = record.GetPartitionResponse().GetCmdGetClientOffsetResult();
80+
const auto& resp = record.GetPartitionResponse().GetCmdGetClientOffsetResult();
8181
Offset = resp.GetOffset();
8282
SentOffset = Offset;
8383

ydb/core/base/events.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ struct TKikimrEvents : TEvents {
188188
ES_CONVEYOR_COMPOSITE = 4265,
189189
ES_GENERAL_CACHE_PUBLIC = 4266,
190190
ES_GENERAL_CACHE_SOURCE = 4267,
191-
191+
ES_TRANSFER = 4268,
192192
};
193193
};
194194

ydb/core/formats/arrow/arrow_batch_builder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) {
217217
NumBytes += cell.Size();
218218
auto ydbType = YdbSchema[colNum].second;
219219
auto status = NKikimr::NArrow::AppendCell(*BatchBuilder, cell, colNum, ydbType);
220-
Y_ABORT_UNLESS(status.ok(), "Faield to append cell: %s", status.ToString().c_str());
220+
Y_ABORT_UNLESS(status.ok(), "Failed to append cell: %s", status.ToString().c_str());
221221
}
222222

223223
void TArrowBatchBuilder::AddRow(const TDbTupleRef& key, const TDbTupleRef& value) {

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 91 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ namespace NKikimr::NGRpcService {
1818
using namespace Ydb;
1919

2020
using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;
21+
using TEvDescribeTransfer = TGrpcRequestOperationCall<Replication::DescribeTransferRequest, Replication::DescribeTransferResponse>;
2122

22-
class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
23-
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;
23+
template <typename TReq, typename TResp, typename TResult>
24+
class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC<TReq, TResp, TResult>, TGrpcRequestOperationCall<TReq, TResp>> {
25+
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC<TReq, TResp, TResult>, TGrpcRequestOperationCall<TReq, TResp>>;
26+
using TThis = TDescribeReplicationRPC<TReq, TResp, TResult>;
2427

2528
public:
2629
using TBase::TBase;
@@ -31,7 +34,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
3134

3235
void PassAway() override {
3336
if (ControllerPipeClient) {
34-
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
37+
NTabletPipe::CloseAndForgetClient(TBase::SelfId(), ControllerPipeClient);
3538
}
3639

3740
TBase::PassAway();
@@ -40,12 +43,12 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
4043
private:
4144
void DescribeScheme() {
4245
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
43-
SetAuthToken(ev, *Request_);
44-
SetDatabase(ev.get(), *Request_);
45-
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());
46+
SetAuthToken(ev, *TBase::Request_);
47+
SetDatabase(ev.get(), *TBase::Request_);
48+
ev->Record.MutableDescribePath()->SetPath(TBase::GetProtoRequest()->path());
4649

47-
Send(MakeTxProxyID(), ev.release());
48-
Become(&TDescribeReplicationRPC::StateDescribeScheme);
50+
TBase::Send(MakeTxProxyID(), ev.release());
51+
TBase::Become(&TThis::StateDescribeScheme);
4952
}
5053

5154
STATEFN(StateDescribeScheme) {
@@ -62,7 +65,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
6265

6366
if (record.HasReason()) {
6467
auto issue = NYql::TIssue(record.GetReason());
65-
Request_->RaiseIssue(issue);
68+
TBase::Request_->RaiseIssue(issue);
6669
}
6770

6871
switch (record.GetStatus()) {
@@ -73,8 +76,8 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
7376
break;
7477
default: {
7578
auto issue = NYql::TIssue("Is not a replication");
76-
Request_->RaiseIssue(issue);
77-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
79+
TBase::Request_->RaiseIssue(issue);
80+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
7881
}
7982
}
8083

@@ -84,16 +87,16 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
8487

8588
case NKikimrScheme::StatusPathDoesNotExist:
8689
case NKikimrScheme::StatusSchemeError:
87-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
90+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
8891

8992
case NKikimrScheme::StatusAccessDenied:
90-
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
93+
return TBase::Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
9194

9295
case NKikimrScheme::StatusNotAvailable:
93-
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
96+
return TBase::Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
9497

9598
default: {
96-
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
99+
return TBase::Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
97100
}
98101
}
99102
}
@@ -104,15 +107,15 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
104107
config.RetryPolicy = {
105108
.RetryLimitCount = 3,
106109
};
107-
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
110+
ControllerPipeClient = TBase::Register(NTabletPipe::CreateClient(TBase::SelfId(), tabletId, config));
108111
}
109112

110113
auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
111114
pathId.ToProto(ev->Record.MutablePathId());
112-
ev->Record.SetIncludeStats(GetProtoRequest()->include_stats());
115+
BuildRequest(TBase::GetProtoRequest(), ev->Record);
113116

114-
NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
115-
Become(&TDescribeReplicationRPC::StateDescribeReplication);
117+
NTabletPipe::SendData(TBase::SelfId(), ControllerPipeClient, ev.release());
118+
TBase::Become(&TThis::StateDescribeReplication);
116119
}
117120

118121
STATEFN(StateDescribeReplication) {
@@ -130,20 +133,14 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
130133
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
131134
break;
132135
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
133-
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
136+
return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
134137
default:
135-
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
138+
return TBase::Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
136139
}
137140

138-
ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
139-
ConvertConsistencySettings(record.GetConsistencySettings(), Result);
140-
ConvertState(*record.MutableState(), Result);
141+
Convert(record, Result);
141142

142-
for (const auto& target : record.GetTargets()) {
143-
ConvertItem(target, *Result.add_items());
144-
}
145-
146-
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
143+
return TBase::ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
147144
}
148145

149146
static TString BuildConnectionString(const NKikimrReplication::TConnectionParams& params) {
@@ -214,17 +211,26 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
214211
}
215212
}
216213

217-
static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
214+
static void ConvertStats(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
215+
if (from.GetStandBy().HasLagMilliSeconds()) {
216+
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
217+
from.GetStandBy().GetLagMilliSeconds());
218+
}
219+
if (from.GetStandBy().HasInitialScanProgress()) {
220+
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
221+
}
222+
}
223+
224+
static void ConvertStats(NKikimrReplication::TReplicationState&, Ydb::Replication::DescribeTransferResult&) {
225+
// nop
226+
}
227+
228+
template<typename T>
229+
static void ConvertState(NKikimrReplication::TReplicationState& from, T& to) {
218230
switch (from.GetStateCase()) {
219231
case NKikimrReplication::TReplicationState::kStandBy:
220232
to.mutable_running();
221-
if (from.GetStandBy().HasLagMilliSeconds()) {
222-
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
223-
from.GetStandBy().GetLagMilliSeconds());
224-
}
225-
if (from.GetStandBy().HasInitialScanProgress()) {
226-
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
227-
}
233+
ConvertStats(from, to);
228234
break;
229235
case NKikimrReplication::TReplicationState::kError:
230236
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
@@ -240,20 +246,65 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
240246
}
241247
}
242248

249+
static void Convert(NKikimrReplication::TEvDescribeReplicationResult& record, Replication::DescribeReplicationResult& result) {
250+
ConvertConnectionParams(record.GetConnectionParams(), *result.mutable_connection_params());
251+
ConvertConsistencySettings(record.GetConsistencySettings(), result);
252+
ConvertState(*record.MutableState(), result);
253+
254+
for (const auto& target : record.GetTargets()) {
255+
ConvertItem(target, *result.add_items());
256+
}
257+
}
258+
259+
static void Convert(NKikimrReplication::TEvDescribeReplicationResult& record, Replication::DescribeTransferResult& result) {
260+
ConvertConnectionParams(record.GetConnectionParams(), *result.mutable_connection_params());
261+
ConvertState(*record.MutableState(), result);
262+
263+
const auto& transferSpecific = record.GetTransferSpecific();
264+
result.set_source_path(transferSpecific.GetTarget().GetSrcPath());
265+
result.set_destination_path(transferSpecific.GetTarget().GetDstPath());
266+
result.set_consumer_name(transferSpecific.GetTarget().GetConsumerName());
267+
result.set_transformation_lambda(transferSpecific.GetTarget().GetTransformLambda());
268+
result.mutable_batch_settings()->set_size_bytes(transferSpecific.GetBatching().GetBatchSizeBytes());
269+
result.mutable_batch_settings()->mutable_flush_interval()->set_seconds(transferSpecific.GetBatching().GetFlushIntervalMilliSeconds() / 1000);
270+
}
271+
272+
static void BuildRequest(const Replication::DescribeReplicationRequest* from, NKikimrReplication::TEvDescribeReplication& to) {
273+
to.SetIncludeStats(from->include_stats());
274+
}
275+
276+
static void BuildRequest(const Replication::DescribeTransferRequest*, NKikimrReplication::TEvDescribeReplication&) {
277+
// nop
278+
}
279+
243280
private:
244-
Ydb::Replication::DescribeReplicationResult Result;
281+
TResult Result;
245282
TActorId ControllerPipeClient;
246283
};
247284

285+
using TDescribeReplicationActor = TDescribeReplicationRPC<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse, Replication::DescribeReplicationResult>;
286+
using TDescribeTransferActor = TDescribeReplicationRPC<Replication::DescribeTransferRequest, Replication::DescribeTransferResponse, Replication::DescribeTransferResult>;
287+
248288
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
249-
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
289+
f.RegisterActor(new TDescribeReplicationActor(p.release()));
290+
}
291+
292+
void DoDescribeTransfer(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
293+
f.RegisterActor(new TDescribeTransferActor(p.release()));
250294
}
251295

252296
using TEvDescribeReplicationRequest = TGrpcRequestOperationCall<Ydb::Replication::DescribeReplicationRequest, Ydb::Replication::DescribeReplicationResponse>;
253297

254298
template<>
255299
IActor* TEvDescribeReplicationRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
256-
return new TDescribeReplicationRPC(msg);
300+
return new TDescribeReplicationActor(msg);
301+
}
302+
303+
using TEvDescribeTransferRequest = TGrpcRequestOperationCall<Ydb::Replication::DescribeTransferRequest, Ydb::Replication::DescribeTransferResponse>;
304+
305+
template<>
306+
IActor* TEvDescribeTransferRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
307+
return new TDescribeTransferActor(msg);
257308
}
258309

259310
}

ydb/core/grpc_services/service_replication.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ class IRequestOpCtx;
88
class IFacilityProvider;
99

1010
void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
11+
void DoDescribeTransfer(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1112

1213
}

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2636,6 +2636,9 @@ class TKqpGatewayProxy : public IKikimrGateway {
26362636
if (settings.Settings.ConsumerName) {
26372637
target.SetConsumerName(*settings.Settings.ConsumerName);
26382638
}
2639+
if (settings.Settings.DirectoryPath) {
2640+
target.SetDirectoryPath(*settings.Settings.DirectoryPath);
2641+
}
26392642
}
26402643

26412644
if (IsPrepare()) {
@@ -2690,6 +2693,10 @@ class TKqpGatewayProxy : public IKikimrGateway {
26902693
}
26912694
}
26922695

2696+
if (settings.Settings.DirectoryPath) {
2697+
op.MutableAlterTransfer()->SetDirectoryPath(*settings.Settings.DirectoryPath);
2698+
}
2699+
26932700
if (const auto& done = settings.Settings.StateDone) {
26942701
auto& state = *op.MutableState();
26952702
state.MutableDone()->SetFailoverMode(

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,14 @@ namespace {
870870
}
871871

872872
dstSettings.ConsumerName = value;
873+
} else if (name == "directory") {
874+
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
875+
if (value.empty()) {
876+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
877+
TStringBuilder() << name << " must be not empty"));
878+
return false;
879+
}
880+
dstSettings.DirectoryPath = value;
873881
}
874882
}
875883

@@ -2435,7 +2443,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
24352443
return SyncError();
24362444
}
24372445

2438-
if (!settings.Settings.ConnectionString && (!settings.Settings.Endpoint || !settings.Settings.Database)) {
2446+
if (!settings.Settings.Endpoint ^ !settings.Settings.Database) {
24392447
ctx.AddError(TIssue(ctx.GetPosition(createTransfer.Pos()),
24402448
"Neither CONNECTION_STRING nor ENDPOINT/DATABASE are provided"));
24412449
return SyncError();

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,8 @@ struct TTransferSettings : public TReplicationSettingsBase {
932932

933933
return *Batching;
934934
}
935+
936+
TMaybe<TString> DirectoryPath;
935937
};
936938

937939
struct TCreateTransferSettings {

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,6 +1841,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
18411841
"flush_interval",
18421842
"batch_size_bytes",
18431843
"consumer",
1844+
"directory",
18441845
};
18451846

18461847
if (!CheckReplicationSettings(node.TransferSettings(), supportedSettings, ctx)) {
@@ -1870,6 +1871,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
18701871
"failover_mode",
18711872
"flush_interval",
18721873
"batch_size_bytes",
1874+
"directory"
18731875
};
18741876

18751877
if (!CheckReplicationSettings(node.TransferSettings(), supportedSettings, ctx)) {

0 commit comments

Comments
 (0)