diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 83532a8276db..7aac3128930c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -70,8 +70,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const << source_parts.at(0) << "\n" << "to\n" << new_part_name; - out << "\nalter_version\n"; - out << alter_version; + + if (isAlterMutation()) + out << "\nalter_version\n" << alter_version; break; case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global @@ -127,6 +128,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> type_str >> "\n"; + bool trailing_newline_found = false; if (type_str == "get") { type = GET_PART; @@ -177,7 +179,13 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) >> "to\n" >> new_part_name; source_parts.push_back(source_part); - in >> "\nalter_version\n" >> alter_version; + + in >> "\n"; + + if (in.eof()) + trailing_newline_found = true; + else if (checkString("alter_version\n", in)) + in >> alter_version; } else if (type_str == "alter") { @@ -198,7 +206,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in.readStrict(&metadata_str[0], metadata_size); } - in >> "\n"; + if (!trailing_newline_found) + in >> "\n"; + if (checkString("part_type: ", in)) { String part_type_str; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp index ea744b8f91e5..b2299b2cbbdc 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace DB @@ -52,7 +53,8 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) in >> "commands: "; commands.readText(in); - in >> "\nalter version: " >> alter_version; + if (checkString("\nalter version: ", in)) + in >> alter_version; } String ReplicatedMergeTreeMutationEntry::toString() const diff --git a/dbms/tests/integration/test_version_update_after_mutation/__init__.py b/dbms/tests/integration/test_version_update_after_mutation/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/dbms/tests/integration/test_version_update_after_mutation/test.py b/dbms/tests/integration/test_version_update_after_mutation/test.py new file mode 100644 index 000000000000..3f2063bdddb0 --- /dev/null +++ b/dbms/tests/integration/test_version_update_after_mutation/test.py @@ -0,0 +1,74 @@ +import pytest +import time + +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import assert_eq_with_retry + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) +node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) +node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True) + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + + yield cluster + finally: + cluster.shutdown() + + +def test_mutate_and_upgrade(start_cluster): + for node in [node1, node2]: + node.query("CREATE TABLE mt (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple()".format(node.name)) + + node1.query("INSERT INTO mt VALUES ('2020-02-13', 1), ('2020-02-13', 2);") + + node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"}) + node2.query("SYSTEM SYNC REPLICA mt", timeout=5) + + node1.restart_with_latest_version() + node2.restart_with_latest_version() + + node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);") + + node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "2\n" + assert node2.query("SELECT COUNT() FROM mt") == "2\n" + + node1.query("INSERT INTO mt VALUES ('2020-02-13', 4);") + + node2.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "3\n" + assert node2.query("SELECT COUNT() FROM mt") == "3\n" + + node2.query("ALTER TABLE mt DELETE WHERE id = 3", settings={"mutations_sync": "2"}) + + node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + + assert node1.query("SELECT COUNT() FROM mt") == "2\n" + assert node2.query("SELECT COUNT() FROM mt") == "2\n" + + node1.query("ALTER TABLE mt MODIFY COLUMN id String DEFAULT '0'", settings={"replication_alter_partitions_sync": "2"}) + + node2.query("OPTIMIZE TABLE mt FINAL") + + assert node1.query("SELECT id FROM mt") == "1\n4\n" + assert node2.query("SELECT id FROM mt") == "1\n4\n" + + +def test_upgrade_while_mutation(start_cluster): + node3.query("CREATE TABLE mt1 (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', 'node3') ORDER BY tuple()") + + node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)") + + node3.query("SYSTEM STOP MERGES") + node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0") + + node3.restart_with_latest_version() + + assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")