Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor prepared statement with tx pool frontend routines. #587

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 44 additions & 130 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -959,43 +959,37 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance,
// 8 hex
#define OD_HASH_LEN 9

static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size,
int opname_start_offset,
int operator_name_len,
od_hash_t body_hash)
static inline machine_msg_t *
od_frontend_rewrite_msg(char *data, int size, int opname_start_offset,
int operator_name_len, char *opname, int opnamelen)
{
machine_msg_t *msg =
machine_msg_create(size - operator_name_len + OD_HASH_LEN);
machine_msg_create(size - operator_name_len + opnamelen);
char *rewrite_data = machine_msg_data(msg);

// packet header
memcpy(rewrite_data, data, opname_start_offset);
// prefix for opname
od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x",
body_hash);
od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname);
// rest of msg
memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN,
memcpy(rewrite_data + opname_start_offset + opnamelen,
data + opname_start_offset + operator_name_len,
size - opname_start_offset - operator_name_len);
// set proper size to package
kiwi_header_set_size((kiwi_header_t *)rewrite_data,
size - operator_name_len + OD_HASH_LEN);
size - operator_name_len + opnamelen);

return msg;
}

static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
)
od_server_t *server, od_relay_t *relay, char *ctx, char *data,
int size /* to adcance or to write? */, od_hash_t body_hash,
char *opname, int opnamelen)
{
od_route_t *route = server->route;
od_instance_t *instance = server->global->instance;
od_client_t *client = server->client;
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);

od_hash_t body_hash = od_murmur_hash(data, size);

od_hashmap_elt_t desc;
desc.data = data;
Expand All @@ -1004,9 +998,6 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_debug(&instance->logger, ctx, client, server,
"statement: %.*s, hash: %08x", desc.len, desc.data, body_hash);

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
Expand All @@ -1017,16 +1008,16 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
if (od_hashmap_insert(server->prep_stmts, body_hash, &desc,
&value_ptr) == 0) {
od_debug(&instance->logger, ctx, client, server,
"deploy %.*s operator %s to server", desc.len,
desc.data, opname);
"deploy %.*s operator %.*s to server", desc.len,
desc.data, opnamelen, opname);
// rewrite msg
// allocate prepered statement under name equal to body hash

od_stat_parse(&route->stats);

machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc.data, desc.len);
pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen,
desc.data, desc.len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
Expand Down Expand Up @@ -1057,6 +1048,21 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
}
}

static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

advance

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who are your talking to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

)
{
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);

od_hash_t body_hash = od_murmur_hash(data, size);
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size,
body_hash, opname, OD_HASH_LEN);
}

static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
char *data, int size)
{
Expand Down Expand Up @@ -1140,63 +1146,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,

od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);

od_debug(&instance->logger, "rewrite describe", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;

// send parse msg if needed
if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before describe", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash

machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
// msg deallocated here

machine_iov_add(relay->iov, pmsg);
od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));

} else {
int *refcnt;
refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;

od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}

machine_msg_t *msg;
Expand Down Expand Up @@ -1316,59 +1274,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,

od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);

od_debug(&instance->logger, "rewrite bind", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);

od_hashmap_elt_t value;
int refcnt = 1;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;

char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);

if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before bind", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);

if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
machine_iov_add(relay->iov, pmsg);

od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));

} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}

int opname_start_offset =
Expand All @@ -1380,8 +1294,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_t *msg;
msg = od_frontend_rewrite_msg(data, size,
opname_start_offset,
operator_name_len,
body_hash);
operator_name_len, opname,
OD_HASH_LEN);

if (msg == NULL) {
return OD_ESERVER_WRITE;
Expand Down Expand Up @@ -1784,7 +1698,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
assert(server->parse_msg != NULL);

/* fill internals structs in */
if (od_frontend_deploy_prepared_stmt(
if (od_frontend_deploy_prepared_stmt_msg(
server, &server->relay, "sync-point",
server->parse_msg) != OD_OK) {
status = OD_ESERVER_WRITE;
Expand Down
Loading