Skip to content

Commit ce80288

Browse files
authored
[Stable-25-3-1] Fix EvWrite INTERNAL_ERROR (#29575)
2 parents 2f10047 + 4009ca5 commit ce80288

File tree

4 files changed

+312
-1
lines changed

4 files changed

+312
-1
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2190,6 +2190,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
21902190
try {
21912191
switch (ev->GetTypeRewrite()) {
21922192
hFunc(TEvKqpBuffer::TEvTerminate, Handle);
2193+
hFunc(TEvKqpBuffer::TEvRollback, Handle);
21932194
default:
21942195
AFL_ENSURE(false)("StateWaitTasks: unknown message", ev->GetTypeRewrite());
21952196
}
@@ -2204,6 +2205,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
22042205
try {
22052206
switch (ev->GetTypeRewrite()) {
22062207
hFunc(TEvKqpBuffer::TEvTerminate, Handle);
2208+
hFunc(TEvKqpBuffer::TEvRollback, Handle);
22072209
default:
22082210
AFL_ENSURE(false)("StateFlush: unknown message", ev->GetTypeRewrite());
22092211
}
@@ -2218,6 +2220,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
22182220
try {
22192221
switch (ev->GetTypeRewrite()) {
22202222
hFunc(TEvKqpBuffer::TEvTerminate, Handle);
2223+
hFunc(TEvKqpBuffer::TEvRollback, Handle);
22212224
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
22222225
hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, HandlePrepare);
22232226
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
@@ -2241,6 +2244,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
22412244
try {
22422245
switch (ev->GetTypeRewrite()) {
22432246
hFunc(TEvKqpBuffer::TEvTerminate, Handle);
2247+
hFunc(TEvKqpBuffer::TEvRollback, Handle);
22442248
hFunc(TEvTxProxy::TEvProposeTransactionStatus, Handle);
22452249
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandleCommit);
22462250
hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, HandleCommit);

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2085,6 +2085,30 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
20852085
}
20862086
}
20872087

2088+
void HandleCleanup(TEvKqpBuffer::TEvError::TPtr& ev) {
2089+
auto& msg = *ev->Get();
2090+
2091+
TString logMsg = TStringBuilder() << "got TEvKqpBuffer::TEvError in " << CurrentStateFuncName()
2092+
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << " send to: " << ExecuterId << " from: " << ev->Sender;
2093+
2094+
if (CleanupCtx->TransactionsToBeAborted.empty()) {
2095+
LOG_E(logMsg << ": Ignored error. TransactionsToBeAborted is empty.");
2096+
}
2097+
2098+
AFL_ENSURE(ExecuterId); // ExecuterId can't be empty during cleanup if TransactionsToBeAborted is not empty.
2099+
2100+
const auto& txCtx = CleanupCtx->TransactionsToBeAborted.front();
2101+
AFL_ENSURE(txCtx);
2102+
if (txCtx->BufferActorId != ev->Sender) {
2103+
LOG_E(logMsg << ": Ignored error. Current BufferActorId is not sender.");
2104+
return;
2105+
} else {
2106+
LOG_W(logMsg);
2107+
}
2108+
2109+
Send(ExecuterId, new TEvKqpBuffer::TEvError{msg.StatusCode, std::move(msg.Issues), std::move(msg.Stats)}, IEventHandle::FlagTrackDelivery);
2110+
}
2111+
20882112
void CollectSystemViewQueryStats(const TKqpQueryStats* stats, TDuration queryDuration,
20892113
const TString& database, ui64 requestUnits)
20902114
{
@@ -2731,6 +2755,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
27312755
if (QueryState) {
27322756
QueryState->Orbit = std::move(ev->Get()->Orbit);
27332757
}
2758+
ExecuterId = {};
27342759

27352760
auto& response = ev->Get()->Record.GetResponse();
27362761
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -2743,6 +2768,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
27432768
// Terminate BufferActors without waiting
27442769
TerminateBufferActor(txCtx);
27452770
}
2771+
CleanupCtx->TransactionsToBeAborted.clear();
2772+
27462773
EndCleanup(CleanupCtx->Final);
27472774
return;
27482775
}
@@ -2986,7 +3013,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29863013
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
29873014
hFunc(TEvKqp::TEvSplitResponse, HandleNoop);
29883015
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
2989-
hFunc(TEvKqpBuffer::TEvError, Handle);
3016+
hFunc(TEvKqpBuffer::TEvError, HandleCleanup);
29903017
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
29913018
hFunc(TEvents::TEvUndelivered, HandleNoop);
29923019
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
#include <ydb/core/kqp/common/buffer/events.h>
2+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
3+
#include <ydb/core/testlib/tablet_helpers.h>
4+
#include <ydb/core/tx/data_events/events.h>
5+
#include <ydb/core/tx/datashard/datashard.h>
6+
7+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
8+
9+
#include <util/generic/scope.h>
10+
11+
12+
namespace NKikimr {
13+
namespace NKqp {
14+
15+
using namespace NYdb;
16+
using namespace NYdb::NQuery;
17+
18+
Y_UNIT_TEST_SUITE(KqpFail) {
19+
20+
Y_UNIT_TEST(Immediate) {
21+
TKikimrSettings settings;
22+
settings.SetUseRealThreads(false);
23+
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
24+
25+
TKikimrRunner kikimr(settings);
26+
auto db = kikimr.GetQueryClient();
27+
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });
28+
29+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
30+
Y_UNUSED(runtime);
31+
32+
auto edgeActor = runtime.AllocateEdgeActor();
33+
34+
const auto& shards = GetTableShards(
35+
&kikimr.GetTestServer(),
36+
edgeActor,
37+
"/Root/KeyValue");
38+
39+
{
40+
const TString query =
41+
R"(
42+
SELECT * FROM `/Root/KeyValue`;
43+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (2, 'value');
44+
)";
45+
46+
std::vector<std::unique_ptr<IEventHandle>> requests;
47+
int stage = 0;
48+
49+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
50+
if (stage == 0 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
51+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
52+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
53+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Commit);
54+
++stage;
55+
requests.emplace_back(ev.Release());
56+
return TTestActorRuntime::EEventAction::DROP;
57+
} else if (stage == 1 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
58+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
59+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
60+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
61+
++stage;
62+
return TTestActorRuntime::EEventAction::PROCESS;
63+
}
64+
65+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvError::EventType);
66+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvTerminate::EventType);
67+
68+
return TTestActorRuntime::EEventAction::PROCESS;
69+
};
70+
71+
72+
runtime.SetObserverFunc(grab);
73+
74+
auto future = kikimr.RunInThreadPool([&]{
75+
return session.ExecuteQuery(
76+
query,
77+
TTxControl::BeginTx().CommitTx(),
78+
TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
79+
});
80+
81+
{
82+
TDispatchOptions opts;
83+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
84+
return stage == 1;
85+
});
86+
runtime.DispatchEvents(opts);
87+
}
88+
89+
auto result = runtime.WaitFuture(future);
90+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, result.GetIssues().ToString());
91+
92+
{
93+
TDispatchOptions opts;
94+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
95+
return stage == 2;
96+
});
97+
runtime.DispatchEvents(opts);
98+
}
99+
100+
UNIT_ASSERT(stage == 2);
101+
}
102+
}
103+
104+
Y_UNIT_TEST(OnPrepare) {
105+
TKikimrSettings settings;
106+
settings.SetUseRealThreads(false);
107+
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
108+
109+
TKikimrRunner kikimr(settings);
110+
auto db = kikimr.GetQueryClient();
111+
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });
112+
113+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
114+
Y_UNUSED(runtime);
115+
116+
auto edgeActor = runtime.AllocateEdgeActor();
117+
118+
const auto& shards = GetTableShards(
119+
&kikimr.GetTestServer(),
120+
edgeActor,
121+
"/Root/TwoShard");
122+
123+
{
124+
const TString query =
125+
R"(
126+
SELECT * FROM `/Root/TwoShard`;
127+
UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (2, 'value');
128+
)";
129+
130+
std::vector<std::unique_ptr<IEventHandle>> requests;
131+
int stage = 0;
132+
133+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
134+
if (stage < 2 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
135+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
136+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
137+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Commit);
138+
++stage;
139+
requests.emplace_back(ev.Release());
140+
return TTestActorRuntime::EEventAction::DROP;
141+
} else if (ev->GetTypeRewrite() == TEvTxProxy::TEvProposeTransaction::EventType) {
142+
UNIT_ASSERT(false);
143+
} else if (stage < 4 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
144+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
145+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
146+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
147+
++stage;
148+
return TTestActorRuntime::EEventAction::PROCESS;
149+
}
150+
151+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvError::EventType);
152+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvTerminate::EventType);
153+
154+
return TTestActorRuntime::EEventAction::PROCESS;
155+
};
156+
157+
158+
runtime.SetObserverFunc(grab);
159+
160+
auto future = kikimr.RunInThreadPool([&]{
161+
return session.ExecuteQuery(
162+
query,
163+
TTxControl::BeginTx().CommitTx(),
164+
TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
165+
});
166+
167+
{
168+
TDispatchOptions opts;
169+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
170+
return stage == 2;
171+
});
172+
runtime.DispatchEvents(opts);
173+
}
174+
175+
auto result = runtime.WaitFuture(future);
176+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, result.GetIssues().ToString());
177+
178+
{
179+
TDispatchOptions opts;
180+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
181+
return stage == 4;
182+
});
183+
runtime.DispatchEvents(opts);
184+
}
185+
186+
UNIT_ASSERT(stage == 4);
187+
}
188+
}
189+
190+
Y_UNIT_TEST(OnCommit) {
191+
TKikimrSettings settings;
192+
settings.SetUseRealThreads(false);
193+
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
194+
195+
TKikimrRunner kikimr(settings);
196+
auto db = kikimr.GetQueryClient();
197+
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });
198+
199+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
200+
Y_UNUSED(runtime);
201+
202+
auto edgeActor = runtime.AllocateEdgeActor();
203+
204+
const auto& shards = GetTableShards(
205+
&kikimr.GetTestServer(),
206+
edgeActor,
207+
"/Root/TwoShard");
208+
209+
{
210+
const TString query =
211+
R"(
212+
SELECT * FROM `/Root/TwoShard`;
213+
UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (2, 'value');
214+
)";
215+
216+
std::vector<std::unique_ptr<IEventHandle>> requests;
217+
int stage = 0;
218+
219+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
220+
if (stage < 2 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
221+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
222+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
223+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Commit);
224+
++stage;
225+
return TTestActorRuntime::EEventAction::PROCESS;
226+
} else if (ev->GetTypeRewrite() == TEvTxProxy::TEvProposeTransaction::EventType) {
227+
AFL_ENSURE(stage == 2);
228+
++stage;
229+
requests.emplace_back(ev.Release());
230+
return TTestActorRuntime::EEventAction::DROP;
231+
} else if (stage < 5 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
232+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
233+
UNIT_ASSERT(msg->Record.GetLocks().GetLocks().size() == 1);
234+
UNIT_ASSERT(msg->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
235+
++stage;
236+
return TTestActorRuntime::EEventAction::PROCESS;
237+
}
238+
239+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvError::EventType);
240+
UNIT_ASSERT(ev->GetTypeRewrite() != TEvKqpBuffer::TEvTerminate::EventType);
241+
242+
return TTestActorRuntime::EEventAction::PROCESS;
243+
};
244+
245+
246+
runtime.SetObserverFunc(grab);
247+
248+
auto future = kikimr.RunInThreadPool([&]{
249+
return session.ExecuteQuery(
250+
query,
251+
TTxControl::BeginTx().CommitTx(),
252+
TExecuteQuerySettings().ClientTimeout(TDuration::MilliSeconds(1000))).ExtractValueSync();
253+
});
254+
255+
{
256+
TDispatchOptions opts;
257+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
258+
return stage == 3;
259+
});
260+
runtime.DispatchEvents(opts);
261+
}
262+
263+
auto result = runtime.WaitFuture(future);
264+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, result.GetIssues().ToString());
265+
266+
{
267+
TDispatchOptions opts;
268+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
269+
return stage == 5;
270+
});
271+
runtime.DispatchEvents(opts);
272+
}
273+
274+
UNIT_ASSERT(stage == 5);
275+
}
276+
}
277+
}
278+
}
279+
}

ydb/core/kqp/ut/effects/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ ELSE()
1111
ENDIF()
1212

1313
SRCS(
14+
kqp_connection_ut.cpp
1415
kqp_effects_ut.cpp
1516
kqp_immediate_effects_ut.cpp
1617
kqp_inplace_update_ut.cpp

0 commit comments

Comments
 (0)