From 8d9cbb43bd15ddee0e596565d32140cf126aaa44 Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 20 Feb 2024 18:43:12 +0000 Subject: [PATCH 1/8] POC fix server-client sync in tx prepared stmt mode --- sources/frontend.c | 138 ++++++++---------- sources/relay.h | 46 +++++- sources/server.h | 7 + sources/status.h | 3 + third_party/machinarium/sources/cond.c | 15 +- third_party/machinarium/sources/cond.h | 8 +- third_party/machinarium/sources/machinarium.h | 2 + 7 files changed, 133 insertions(+), 86 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index dbf00e64a..dc2090ca9 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -729,7 +729,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, kiwi_be_type_t type = *data; if (instance->config.log_debug) - od_debug(&instance->logger, "main", client, server, "%s", + od_debug(&instance->logger, "main", client, server, "server recieved %s", kiwi_be_type_to_string(type)); int is_deploy = od_server_in_deploy(server); @@ -996,8 +996,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, "%s", kiwi_fe_type_to_string(type)); od_frontend_status_t retstatus = OD_OK; - machine_msg_t *msg; - msg = NULL; bool forwarded = 0; switch (type) { case KIWI_FE_COPY_DONE: @@ -1008,11 +1006,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, case KIWI_FE_QUERY: if (instance->config.log_query || route->rule->log_query) od_frontend_log_query(instance, client, data, size); + retstatus = OD_WAIT_SYNC; /* update server sync state */ od_server_sync_request(server, 1); break; case KIWI_FE_FUNCTION_CALL: case KIWI_FE_SYNC: + retstatus = OD_WAIT_SYNC; /* update server sync state */ od_server_sync_request(server, 1); break; @@ -1088,10 +1088,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // rewrite msg // allocate prepered statement under name equal to body hash - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1100,17 +1101,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "describe", - NULL, server, - "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1119,6 +1116,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, *refcnt = 1 + *refcnt; } + machine_msg_t *msg; msg = kiwi_fe_write_describe(NULL, 'S', opname, OD_HASH_LEN); @@ -1133,13 +1131,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - // msg if deallocated automaictly - rc = od_write(&server->io, msg); - forwarded = 1; - if (rc == -1) { - od_error(&instance->logger, "describe", NULL, - server, "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } } @@ -1188,6 +1181,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, size - opname_start_offset - desc.operator_name_len); + machine_msg_t *msg; + msg = NULL; + assert(client->prep_stmt_ids); if (od_hashmap_insert(client->prep_stmt_ids, keyhash, &key, &value_ptr)) { @@ -1205,18 +1201,14 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_snprintf(buf, OD_HASH_LEN, "%08x", body_hash); - msg = kiwi_fe_write_close( + machine_msg_t *cmsg; + cmsg = kiwi_fe_write_close( NULL, 'S', buf, OD_HASH_LEN); - if (msg == NULL) { + if (cmsg == NULL) { return OD_ESERVER_WRITE; } - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "parse", NULL, server, - "write error: %s", - od_io_error( - &server->io)); + rc = machine_iov_add(relay->iov, cmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } msg = kiwi_fe_write_parse_description( @@ -1281,12 +1273,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // stat backend parse msg od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "parse", - NULL, server, - "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1308,15 +1296,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, if (pmsg == NULL) { return OD_ESERVER_WRITE; } - rc = od_write(&client->io, pmsg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "parse", client, - NULL, "write error: %s", - od_io_error(&client->io)); + rc = machine_iov_add(client->server->relay.iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } + forwarded = 1; } break; case KIWI_FE_BIND: @@ -1389,11 +1373,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // rewrite msg // allocate prepered statement under name equal to body hash - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1402,17 +1388,14 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "rewrite parse", NULL, server, - "write error: %s", - od_io_error(&server->io)); + + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1420,6 +1403,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, *refcnt = 1 + *refcnt; } + + machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, operator_name_len, @@ -1437,15 +1422,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - rc = od_write(&server->io, msg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "rewrite bind", - NULL, server, "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } + forwarded = 1; } break; case KIWI_FE_EXECUTE: @@ -1477,13 +1458,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_t *pmsg; pmsg = kiwi_be_write_close_complete(NULL); - rc = od_write(&client->io, pmsg); - if (rc == -1) { - od_error(&instance->logger, - "close report", NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ECLIENT_WRITE; + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { + return OD_ESERVER_WRITE; } } @@ -1514,11 +1491,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } /* If the retstatus is not SKIP */ - if (route->rule->pool->reserve_prepared_statement && forwarded != 1 && - msg != NULL) { - msg = kiwi_fe_copy_msg(msg, data, size); - od_write(&server->io, msg); - retstatus = OD_SKIP; + if (route->rule->pool->reserve_prepared_statement && forwarded != 1) { + // machine_iov_add_pointer(client->server->relay.iov, data, size); + // retstatus = OD_SKIP; } /* update server stats */ od_stat_query_start(&server->stats_state); @@ -1596,9 +1571,9 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, } static inline od_frontend_status_t -od_frontend_remote_process_server(od_server_t *server, od_client_t *client) +od_frontend_remote_process_server(od_server_t *server, od_client_t *client, int waitread) { - od_frontend_status_t status = od_relay_step(&server->relay); + od_frontend_status_t status = od_relay_step(&server->relay, waitread, 1); int rc; od_instance_t *instance = client->global->instance; @@ -1756,7 +1731,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) server = client->server; /* attach */ - status = od_relay_step(&client->relay); + status = od_relay_step(&client->relay, 0, 0); if (status == OD_ATTACH) { /* Check for replication lag and reject query if too big */ od_frontend_status_t catchup_status = @@ -1785,6 +1760,11 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) /* retry read operation after attach */ continue; + } else if (status == OD_WAIT_SYNC) { + // while (machine_cond_wait_no_change(server->relay.src->on_read, 60000) != 0) { + // machine_sleep(1); + // // wtf? + // } } else if (status != OD_OK) { break; } @@ -1792,7 +1772,14 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) if (server == NULL) continue; - status = od_frontend_remote_process_server(server, client); + if (status == OD_WAIT_SYNC) { + while (!od_server_synchronized(server)) { + status = od_frontend_remote_process_server(server, client, 1); + } + } else { + status = od_frontend_remote_process_server(server, client, 0); + } + if (status != OD_OK) { break; } @@ -1843,6 +1830,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context, case OD_STOP: /* fallthrough */ case OD_OK: + case OD_WAIT_SYNC: /* graceful disconnect or kill */ if (instance->config.log_session) { od_log(&instance->logger, context, client, server, diff --git a/sources/relay.h b/sources/relay.h index 61c66b1c6..3564c642c 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -168,6 +168,7 @@ static inline od_frontend_status_t od_relay_on_packet_msg(od_relay_t *relay, switch (status) { case OD_OK: + case OD_WAIT_SYNC: /* fallthrough */ case OD_DETACH: rc = machine_iov_add(relay->iov, msg); @@ -193,6 +194,7 @@ static inline od_frontend_status_t od_relay_on_packet(od_relay_t *relay, switch (status) { case OD_OK: + case OD_WAIT_SYNC: /* fallthrough */ case OD_DETACH: rc = machine_iov_add_pointer(relay->iov, data, size); @@ -284,7 +286,7 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) return OD_OK; } -static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) +static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, int stepserv) { char *current = od_readahead_pos_read(&relay->src->readahead); char *end = od_readahead_pos(&relay->src->readahead); @@ -292,6 +294,11 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) int progress; od_frontend_status_t rc; rc = od_relay_process(relay, &progress, current, end - current); + if (stepserv) { + if (rc == OD_WAIT_SYNC) { + rc = OD_OK; + } + } current += progress; od_readahead_pos_read_advance(&relay->src->readahead, progress); if (rc != OD_OK) { @@ -354,11 +361,20 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) return OD_OK; } -static inline od_frontend_status_t od_relay_step(od_relay_t *relay) +static inline od_frontend_status_t od_relay_step(od_relay_t *relay, int waitread, int stepserv) { /* on read event */ - int rc; - if (machine_cond_try(relay->src->on_read)) { + od_frontend_status_t retstatus; + retstatus = OD_OK; + int rc1; + if (waitread) { + rc1 = machine_cond_wait(relay->src->on_read, 10000000) == 0; + } else { + rc1 = machine_cond_try(relay->src->on_read); + } + if (rc1) { + od_frontend_status_t rc; + if (relay->dst == NULL) { /* signal to retry on read logic */ machine_cond_signal(relay->src->on_read); @@ -369,10 +385,23 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) if (rc != OD_OK) return rc; - rc = od_relay_pipeline(relay); + rc = od_relay_pipeline(relay, stepserv); - if (rc != OD_OK) + switch (rc) { + case OD_OK: + break; + case OD_WAIT_SYNC: + if (stepserv) { + break; + } + /* signal to retry on read logic */ + machine_cond_signal(relay->src->on_read); + + retstatus = rc; + break; + default: return rc; + } if (machine_iov_pending(relay->iov)) { /* try to optimize write path and handle it right-away */ @@ -383,10 +412,11 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } if (relay->dst == NULL) - return OD_OK; + return retstatus; /* on write event */ if (machine_cond_try(relay->dst->on_write)) { + int rc; rc = od_relay_write(relay); if (rc != OD_OK) return rc; @@ -408,7 +438,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } } - return OD_OK; + return retstatus; } static inline od_frontend_status_t od_relay_flush(od_relay_t *relay) diff --git a/sources/server.h b/sources/server.h index c5bc5123e..b200141d0 100644 --- a/sources/server.h +++ b/sources/server.h @@ -32,6 +32,8 @@ struct od_server { int deploy_sync; od_stat_state_t stats_state; + machine_cond_t *cond; + uint64_t sync_request; uint64_t sync_reply; @@ -78,6 +80,7 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts) server->deploy_sync = 0; server->sync_request = 0; server->sync_reply = 0; + server->cond = machine_cond_create(); server->init_time_us = machine_time_us(); server->error_connect = NULL; server->offline = 0; @@ -119,6 +122,10 @@ static inline void od_server_free(od_server_t *server) { od_relay_free(&server->relay); od_io_free(&server->io); + + if (server->cond) + machine_cond_free(server->cond); + if (server->prep_stmts) { od_hashmap_free(server->prep_stmts); } diff --git a/sources/status.h b/sources/status.h index ac4f29a66..8ae13c2a3 100644 --- a/sources/status.h +++ b/sources/status.h @@ -16,6 +16,7 @@ typedef enum { OD_WAIT_SYNC, OD_READ_FULL, OD_STOP, + OD_BREAK_PIPE, OD_EOOM, OD_EATTACH, OD_EATTACH_TOO_MANY_CONNECTIONS, @@ -45,6 +46,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status) return "OD_WAIT_SYNC"; case OD_STOP: return "OD_STOP"; + case OD_BREAK_PIPE: + return "OD_BREAK_PIPE"; case OD_EOOM: return "OD_EOOM"; case OD_READ_FULL: diff --git a/third_party/machinarium/sources/cond.c b/third_party/machinarium/sources/cond.c index 2dc5e34a0..a0ce59761 100644 --- a/third_party/machinarium/sources/cond.c +++ b/third_party/machinarium/sources/cond.c @@ -54,5 +54,18 @@ MACHINE_API int machine_cond_wait(machine_cond_t *obj, uint32_t time_ms) mm_errno_set(EINPROGRESS); return -1; } - return mm_cond_wait(cond, time_ms); + return mm_cond_wait(cond, time_ms, 0); +} + + + +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms) +{ + mm_cond_t *cond = mm_cast(mm_cond_t *, obj); + mm_errno_set(0); + if (cond->call.type != MM_CALL_NONE) { + mm_errno_set(EINPROGRESS); + return -1; + } + return mm_cond_wait(cond, time_ms, 1); } diff --git a/third_party/machinarium/sources/cond.h b/third_party/machinarium/sources/cond.h index 029c9cf43..f3e258194 100644 --- a/third_party/machinarium/sources/cond.h +++ b/third_party/machinarium/sources/cond.h @@ -41,15 +41,19 @@ static inline int mm_cond_try(mm_cond_t *cond) return signal; } -static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms) +static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms, int nochange) { if (cond->signal) { - cond->signal = 0; + if (!nochange) { + cond->signal = 0; + } return 0; } mm_call(&cond->call, MM_CALL_COND, time_ms); if (cond->call.status != 0) return -1; + + cond->signal = 0; return 0; } diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index abf9ecd17..5c3c3bf67 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -123,6 +123,8 @@ MACHINE_API int machine_cond_try(machine_cond_t *); MACHINE_API int machine_cond_wait(machine_cond_t *, uint32_t time_ms); +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms); + /* msg */ MACHINE_API machine_msg_t *machine_msg_create(int reserve); From 4b84b3fff40d2c23f802910c1eb838e3d9df4026 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 06:53:16 +0000 Subject: [PATCH 2/8] fix --- sources/frontend.c | 17 ++++++++++------- sources/relay.h | 6 ++++-- third_party/machinarium/sources/cond.c | 5 ++--- third_party/machinarium/sources/machinarium.h | 3 ++- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index dc2090ca9..fd117bfa3 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -729,8 +729,8 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, kiwi_be_type_t type = *data; if (instance->config.log_debug) - od_debug(&instance->logger, "main", client, server, "server recieved %s", - kiwi_be_type_to_string(type)); + od_debug(&instance->logger, "main", client, server, + "server recieved %s", kiwi_be_type_to_string(type)); int is_deploy = od_server_in_deploy(server); int is_ready_for_query = 0; @@ -1403,7 +1403,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, *refcnt = 1 + *refcnt; } - machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, @@ -1571,9 +1570,11 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, } static inline od_frontend_status_t -od_frontend_remote_process_server(od_server_t *server, od_client_t *client, int waitread) +od_frontend_remote_process_server(od_server_t *server, od_client_t *client, + int waitread) { - od_frontend_status_t status = od_relay_step(&server->relay, waitread, 1); + od_frontend_status_t status = + od_relay_step(&server->relay, waitread, 1); int rc; od_instance_t *instance = client->global->instance; @@ -1774,10 +1775,12 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) if (status == OD_WAIT_SYNC) { while (!od_server_synchronized(server)) { - status = od_frontend_remote_process_server(server, client, 1); + status = od_frontend_remote_process_server( + server, client, 1); } } else { - status = od_frontend_remote_process_server(server, client, 0); + status = od_frontend_remote_process_server(server, + client, 0); } if (status != OD_OK) { diff --git a/sources/relay.h b/sources/relay.h index 3564c642c..3199756fa 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -286,7 +286,8 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) return OD_OK; } -static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, int stepserv) +static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, + int stepserv) { char *current = od_readahead_pos_read(&relay->src->readahead); char *end = od_readahead_pos(&relay->src->readahead); @@ -361,7 +362,8 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) return OD_OK; } -static inline od_frontend_status_t od_relay_step(od_relay_t *relay, int waitread, int stepserv) +static inline od_frontend_status_t od_relay_step(od_relay_t *relay, + int waitread, int stepserv) { /* on read event */ od_frontend_status_t retstatus; diff --git a/third_party/machinarium/sources/cond.c b/third_party/machinarium/sources/cond.c index a0ce59761..93a91c13f 100644 --- a/third_party/machinarium/sources/cond.c +++ b/third_party/machinarium/sources/cond.c @@ -57,9 +57,8 @@ MACHINE_API int machine_cond_wait(machine_cond_t *obj, uint32_t time_ms) return mm_cond_wait(cond, time_ms, 0); } - - -MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms) +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, + uint32_t time_ms) { mm_cond_t *cond = mm_cast(mm_cond_t *, obj); mm_errno_set(0); diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index 5c3c3bf67..023331f38 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -123,7 +123,8 @@ MACHINE_API int machine_cond_try(machine_cond_t *); MACHINE_API int machine_cond_wait(machine_cond_t *, uint32_t time_ms); -MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms); +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, + uint32_t time_ms); /* msg */ From adb51147d4c8ae52fb40b418798cfb37ecc81cf2 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 07:44:32 +0000 Subject: [PATCH 3/8] remove bullshit --- sources/frontend.c | 11 ++++++++--- sources/relay.h | 17 ++++------------- third_party/machinarium/sources/cond.c | 14 +------------- third_party/machinarium/sources/cond.h | 7 ++----- third_party/machinarium/sources/machinarium.h | 3 --- 5 files changed, 15 insertions(+), 37 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index fd117bfa3..0c45d3d78 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1573,8 +1573,7 @@ static inline od_frontend_status_t od_frontend_remote_process_server(od_server_t *server, od_client_t *client, int waitread) { - od_frontend_status_t status = - od_relay_step(&server->relay, waitread, 1); + od_frontend_status_t status = od_relay_step(&server->relay, waitread); int rc; od_instance_t *instance = client->global->instance; @@ -1732,7 +1731,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) server = client->server; /* attach */ - status = od_relay_step(&client->relay, 0, 0); + status = od_relay_step(&client->relay, 0); if (status == OD_ATTACH) { /* Check for replication lag and reject query if too big */ od_frontend_status_t catchup_status = @@ -1777,6 +1776,12 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) while (!od_server_synchronized(server)) { status = od_frontend_remote_process_server( server, client, 1); + // OD_ATTACH should not happen. + // OD_DETACH only when od_server_synchronized is true + if (status != OD_OK && status != OD_WAIT_SYNC && + status != OD_DETACH) { + break; + } } } else { status = od_frontend_remote_process_server(server, diff --git a/sources/relay.h b/sources/relay.h index 3199756fa..56fcdb858 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -286,8 +286,7 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) return OD_OK; } -static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, - int stepserv) +static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) { char *current = od_readahead_pos_read(&relay->src->readahead); char *end = od_readahead_pos(&relay->src->readahead); @@ -295,11 +294,6 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, int progress; od_frontend_status_t rc; rc = od_relay_process(relay, &progress, current, end - current); - if (stepserv) { - if (rc == OD_WAIT_SYNC) { - rc = OD_OK; - } - } current += progress; od_readahead_pos_read_advance(&relay->src->readahead, progress); if (rc != OD_OK) { @@ -363,14 +357,14 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) } static inline od_frontend_status_t od_relay_step(od_relay_t *relay, - int waitread, int stepserv) + int waitread) { /* on read event */ od_frontend_status_t retstatus; retstatus = OD_OK; int rc1; if (waitread) { - rc1 = machine_cond_wait(relay->src->on_read, 10000000) == 0; + rc1 = machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0; } else { rc1 = machine_cond_try(relay->src->on_read); } @@ -387,15 +381,12 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay, if (rc != OD_OK) return rc; - rc = od_relay_pipeline(relay, stepserv); + rc = od_relay_pipeline(relay); switch (rc) { case OD_OK: break; case OD_WAIT_SYNC: - if (stepserv) { - break; - } /* signal to retry on read logic */ machine_cond_signal(relay->src->on_read); diff --git a/third_party/machinarium/sources/cond.c b/third_party/machinarium/sources/cond.c index 93a91c13f..2dc5e34a0 100644 --- a/third_party/machinarium/sources/cond.c +++ b/third_party/machinarium/sources/cond.c @@ -54,17 +54,5 @@ MACHINE_API int machine_cond_wait(machine_cond_t *obj, uint32_t time_ms) mm_errno_set(EINPROGRESS); return -1; } - return mm_cond_wait(cond, time_ms, 0); -} - -MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, - uint32_t time_ms) -{ - mm_cond_t *cond = mm_cast(mm_cond_t *, obj); - mm_errno_set(0); - if (cond->call.type != MM_CALL_NONE) { - mm_errno_set(EINPROGRESS); - return -1; - } - return mm_cond_wait(cond, time_ms, 1); + return mm_cond_wait(cond, time_ms); } diff --git a/third_party/machinarium/sources/cond.h b/third_party/machinarium/sources/cond.h index f3e258194..05a3b07af 100644 --- a/third_party/machinarium/sources/cond.h +++ b/third_party/machinarium/sources/cond.h @@ -41,19 +41,16 @@ static inline int mm_cond_try(mm_cond_t *cond) return signal; } -static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms, int nochange) +static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms) { if (cond->signal) { - if (!nochange) { - cond->signal = 0; - } + cond->signal = 0; return 0; } mm_call(&cond->call, MM_CALL_COND, time_ms); if (cond->call.status != 0) return -1; - cond->signal = 0; return 0; } diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index 023331f38..abf9ecd17 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -123,9 +123,6 @@ MACHINE_API int machine_cond_try(machine_cond_t *); MACHINE_API int machine_cond_wait(machine_cond_t *, uint32_t time_ms); -MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, - uint32_t time_ms); - /* msg */ MACHINE_API machine_msg_t *machine_msg_create(int reserve); From 58892b697a74d7f835c5515b87984ad5e31caf14 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 07:46:27 +0000 Subject: [PATCH 4/8] remove more shitty code --- sources/status.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sources/status.h b/sources/status.h index 8ae13c2a3..42f520859 100644 --- a/sources/status.h +++ b/sources/status.h @@ -14,9 +14,7 @@ typedef enum { OD_ATTACH, OD_DETACH, OD_WAIT_SYNC, - OD_READ_FULL, OD_STOP, - OD_BREAK_PIPE, OD_EOOM, OD_EATTACH, OD_EATTACH_TOO_MANY_CONNECTIONS, @@ -46,12 +44,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status) return "OD_WAIT_SYNC"; case OD_STOP: return "OD_STOP"; - case OD_BREAK_PIPE: - return "OD_BREAK_PIPE"; case OD_EOOM: return "OD_EOOM"; - case OD_READ_FULL: - return "OD_READ_FULL"; case OD_EATTACH: return "OD_EATTACH"; case OD_EATTACH_TOO_MANY_CONNECTIONS: From 5984d92b95aa5481e75fdf04ff93283377fb467c Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 07:52:05 +0000 Subject: [PATCH 5/8] make better --- sources/frontend.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 0c45d3d78..0a419c456 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1604,7 +1604,9 @@ od_frontend_remote_process_server(od_server_t *server, od_client_t *client, od_router_t *router = client->global->router; od_router_detach(router, client); server = NULL; - } else if (status != OD_OK) { + } else if (status == OD_WAIT_SYNC) { + return OD_OK; + } if (status != OD_OK) { return status; } return OD_OK; @@ -1761,10 +1763,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) /* retry read operation after attach */ continue; } else if (status == OD_WAIT_SYNC) { - // while (machine_cond_wait_no_change(server->relay.src->on_read, 60000) != 0) { - // machine_sleep(1); - // // wtf? - // } + // ok } else if (status != OD_OK) { break; } @@ -1777,9 +1776,9 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) status = od_frontend_remote_process_server( server, client, 1); // OD_ATTACH should not happen. - // OD_DETACH only when od_server_synchronized is true - if (status != OD_OK && status != OD_WAIT_SYNC && - status != OD_DETACH) { + // OD_DETACH & OD_WAIT_SYNC handled inside + // od_frontend_remote_process_server + if (status != OD_OK) { break; } } From af157adb46b83713866cf9c258ad80362c7e0bed Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 09:16:21 +0000 Subject: [PATCH 6/8] better --- sources/frontend.c | 2 +- sources/relay.h | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 0a419c456..4dd4d16cd 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1606,7 +1606,7 @@ od_frontend_remote_process_server(od_server_t *server, od_client_t *client, server = NULL; } else if (status == OD_WAIT_SYNC) { return OD_OK; - } if (status != OD_OK) { + } else if (status != OD_OK) { return status; } return OD_OK; diff --git a/sources/relay.h b/sources/relay.h index 56fcdb858..702748354 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -362,13 +362,9 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay, /* on read event */ od_frontend_status_t retstatus; retstatus = OD_OK; - int rc1; - if (waitread) { - rc1 = machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0; - } else { - rc1 = machine_cond_try(relay->src->on_read); - } - if (rc1) { + if (waitread ? + (machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0) : + machine_cond_try(relay->src->on_read)) { od_frontend_status_t rc; if (relay->dst == NULL) { From 76e590249f583a7af16bb7f588563bcdc766cfbc Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 10:36:37 +0000 Subject: [PATCH 7/8] fix --- sources/frontend.c | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sources/frontend.c b/sources/frontend.c index 4dd4d16cd..cd3e9a729 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1454,10 +1454,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, client, server, "statement: %.*s", name_len, name); - machine_msg_t *pmsg; - pmsg = kiwi_be_write_close_complete(NULL); - - rc = machine_iov_add(relay->iov, pmsg); + machine_msg_t *cmsg; + cmsg = kiwi_be_write_close_complete(NULL); + rc = machine_iov_add(client->server->relay.iov, cmsg); if (rc != 0) { return OD_ESERVER_WRITE; } @@ -1489,11 +1488,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, break; } - /* If the retstatus is not SKIP */ - if (route->rule->pool->reserve_prepared_statement && forwarded != 1) { - // machine_iov_add_pointer(client->server->relay.iov, data, size); - // retstatus = OD_SKIP; - } /* update server stats */ od_stat_query_start(&server->stats_state); return retstatus; From 2c255bd219036382a973552f8a7004c3c46efa79 Mon Sep 17 00:00:00 2001 From: reshke Date: Wed, 21 Feb 2024 10:40:53 +0000 Subject: [PATCH 8/8] fix --- sources/frontend.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sources/frontend.c b/sources/frontend.c index cd3e9a729..e31687763 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1456,7 +1456,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_t *cmsg; cmsg = kiwi_be_write_close_complete(NULL); - rc = machine_iov_add(client->server->relay.iov, cmsg); + rc = machine_iov_add(client->server->relay.iov, + cmsg); if (rc != 0) { return OD_ESERVER_WRITE; }