Skip to content

Commit

Permalink
Fix and refactor test
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov committed Dec 3, 2024
1 parent 98db446 commit f4983f4
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ TConclusionStatus TReadMetadata::Init(
for (auto&& i : CommittedBlobs) {
if (!i.IsCommitted()) {
if (owner->HasLongTxWrites(i.GetInsertWriteId())) {
AddLongTxWriteId(i.GetInsertWriteId());
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i.GetInsertWriteId());
AddWriteIdToCheck(i.GetInsertWriteId(), op->GetLockId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct TReadMetadata : public TReadMetadataBase {

THashMap<ui64, std::shared_ptr<TAtomicCounter>> LockConflictCounters;
THashMap<TInsertWriteId, TWriteIdInfo> ConflictedWriteIds;
THashSet<TInsertWriteId> LongTxWriteIds;

virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override;
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override;
Expand Down Expand Up @@ -79,6 +80,14 @@ struct TReadMetadata : public TReadMetadataBase {
return it->second.IsConflictable();
}

// Registers a write id that belongs to a long transaction.
// Each id may be registered at most once; a duplicate registration
// trips the verification below.
void AddLongTxWriteId(const TInsertWriteId writeId) {
    const bool isNewId = LongTxWriteIds.emplace(writeId).second;
    AFL_VERIFY(isNewId);
}

// Returns true when writeId was previously registered via AddLongTxWriteId.
bool IsLongTxWriteId(const TInsertWriteId writeId) const {
    return LongTxWriteIds.find(writeId) != LongTxWriteIds.end();
}

void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) {
auto it = LockConflictCounters.find(lockId);
if (it == LockConflictCounters.end()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (i.IsCommitted()) {
continue;
}
if (GetReadMetadata()->IsLongTxWriteId(i.GetInsertWriteId())) {
continue;
}
if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) {
continue;
}
Expand All @@ -36,6 +39,9 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)

for (auto&& i : committed) {
if (!i.IsCommitted()) {
if (GetReadMetadata()->IsLongTxWriteId(i.GetInsertWriteId())) {
continue;
}
if (GetReadMetadata()->IsWriteConflictable(i.GetInsertWriteId())) {
continue;
}
Expand Down
34 changes: 24 additions & 10 deletions ydb/tests/olap/scenario/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
ScenarioTestHelper,
TestContext,
CreateTable,
DropTable,
)

from ydb import PrimitiveType
from typing import List, Dict, Any
from ydb.tests.olap.lib.utils import get_external_param
import threading

class TestInsert(BaseTestSet):
Expand All @@ -29,33 +29,47 @@ def _loop_upsert(self, ctx: TestContext, data: list):
for batch in data:
sth.bulk_upsert_data("log", self.schema_log, batch)

def _loop_insert(self, ctx: TestContext, rows_count: int):
    """Insert `rows_count` rows into the `cnt` table.

    Each inserted row records the COUNT(*) of the `log` table observed
    at insert time, so concurrent upserts into `log` are reflected in `c`.

    :param ctx: scenario test context used to build the helper.
    :param rows_count: number of INSERT statements to execute.
    """
    sth = ScenarioTestHelper(ctx)
    # Hoist the path lookups out of the query string: nested double quotes
    # inside a double-quoted f-string are a syntax error before Python 3.12,
    # and the paths are loop-invariant anyway.
    log_path = sth.get_full_path("log")
    cnt_path = sth.get_full_path("cnt")
    for i in range(rows_count):
        sth.execute_query(
            f"$cnt = SELECT CAST(COUNT(*) AS INT64) from `{log_path}`; "
            f"INSERT INTO `{cnt_path}` (key, c) values({i}, $cnt)"
        )

def scenario_read_data_during_bulk_upsert(self, ctx: TestContext):
    """Run concurrent bulk upserts into `log` and counting inserts into `cnt`,
    then verify both tables ended up complete and correctly keyed.

    External parameters (overridable via test environment):
      batches_count  - number of bulk-upsert batches written to `log`
      rows_count     - rows per batch in `log`
      inserts_count  - number of single-row inserts into `cnt`
    """
    sth = ScenarioTestHelper(ctx)
    cnt_table_name: str = "cnt"
    log_table_name: str = "log"
    batches_count = int(get_external_param('batches_count', '10'))
    rows_count = int(get_external_param('rows_count', '1000'))
    inserts_count = int(get_external_param('inserts_count', '200'))
    sth.execute_scheme_query(CreateTable(cnt_table_name).with_schema(self.schema_cnt))
    sth.execute_scheme_query(CreateTable(log_table_name).with_schema(self.schema_log))

    # Build batches_count batches with globally unique, contiguous keys:
    # batch i holds keys [i * rows_count, (i + 1) * rows_count).
    data: List = []
    for i in range(batches_count):
        batch: List[Dict[str, Any]] = []
        for j in range(rows_count):
            batch.append({'key': j + rows_count * i})
        data.append(batch)

    # Writers run concurrently so inserts into `cnt` observe `log`
    # while it is still being bulk-upserted.
    upsert_thread = threading.Thread(target=self._loop_upsert, args=[ctx, data])
    insert_thread = threading.Thread(target=self._loop_insert, args=[ctx, inserts_count])

    upsert_thread.start()
    insert_thread.start()

    insert_thread.join()
    upsert_thread.join()

    # `cnt` must contain exactly one row per insert, keyed 0..inserts_count-1.
    cnt_rows: int = sth.get_table_rows_count(cnt_table_name)
    assert cnt_rows == inserts_count
    scan_result = sth.execute_scan_query(
        f'SELECT key, c FROM `{sth.get_full_path(cnt_table_name)}` ORDER BY key')
    for i in range(cnt_rows):
        actual = scan_result.result_set.rows[i]['key']
        assert actual == i, f"{i} ?= {actual}"

    # `log` must contain every upserted key exactly once.
    log_rows: int = sth.get_table_rows_count(log_table_name)
    assert log_rows == rows_count * batches_count
    scan_result = sth.execute_scan_query(
        f'SELECT key FROM `{sth.get_full_path(log_table_name)}` ORDER BY key')
    for i in range(log_rows):
        actual = scan_result.result_set.rows[i]['key']
        assert actual == i, f"{i} ?= {actual}"

0 comments on commit f4983f4

Please sign in to comment.