From db83d728d7d546d199d51d63fcd8b56aabe2e036 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Tue, 10 Dec 2024 20:58:37 +0300 Subject: [PATCH] Add column mode in simple_queue --- ydb/tools/simple_queue/__main__.py | 42 ++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/ydb/tools/simple_queue/__main__.py b/ydb/tools/simple_queue/__main__.py index 721b04774c52..b20661aa013e 100644 --- a/ydb/tools/simple_queue/__main__.py +++ b/ydb/tools/simple_queue/__main__.py @@ -76,11 +76,20 @@ def list(cls): ) -def get_table_description(table_name): - return """ +def get_table_description(table_name, mode): + if mode == "row": + store_entry = "STORE = ROW," + ttl_entry = """TTL = Interval("PT240S") ON `timestamp` AS SECONDS,""" + elif mode == "column": + store_entry = "STORE = COLUMN," + ttl_entry = "" + else: + raise RuntimeError("Unkown mode: {}".format(mode)) + + return f""" CREATE TABLE `{table_name}` ( key Uint64 NOT NULL, - `timestamp` Timestamp, -- NOT NULL, -- not working for now + `timestamp` Uint64 NOT NULL, value Utf8 FAMILY lz4_family NOT NULL, PRIMARY KEY (key), FAMILY lz4_family ( @@ -89,7 +98,8 @@ def get_table_description(table_name): INDEX by_timestamp GLOBAL ON (`timestamp`) ) WITH ( - TTL = Interval("PT240S") ON `timestamp`, + {store_entry} + {ttl_entry} AUTO_PARTITIONING_BY_SIZE = ENABLED, AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_PARTITION_SIZE_MB = 128, @@ -100,7 +110,7 @@ def get_table_description(table_name): def timestamp(): - return int(1000 * time.time()) + return int(time.time()) def extract_keys(response): @@ -180,7 +190,7 @@ def print_stats(self): class YdbQueue(object): - def __init__(self, idx, database, stats, driver, pool): + def __init__(self, idx, database, stats, driver, pool, mode): self.working_dir = os.path.join(database, socket.gethostname().split('.')[0].replace('-', '_') + "_" + str(idx)) self.copies_dir = os.path.join(self.working_dir, 'copies') self.table_name = self.table_name_with_timestamp() @@ -195,6 +205,7 @@ def __init__(self, idx, database, stats, driver, pool): self.ops = ydb.BaseRequestSettings().with_operation_timeout(19).with_timeout(20) self.driver.scheme_client.make_directory(self.working_dir) self.driver.scheme_client.make_directory(self.copies_dir) + self.mode = mode print("Working dir %s" % self.working_dir) f = self.prepare_new_queue(self.table_name) f.result() @@ -212,7 +223,7 @@ def table_name_with_timestamp(self, working_dir=None): def prepare_new_queue(self, table_name=None): session = self.pool.acquire() table_name = self.table_name_with_timestamp() if table_name is None else table_name - f = session.async_execute_scheme(get_table_description(table_name), settings=self.ops) + f = session.async_execute_scheme(get_table_description(table_name, self.mode), settings=self.ops) f.add_done_callback(lambda x: self.on_received_response(session, x, 'create')) return f @@ -230,6 +241,12 @@ def on_received_response(self, session, response, event, callback=None): response.result() self.stats.save_event(event) except ydb.Error as e: + debug = False + if debug: + print(event) + print(e) + print() + self.stats.save_event(event, e.status) def send_query(self, query, parameters, event_kind, callback=None): @@ -335,7 +352,7 @@ def write(self): DECLARE $key as Uint64; DECLARE $value as Utf8; DECLARE $timestamp as Uint64; - UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, CAST($timestamp as Timestamp), $value); + UPSERT INTO `{}` (`key`, `timestamp`, `value`) VALUES ($key, $timestamp, $value); """.format(self.table_name), { '$key': ydb.PrimitiveType.Uint64.proto, @@ -429,7 +446,7 @@ def copy_table(self): class Workload(object): - def __init__(self, endpoint, database, duration): + def __init__(self, endpoint, database, duration, mode): self.database = database self.driver = ydb.Driver(ydb.DriverConfig(endpoint, database)) self.pool = ydb.SessionPool(self.driver, size=200) @@ -437,8 +454,10 @@ def __init__(self, endpoint, database, duration): self.duration = duration self.delayed_events = queue.Queue() self.workload_stats = WorkloadStats(*EventKind.list()) + # TODO: run both modes in parallel? + self.mode = mode self.ydb_queues = [ - YdbQueue(idx, database, self.workload_stats, self.driver, self.pool) + YdbQueue(idx, database, self.workload_stats, self.driver, self.pool, self.mode) for idx in range(2) ] @@ -511,7 +530,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): parser.add_argument('--endpoint', default='localhost:2135', help="An endpoint to be used") parser.add_argument('--database', default=None, required=True, help='A database to connect') parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.') + parser.add_argument('--mode', default="row", choices=["row", "column"], help='STORE mode for CREATE TABLE') args = parser.parse_args() - with Workload(args.endpoint, args.database, args.duration) as workload: + with Workload(args.endpoint, args.database, args.duration, args.mode) as workload: for handle in workload.loop(): handle()