From 18bbe16aaef68191eae259d9cd2fe758fd6720e6 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 13 Mar 2020 20:23:36 +0300 Subject: [PATCH 1/3] Fix compatibility in replication --- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 8 +- .../ReplicatedMergeTreeMutationEntry.cpp | 4 +- .../__init__.py | 0 .../test.py | 74 +++++++++++++++++++ 4 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 dbms/tests/integration/test_version_update_after_mutation/__init__.py create mode 100644 dbms/tests/integration/test_version_update_after_mutation/test.py diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 83532a8276db..4c6cacfa87dd 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -19,7 +19,7 @@ namespace ErrorCodes void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const { - out << "format version: 4\n" + out << "format version: 5\n" << "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n" << "source replica: " << source_replica << '\n' << "block_id: " << escape << block_id << '\n'; @@ -108,7 +108,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> "format version: " >> format_version >> "\n"; - if (format_version < 1 || format_version > 4) + if (format_version < 1 || format_version > 5) throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); if (format_version >= 2) @@ -177,7 +177,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) >> "to\n" >> new_part_name; source_parts.push_back(source_part); - in >> "\nalter_version\n" >> alter_version; + + if (format_version >= 5) + in >> "\nalter_version\n" >> alter_version; } else if (type_str == "alter") { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp index ea744b8f91e5..b2299b2cbbdc 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB @@ -52,7 +53,8 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) in >> "commands: "; commands.readText(in); - in >> "\nalter version: " >> alter_version; + if (checkString("\nalter version: ", in)) + in >> alter_version; } String ReplicatedMergeTreeMutationEntry::toString() const diff --git a/dbms/tests/integration/test_version_update_after_mutation/__init__.py b/dbms/tests/integration/test_version_update_after_mutation/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dbms/tests/integration/test_version_update_after_mutation/test.py b/dbms/tests/integration/test_version_update_after_mutation/test.py new file mode 100644 index 000000000000..3f2063bdddb0 --- /dev/null +++ b/dbms/tests/integration/test_version_update_after_mutation/test.py @@ -0,0 +1,74 @@ +import pytest +import time + +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import assert_eq_with_retry + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) +node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) +node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + + yield cluster + finally: + cluster.shutdown() + + +def test_mutate_and_upgrade(start_cluster): + for node in [node1, node2]: + node.query("CREATE TABLE mt (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple()".format(node.name)) + + node1.query("INSERT INTO mt VALUES ('2020-02-13', 1), ('2020-02-13', 2);") + + node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"}) + node2.query("SYSTEM SYNC REPLICA mt", timeout=5) + + node1.restart_with_latest_version() + node2.restart_with_latest_version() + + node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);") + + node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "2\n" + assert node2.query("SELECT COUNT() FROM mt") == "2\n" + + node1.query("INSERT INTO mt VALUES ('2020-02-13', 4);") + + node2.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "3\n" + assert node2.query("SELECT COUNT() FROM mt") == "3\n" + + node2.query("ALTER TABLE mt DELETE WHERE id = 3", settings={"mutations_sync": "2"}) + + node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "2\n" + assert node2.query("SELECT COUNT() FROM mt") == "2\n" + + node1.query("ALTER TABLE mt MODIFY COLUMN id String DEFAULT '0'", settings={"replication_alter_partitions_sync": "2"}) + + node2.query("OPTIMIZE TABLE mt FINAL") + + assert node1.query("SELECT id FROM mt") == "1\n4\n" + assert node2.query("SELECT id FROM mt") == "1\n4\n" + + +def test_upgrade_while_mutation(start_cluster): + node3.query("CREATE TABLE mt1 (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', 'node3') ORDER BY tuple()") + + node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)") + + node3.query("SYSTEM STOP MERGES") + node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0") + + node3.restart_with_latest_version() + + assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n") From b0cc690cfb7585fd16b41c98686991b5ff18baa2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 15 Mar 2020 22:33:25 +0300 Subject: [PATCH 2/3] Fix entries compatibility without versions update --- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 4c6cacfa87dd..6f48055318be 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -19,7 +19,7 @@ namespace ErrorCodes void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const { - out << "format version: 5\n" + out << "format version: 4\n" << "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n" << "source replica: " << source_replica << '\n' << "block_id: " << escape << block_id << '\n'; @@ -70,8 +70,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const << source_parts.at(0) << "\n" << "to\n" << new_part_name; - out << "\nalter_version\n"; - out << alter_version; + + if (isAlterMutation()) + out << "\nalter_version\n" << alter_version; break; case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global @@ -127,6 +128,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> type_str >> "\n"; + bool trailing_newline_found = false; if (type_str == "get") { type = GET_PART; @@ -178,8 +180,12 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) >> new_part_name; source_parts.push_back(source_part); - if (format_version >= 5) - in >> "\nalter_version\n" >> alter_version; + in >> "\n"; + + if (in.eof()) + trailing_newline_found = true; + else if (checkString("alter_version\n", in)) + in >> alter_version; } else if (type_str == "alter") { @@ -200,7 +206,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in.readStrict(&metadata_str[0], metadata_size); } - in >> "\n"; + if (!trailing_newline_found) + in >> "\n"; + if (checkString("part_type: ", in)) { String part_type_str; From 2c16732536f8fb18786307096e5cd7bc91d8fd50 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 15 Mar 2020 22:34:36 +0300 Subject: [PATCH 3/3] Revert version check --- dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 6f48055318be..7aac3128930c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -109,7 +109,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> "format version: " >> format_version >> "\n"; - if (format_version < 1 || format_version > 5) + if (format_version < 1 || format_version > 4) throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); if (format_version >= 2)