From ddbbb698b7f202cf753fdca3401014de2ff08fdd Mon Sep 17 00:00:00 2001 From: reshke Date: Thu, 7 Mar 2024 14:53:27 +0500 Subject: [PATCH] Refactor prepared statement with tx pool frontend routines. (#587) * Refac * fix * FIX --- sources/frontend.c | 174 ++++++++++++--------------------------------- 1 file changed, 44 insertions(+), 130 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 7bba5a609..39c8ba164 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -959,43 +959,37 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance, // 8 hex #define OD_HASH_LEN 9 -static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, - int opname_start_offset, - int operator_name_len, - od_hash_t body_hash) +static inline machine_msg_t * +od_frontend_rewrite_msg(char *data, int size, int opname_start_offset, + int operator_name_len, char *opname, int opnamelen) { machine_msg_t *msg = - machine_msg_create(size - operator_name_len + OD_HASH_LEN); + machine_msg_create(size - operator_name_len + opnamelen); char *rewrite_data = machine_msg_data(msg); // packet header memcpy(rewrite_data, data, opname_start_offset); // prefix for opname - od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x", - body_hash); + od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname); // rest of msg - memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN, + memcpy(rewrite_data + opname_start_offset + opnamelen, data + opname_start_offset + operator_name_len, size - opname_start_offset - operator_name_len); // set proper size to package kiwi_header_set_size((kiwi_header_t *)rewrite_data, - size - operator_name_len + OD_HASH_LEN); + size - operator_name_len + opnamelen); return msg; } static od_frontend_status_t od_frontend_deploy_prepared_stmt( - od_server_t *server, od_relay_t *relay, char *ctx, - machine_msg_t *msg /* to adcance or to write? */ -) + od_server_t *server, od_relay_t *relay, char *ctx, char *data, + int size /* to adcance or to write? */, od_hash_t body_hash, + char *opname, int opnamelen) { od_route_t *route = server->route; od_instance_t *instance = server->global->instance; od_client_t *client = server->client; - char *data = machine_msg_data(msg); - int size = machine_msg_size(msg); - - od_hash_t body_hash = od_murmur_hash(data, size); od_hashmap_elt_t desc; desc.data = data; @@ -1004,9 +998,6 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt( od_debug(&instance->logger, ctx, client, server, "statement: %.*s, hash: %08x", desc.len, desc.data, body_hash); - char opname[OD_HASH_LEN]; - od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - int refcnt = 0; od_hashmap_elt_t value; value.data = &refcnt; @@ -1017,16 +1008,16 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt( if (od_hashmap_insert(server->prep_stmts, body_hash, &desc, &value_ptr) == 0) { od_debug(&instance->logger, ctx, client, server, - "deploy %.*s operator %s to server", desc.len, - desc.data, opname); + "deploy %.*s operator %.*s to server", desc.len, + desc.data, opnamelen, opname); // rewrite msg // allocate prepered statement under name equal to body hash od_stat_parse(&route->stats); machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc.data, desc.len); + pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen, + desc.data, desc.len); if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1057,6 +1048,21 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt( } } +static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg( + od_server_t *server, od_relay_t *relay, char *ctx, + machine_msg_t *msg /* to adcance or to write? */ +) +{ + char *data = machine_msg_data(msg); + int size = machine_msg_size(msg); + + od_hash_t body_hash = od_murmur_hash(data, size); + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size, + body_hash, opname, OD_HASH_LEN); +} + static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, char *data, int size) { @@ -1140,63 +1146,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hash_t body_hash = od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite describe", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - char opname[OD_HASH_LEN]; od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - int refcnt = 0; - od_hashmap_elt_t value; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - - // send parse msg if needed - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before describe", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - - machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(pmsg), - machine_msg_size(pmsg)); - } - - od_stat_parse(&route->stats); - // msg deallocated here - - machine_iov_add(relay->iov, pmsg); - od_dbg_printf_on_dvl_lvl( - 1, "client relay %p advance msg %c\n", - relay, *(char *)machine_msg_data(pmsg)); - - } else { - int *refcnt; - refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; - - od_stat_parse_reuse(&route->stats); + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { + return OD_ESERVER_WRITE; } machine_msg_t *msg; @@ -1316,59 +1274,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hash_t body_hash = od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite bind", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - - od_hashmap_elt_t value; - int refcnt = 1; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - char opname[OD_HASH_LEN]; od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before bind", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - machine_msg_t *pmsg; - pmsg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(pmsg), - machine_msg_size(pmsg)); - } - - od_stat_parse(&route->stats); - machine_iov_add(relay->iov, pmsg); - - od_dbg_printf_on_dvl_lvl( - 1, "client relay %p advance msg %c\n", - relay, *(char *)machine_msg_data(pmsg)); - - } else { - int *refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; - od_stat_parse_reuse(&route->stats); + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { + return OD_ESERVER_WRITE; } int opname_start_offset = @@ -1380,8 +1294,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, - operator_name_len, - body_hash); + operator_name_len, opname, + OD_HASH_LEN); if (msg == NULL) { return OD_ESERVER_WRITE; @@ -1784,7 +1698,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) assert(server->parse_msg != NULL); /* fill internals structs in */ - if (od_frontend_deploy_prepared_stmt( + if (od_frontend_deploy_prepared_stmt_msg( server, &server->relay, "sync-point", server->parse_msg) != OD_OK) { status = OD_ESERVER_WRITE;