Skip to content

Commit

Permalink
POC fix server-client sync in tx prepared stmt mode
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Feb 20, 2024
1 parent 4e26ef5 commit 3368b08
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 86 deletions.
138 changes: 63 additions & 75 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,

kiwi_be_type_t type = *data;
if (instance->config.log_debug)
od_debug(&instance->logger, "main", client, server, "%s",
od_debug(&instance->logger, "main", client, server, "server recieved %s",
kiwi_be_type_to_string(type));

int is_deploy = od_server_in_deploy(server);
Expand Down Expand Up @@ -996,8 +996,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
"%s", kiwi_fe_type_to_string(type));

od_frontend_status_t retstatus = OD_OK;
machine_msg_t *msg;
msg = NULL;
bool forwarded = 0;
switch (type) {
case KIWI_FE_COPY_DONE:
Expand All @@ -1008,11 +1006,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
case KIWI_FE_QUERY:
if (instance->config.log_query || route->rule->log_query)
od_frontend_log_query(instance, client, data, size);
retstatus = OD_WAIT_SYNC;
/* update server sync state */
od_server_sync_request(server, 1);
break;
case KIWI_FE_FUNCTION_CALL:
case KIWI_FE_SYNC:
retstatus = OD_WAIT_SYNC;
/* update server sync state */
od_server_sync_request(server, 1);
break;
Expand Down Expand Up @@ -1088,10 +1088,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
// rewrite msg
// allocate prepered statement under name equal to body hash

msg = kiwi_fe_write_parse_description(
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);
if (msg == NULL) {
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

Expand All @@ -1100,17 +1101,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(msg),
machine_msg_size(msg));
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger, "describe",
NULL, server,
"write error: %s",
od_io_error(&server->io));
rc = machine_iov_add(relay->iov, pmsg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
} else {
Expand All @@ -1119,6 +1116,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
*refcnt = 1 + *refcnt;
}

machine_msg_t *msg;
msg = kiwi_fe_write_describe(NULL, 'S', opname,
OD_HASH_LEN);

Expand All @@ -1133,13 +1131,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_size(msg));
}

// msg if deallocated automaictly
rc = od_write(&server->io, msg);
forwarded = 1;
if (rc == -1) {
od_error(&instance->logger, "describe", NULL,
server, "write error: %s",
od_io_error(&server->io));
rc = machine_iov_add(relay->iov, msg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
}
Expand Down Expand Up @@ -1188,6 +1181,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
size - opname_start_offset -
desc.operator_name_len);

machine_msg_t *msg;
msg = NULL;

assert(client->prep_stmt_ids);
if (od_hashmap_insert(client->prep_stmt_ids, keyhash,
&key, &value_ptr)) {
Expand All @@ -1205,18 +1201,14 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_snprintf(buf, OD_HASH_LEN, "%08x",
body_hash);

msg = kiwi_fe_write_close(
machine_msg_t *cmsg;
cmsg = kiwi_fe_write_close(
NULL, 'S', buf, OD_HASH_LEN);
if (msg == NULL) {
if (cmsg == NULL) {
return OD_ESERVER_WRITE;
}
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger,
"parse", NULL, server,
"write error: %s",
od_io_error(
&server->io));
rc = machine_iov_add(relay->iov, cmsg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
msg = kiwi_fe_write_parse_description(
Expand Down Expand Up @@ -1281,12 +1273,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,

// stat backend parse msg
od_stat_parse(&route->stats);
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger, "parse",
NULL, server,
"write error: %s",
od_io_error(&server->io));
rc = machine_iov_add(relay->iov, msg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
} else {
Expand All @@ -1308,15 +1296,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
rc = od_write(&client->io, pmsg);
forwarded = 1;

if (rc == -1) {
od_error(&instance->logger, "parse", client,
NULL, "write error: %s",
od_io_error(&client->io));
rc = machine_iov_add(client->server->relay.iov, pmsg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
forwarded = 1;
}
break;
case KIWI_FE_BIND:
Expand Down Expand Up @@ -1389,11 +1373,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
// rewrite msg
// allocate prepered statement under name equal to body hash

msg = kiwi_fe_write_parse_description(
machine_msg_t *pmsg;

pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);

if (msg == NULL) {
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}

Expand All @@ -1402,24 +1388,23 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(msg),
machine_msg_size(msg));
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}

od_stat_parse(&route->stats);
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger,
"rewrite parse", NULL, server,
"write error: %s",
od_io_error(&server->io));

rc = machine_iov_add(relay->iov, pmsg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
}


machine_msg_t *msg;
msg = od_frontend_rewrite_msg(data, size,
opname_start_offset,
operator_name_len,
Expand All @@ -1437,15 +1422,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_size(msg));
}

rc = od_write(&server->io, msg);
forwarded = 1;

if (rc == -1) {
od_error(&instance->logger, "rewrite bind",
NULL, server, "write error: %s",
od_io_error(&server->io));
rc = machine_iov_add(relay->iov, msg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
forwarded = 1;
}
break;
case KIWI_FE_EXECUTE:
Expand Down Expand Up @@ -1477,13 +1458,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_t *pmsg;
pmsg = kiwi_be_write_close_complete(NULL);

rc = od_write(&client->io, pmsg);
if (rc == -1) {
od_error(&instance->logger,
"close report", NULL, server,
"write error: %s",
od_io_error(&server->io));
return OD_ECLIENT_WRITE;
rc = machine_iov_add(relay->iov, pmsg);
if (rc != 0) {
return OD_ESERVER_WRITE;
}
}

Expand Down Expand Up @@ -1514,11 +1491,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
}

/* If the retstatus is not SKIP */
if (route->rule->pool->reserve_prepared_statement && forwarded != 1 &&
msg != NULL) {
msg = kiwi_fe_copy_msg(msg, data, size);
od_write(&server->io, msg);
retstatus = OD_SKIP;
if (route->rule->pool->reserve_prepared_statement && forwarded != 1) {
// machine_iov_add_pointer(client->server->relay.iov, data, size);
// retstatus = OD_SKIP;
}
/* update server stats */
od_stat_query_start(&server->stats_state);
Expand Down Expand Up @@ -1596,9 +1571,9 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client,
}

static inline od_frontend_status_t
od_frontend_remote_process_server(od_server_t *server, od_client_t *client)
od_frontend_remote_process_server(od_server_t *server, od_client_t *client, int waitread)
{
od_frontend_status_t status = od_relay_step(&server->relay);
od_frontend_status_t status = od_relay_step(&server->relay, waitread, 1);
int rc;
od_instance_t *instance = client->global->instance;

Expand Down Expand Up @@ -1756,7 +1731,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
server = client->server;

/* attach */
status = od_relay_step(&client->relay);
status = od_relay_step(&client->relay, 0, 0);
if (status == OD_ATTACH) {
/* Check for replication lag and reject query if too big */
od_frontend_status_t catchup_status =
Expand Down Expand Up @@ -1785,14 +1760,26 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)

/* retry read operation after attach */
continue;
} else if (status == OD_WAIT_SYNC) {
// while (machine_cond_wait_no_change(server->relay.src->on_read, 60000) != 0) {
// machine_sleep(1);
// // wtf?
// }
} else if (status != OD_OK) {
break;
}

if (server == NULL)
continue;

status = od_frontend_remote_process_server(server, client);
if (status == OD_WAIT_SYNC) {
while (!od_server_synchronized(server)) {
status = od_frontend_remote_process_server(server, client, 1);
}
} else {
status = od_frontend_remote_process_server(server, client, 0);
}

if (status != OD_OK) {
break;
}
Expand Down Expand Up @@ -1843,6 +1830,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context,
case OD_STOP:
/* fallthrough */
case OD_OK:
case OD_WAIT_SYNC:
/* graceful disconnect or kill */
if (instance->config.log_session) {
od_log(&instance->logger, context, client, server,
Expand Down
Loading

0 comments on commit 3368b08

Please sign in to comment.