Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 57 additions & 7 deletions ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,20 +557,33 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
Y_UNIT_TEST(WriteImmediateBadRequest) {
auto [runtime, server, sender] = TestCreateServer();

auto opts = TShardedTableOptions().Columns({{"key", "Utf8", true, false}});
auto opts = TShardedTableOptions().Columns({
{"key", "Utf8", true, true},
{"value1", "Uint32", false, false},
{"value2", "Uint32", false, true},
});
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
const ui64 shard = shards[0];

auto prepareEvWrite = [&](
NKikimrDataEvents::TEvWrite::TOperation::EOperationType opType,
TSerializedCellMatrix matrix,
const std::vector<ui32>& columnIds) {
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(opType, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
return evWrite;
};

Cout << "========= Send immediate write with huge key=========\n";
{
TString hugeStringValue(NLimits::MaxWriteKeySize + 1, 'X');
TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size())}, 1, 1);
TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size()), TCell::Make(ui32(123))}, 1, 2);

auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

const auto writeResult = Write(runtime, sender, shard, std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
const auto writeResult = Write(
runtime, sender, shard,
prepareEvWrite(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, std::move(matrix), {1, 3}),
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600"));
}
Expand All @@ -595,6 +608,43 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("OPERATION_UNSPECIFIED operation is not supported now"));
}

Cout << "========= Send immediate write with NULL value for NOT NULL column=========\n";
{
TString stringValue("KEY");
TSerializedCellMatrix matrix(
{TCell(stringValue.data(), stringValue.size()), TCell()},
1, 2);

const auto writeResult = Write(
runtime, sender, shard,
prepareEvWrite(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, matrix, {1, 3}),
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("NULL value for NON NULL column"));
}

Cout << "========= Send immediate write without data for NOT NULL column=========\n";
{
TString stringValue("KEY");
TSerializedCellMatrix matrix(
{TCell(stringValue.data(), stringValue.size()), TCell::Make(ui32(123))},
1, 2);

// Insert should fail.
const auto insertResult = Write(
runtime, sender, shard,
prepareEvWrite(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT, matrix, {1, 2}),
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(insertResult.GetIssues().size(), 1);
UNIT_ASSERT(insertResult.GetIssues(0).message().Contains("Missing inserted values for NON NULL column"));

// Update should complete successfully.
const auto updateResult = Write(
runtime, sender, shard,
prepareEvWrite(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPDATE, matrix, {1, 2}),
NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
}
}

Y_UNIT_TEST(WriteImmediateSeveralOperations) {
Expand Down
24 changes: 23 additions & 1 deletion ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,32 @@ std::tuple<NKikimrTxDataShard::TError::EKind, TString> TValidatedWriteTxOperatio
return {NKikimrTxDataShard::TError::SCHEME_ERROR, TStringBuilder() << "Key column schema at position " << i};
}

for (ui32 columnTag : ColumnIds) {
for (ui16 colIdx = 0; colIdx < ColumnIds.size(); ++colIdx) {
ui32 columnTag = ColumnIds[colIdx];
auto* col = tableInfo.Columns.FindPtr(columnTag);
if (!col)
return {NKikimrTxDataShard::TError::SCHEME_ERROR, TStringBuilder() << "Missing column with id " << columnTag};

if (col->NotNull) {
for (ui32 rowIdx = 0; rowIdx < Matrix.GetRowCount(); ++rowIdx) {
const TCell& cell = Matrix.GetCell(rowIdx, colIdx);
if (cell.IsNull()) {
return {NKikimrTxDataShard::TError::BAD_ARGUMENT, TStringBuilder() << "NULL value for NON NULL column " << columnTag};
}
}
}
}

if (OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT) {
// If we are inserting new rows, check that all NON NULL columns are present in data.
// Note that UPSERT can also insert new rows, but we don't know if this actually happens
// at this stage, so we skip the check for UPSERT.
auto columnIdsSet = THashSet<ui32>(ColumnIds.begin(), ColumnIds.end());
for (const auto& [id, column] : tableInfo.Columns) {
if (column.NotNull && !columnIdsSet.contains(id)) {
return {NKikimrTxDataShard::TError::BAD_ARGUMENT, TStringBuilder() << "Missing inserted values for NON NULL column " << id};
}
}
}

for (ui32 rowIdx = 0; rowIdx < Matrix.GetRowCount(); ++rowIdx)
Expand Down
Loading