Skip to content

Commit

Permalink
Optimize tx pool prepared statement feature. Lazy allocate backend pr…
Browse files Browse the repository at this point in the history
…ep stmt (#581)
  • Loading branch information
reshke authored Feb 28, 2024
1 parent dc2122a commit ed8ca61
Showing 1 changed file with 7 additions and 103 deletions.
110 changes: 7 additions & 103 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
}

od_stat_parse(&route->stats);
// msg deallocated here
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger, "describe",
Expand All @@ -1113,10 +1114,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_io_error(&server->io));
return OD_ESERVER_WRITE;
}

} else {
int *refcnt;
refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;

od_stat_parse_reuse(&route->stats);
}

msg = kiwi_fe_write_describe(NULL, 'S', opname,
Expand Down Expand Up @@ -1194,113 +1198,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
if (value_ptr->len != desc.description_len ||
strncmp(desc.description, value_ptr->data,
value_ptr->len) != 0) {
value_ptr->len = desc.description_len;
value_ptr->data = desc.description;

/* redeploy
* previous
/*
* Raise error:
* client allocated prepared stmt with same name
*/
char buf[OD_HASH_LEN];
od_snprintf(buf, OD_HASH_LEN, "%08x",
body_hash);

msg = kiwi_fe_write_close(
NULL, 'S', buf, OD_HASH_LEN);
if (msg == NULL) {
return OD_ESERVER_WRITE;
}
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger,
"parse", NULL, server,
"write error: %s",
od_io_error(
&server->io));
return OD_ESERVER_WRITE;
}
msg = kiwi_fe_write_parse_description(
NULL, buf, OD_HASH_LEN,
desc.description,
desc.description_len);
if (msg == NULL) {
return OD_ESERVER_WRITE;
}
} else {
char buf[OD_HASH_LEN];
od_snprintf(buf, OD_HASH_LEN, "%08x",
body_hash);
msg = kiwi_fe_write_parse_description(
NULL, buf, OD_HASH_LEN,
desc.description,
desc.description_len);
if (msg == NULL) {
return OD_ESERVER_WRITE;
}
}
} else {
char buf[OD_HASH_LEN];
od_snprintf(buf, OD_HASH_LEN, "%08x",
body_hash);
msg = kiwi_fe_write_parse_description(
NULL, buf, OD_HASH_LEN,
desc.description, desc.description_len);
if (msg == NULL) {
return OD_ESERVER_WRITE;
}
}

key.len = desc.description_len;
key.data = desc.description;

int refcnt = 0;
value.data = &refcnt;
value.len = sizeof(int);

value_ptr = &value;

if (od_hashmap_insert(server->prep_stmts, body_hash,
&key, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse initial deploy", client,
server,
"deploy %.*s operator hash %u to server",
key.len, key.data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash

if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(msg),
machine_msg_size(msg));
}

// stat backend parse msg
od_stat_parse(&route->stats);
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger, "parse",
NULL, server,
"write error: %s",
od_io_error(&server->io));
return OD_ESERVER_WRITE;
}
} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;

if (instance->config.log_query ||
route->rule->log_query) {
od_stat_parse_reuse(&route->stats);
od_log(&instance->logger, "parse",
client, server,
"stmt already exists, simply report its ok");
}
machine_msg_free(msg);
}

machine_msg_t *pmsg;
Expand Down Expand Up @@ -1418,6 +1321,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
od_stat_parse_reuse(&route->stats);
}

msg = od_frontend_rewrite_msg(data, size,
Expand Down

0 comments on commit ed8ca61

Please sign in to comment.