Skip to content

Commit

Permalink
Refactor prepared statement with tx pool frontend routines. (#587)
Browse files Browse the repository at this point in the history
* Refac

* fix

* FIX
  • Loading branch information
reshke authored Mar 7, 2024
1 parent ff5b656 commit ddbbb69
Showing 1 changed file with 44 additions and 130 deletions.
174 changes: 44 additions & 130 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -959,43 +959,37 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance,
// 8 hex
#define OD_HASH_LEN 9

static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size,
int opname_start_offset,
int operator_name_len,
od_hash_t body_hash)
static inline machine_msg_t *
od_frontend_rewrite_msg(char *data, int size, int opname_start_offset,
int operator_name_len, char *opname, int opnamelen)
{
machine_msg_t *msg =
machine_msg_create(size - operator_name_len + OD_HASH_LEN);
machine_msg_create(size - operator_name_len + opnamelen);
char *rewrite_data = machine_msg_data(msg);

// packet header
memcpy(rewrite_data, data, opname_start_offset);
// prefix for opname
od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x",
body_hash);
od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname);
// rest of msg
memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN,
memcpy(rewrite_data + opname_start_offset + opnamelen,
data + opname_start_offset + operator_name_len,
size - opname_start_offset - operator_name_len);
// set proper size to package
kiwi_header_set_size((kiwi_header_t *)rewrite_data,
size - operator_name_len + OD_HASH_LEN);
size - operator_name_len + opnamelen);

return msg;
}

static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
)
od_server_t *server, od_relay_t *relay, char *ctx, char *data,
int size /* to adcance or to write? */, od_hash_t body_hash,
char *opname, int opnamelen)
{
od_route_t *route = server->route;
od_instance_t *instance = server->global->instance;
od_client_t *client = server->client;
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);

od_hash_t body_hash = od_murmur_hash(data, size);

od_hashmap_elt_t desc;
desc.data = data;
Expand All @@ -1004,9 +998,6 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_debug(&instance->logger, ctx, client, server,
"statement: %.*s, hash: %08x", desc.len, desc.data, body_hash);

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
Expand All @@ -1017,16 +1008,16 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
if (od_hashmap_insert(server->prep_stmts, body_hash, &desc,
&value_ptr) == 0) {
od_debug(&instance->logger, ctx, client, server,
"deploy %.*s operator %s to server", desc.len,
desc.data, opname);
"deploy %.*s operator %.*s to server", desc.len,
desc.data, opnamelen, opname);
// rewrite msg
// allocate prepered statement under name equal to body hash

od_stat_parse(&route->stats);

machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc.data, desc.len);
pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen,
desc.data, desc.len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
Expand Down Expand Up @@ -1057,6 +1048,21 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
}
}

static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
)
{
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);

od_hash_t body_hash = od_murmur_hash(data, size);
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size,
body_hash, opname, OD_HASH_LEN);
}

static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
char *data, int size)
{
Expand Down Expand Up @@ -1140,63 +1146,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,

od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);

od_debug(&instance->logger, "rewrite describe", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;

// send parse msg if needed
if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before describe", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash

machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
// msg deallocated here

machine_iov_add(relay->iov, pmsg);
od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));

} else {
int *refcnt;
refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;

od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}

machine_msg_t *msg;
Expand Down Expand Up @@ -1316,59 +1274,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,

od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);

od_debug(&instance->logger, "rewrite bind", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);

od_hashmap_elt_t value;
int refcnt = 1;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before bind", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);

if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
machine_iov_add(relay->iov, pmsg);

od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));

} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}

int opname_start_offset =
Expand All @@ -1380,8 +1294,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_t *msg;
msg = od_frontend_rewrite_msg(data, size,
opname_start_offset,
operator_name_len,
body_hash);
operator_name_len, opname,
OD_HASH_LEN);

if (msg == NULL) {
return OD_ESERVER_WRITE;
Expand Down Expand Up @@ -1784,7 +1698,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
assert(server->parse_msg != NULL);

/* fill internals structs in */
if (od_frontend_deploy_prepared_stmt(
if (od_frontend_deploy_prepared_stmt_msg(
server, &server->relay, "sync-point",
server->parse_msg) != OD_OK) {
status = OD_ESERVER_WRITE;
Expand Down

0 comments on commit ddbbb69

Please sign in to comment.