Skip to content

Commit

Permalink
Fix compatibility in replication
Browse files Browse the repository at this point in the history
  • Loading branch information
alesapin committed Mar 13, 2020
1 parent 23c617c commit 18bbe16
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 4 deletions.
8 changes: 5 additions & 3 deletions dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace ErrorCodes

void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
{
out << "format version: 4\n"
out << "format version: 5\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << '\n'
<< "block_id: " << escape << block_id << '\n';
Expand Down Expand Up @@ -108,7 +108,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)

in >> "format version: " >> format_version >> "\n";

if (format_version < 1 || format_version > 4)
if (format_version < 1 || format_version > 5)
throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);

if (format_version >= 2)
Expand Down Expand Up @@ -177,7 +177,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
>> "to\n"
>> new_part_name;
source_parts.push_back(source_part);
in >> "\nalter_version\n" >> alter_version;

if (format_version >= 5)
in >> "\nalter_version\n" >> alter_version;
}
else if (type_str == "alter")
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>


namespace DB
Expand Down Expand Up @@ -52,7 +53,8 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)

in >> "commands: ";
commands.readText(in);
in >> "\nalter version: " >> alter_version;
if (checkString("\nalter version: ", in))
in >> alter_version;
}

String ReplicatedMergeTreeMutationEntry::toString() const
Expand Down
Empty file.
74 changes: 74 additions & 0 deletions dbms/tests/integration/test_version_update_after_mutation/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest
import time

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)

@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()

yield cluster
finally:
cluster.shutdown()


def test_mutate_and_upgrade(start_cluster):
for node in [node1, node2]:
node.query("CREATE TABLE mt (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple()".format(node.name))

node1.query("INSERT INTO mt VALUES ('2020-02-13', 1), ('2020-02-13', 2);")

node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
node2.query("SYSTEM SYNC REPLICA mt", timeout=5)

node1.restart_with_latest_version()
node2.restart_with_latest_version()

node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")

node1.query("SYSTEM SYNC REPLICA mt", timeout=5)

assert node1.query("SELECT COUNT() FROM mt") == "2\n"
assert node2.query("SELECT COUNT() FROM mt") == "2\n"

node1.query("INSERT INTO mt VALUES ('2020-02-13', 4);")

node2.query("SYSTEM SYNC REPLICA mt", timeout=5)

assert node1.query("SELECT COUNT() FROM mt") == "3\n"
assert node2.query("SELECT COUNT() FROM mt") == "3\n"

node2.query("ALTER TABLE mt DELETE WHERE id = 3", settings={"mutations_sync": "2"})

node1.query("SYSTEM SYNC REPLICA mt", timeout=5)

assert node1.query("SELECT COUNT() FROM mt") == "2\n"
assert node2.query("SELECT COUNT() FROM mt") == "2\n"

node1.query("ALTER TABLE mt MODIFY COLUMN id String DEFAULT '0'", settings={"replication_alter_partitions_sync": "2"})

node2.query("OPTIMIZE TABLE mt FINAL")

assert node1.query("SELECT id FROM mt") == "1\n4\n"
assert node2.query("SELECT id FROM mt") == "1\n4\n"


def test_upgrade_while_mutation(start_cluster):
node3.query("CREATE TABLE mt1 (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', 'node3') ORDER BY tuple()")

node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)")

node3.query("SYSTEM STOP MERGES")
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")

node3.restart_with_latest_version()

assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")

0 comments on commit 18bbe16

Please sign in to comment.