diff --git a/sources/frontend.c b/sources/frontend.c index dbf00e64a..dc2090ca9 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -729,7 +729,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, kiwi_be_type_t type = *data; if (instance->config.log_debug) - od_debug(&instance->logger, "main", client, server, "%s", + od_debug(&instance->logger, "main", client, server, "server recieved %s", kiwi_be_type_to_string(type)); int is_deploy = od_server_in_deploy(server); @@ -996,8 +996,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, "%s", kiwi_fe_type_to_string(type)); od_frontend_status_t retstatus = OD_OK; - machine_msg_t *msg; - msg = NULL; bool forwarded = 0; switch (type) { case KIWI_FE_COPY_DONE: @@ -1008,11 +1006,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, case KIWI_FE_QUERY: if (instance->config.log_query || route->rule->log_query) od_frontend_log_query(instance, client, data, size); + retstatus = OD_WAIT_SYNC; /* update server sync state */ od_server_sync_request(server, 1); break; case KIWI_FE_FUNCTION_CALL: case KIWI_FE_SYNC: + retstatus = OD_WAIT_SYNC; /* update server sync state */ od_server_sync_request(server, 1); break; @@ -1088,10 +1088,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // rewrite msg // allocate prepered statement under name equal to body hash - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1100,17 +1101,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "describe", - NULL, server, - "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1119,6 +1116,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, *refcnt = 1 + *refcnt; } + machine_msg_t *msg; msg = kiwi_fe_write_describe(NULL, 'S', opname, OD_HASH_LEN); @@ -1133,13 +1131,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - // msg if deallocated automaictly - rc = od_write(&server->io, msg); - forwarded = 1; - if (rc == -1) { - od_error(&instance->logger, "describe", NULL, - server, "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } } @@ -1188,6 +1181,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, size - opname_start_offset - desc.operator_name_len); + machine_msg_t *msg; + msg = NULL; + assert(client->prep_stmt_ids); if (od_hashmap_insert(client->prep_stmt_ids, keyhash, &key, &value_ptr)) { @@ -1205,18 +1201,14 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_snprintf(buf, OD_HASH_LEN, "%08x", body_hash); - msg = kiwi_fe_write_close( + machine_msg_t *cmsg; + cmsg = kiwi_fe_write_close( NULL, 'S', buf, OD_HASH_LEN); - if (msg == NULL) { + if (cmsg == NULL) { return OD_ESERVER_WRITE; } - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "parse", NULL, server, - "write error: %s", - od_io_error( - &server->io)); + rc = machine_iov_add(relay->iov, cmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } msg = kiwi_fe_write_parse_description( @@ -1281,12 +1273,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // stat backend parse msg od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "parse", - NULL, server, - "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1308,15 +1296,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, if (pmsg == NULL) { return OD_ESERVER_WRITE; } - rc = od_write(&client->io, pmsg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "parse", client, - NULL, "write error: %s", - od_io_error(&client->io)); + rc = machine_iov_add(client->server->relay.iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } + forwarded = 1; } break; case KIWI_FE_BIND: @@ -1389,11 +1373,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // rewrite msg // allocate prepered statement under name equal to body hash - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1402,17 +1388,14 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "rewrite parse", NULL, server, - "write error: %s", - od_io_error(&server->io)); + + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { return OD_ESERVER_WRITE; } } else { @@ -1420,6 +1403,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, *refcnt = 1 + *refcnt; } + + machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, operator_name_len, @@ -1437,15 +1422,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - rc = od_write(&server->io, msg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "rewrite bind", - NULL, server, "write error: %s", - od_io_error(&server->io)); + rc = machine_iov_add(relay->iov, msg); + if (rc != 0) { return OD_ESERVER_WRITE; } + forwarded = 1; } break; case KIWI_FE_EXECUTE: @@ -1477,13 +1458,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_t *pmsg; pmsg = kiwi_be_write_close_complete(NULL); - rc = od_write(&client->io, pmsg); - if (rc == -1) { - od_error(&instance->logger, - "close report", NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ECLIENT_WRITE; + rc = machine_iov_add(relay->iov, pmsg); + if (rc != 0) { + return OD_ESERVER_WRITE; } } @@ -1514,11 +1491,9 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } /* If the retstatus is not SKIP */ - if (route->rule->pool->reserve_prepared_statement && forwarded != 1 && - msg != NULL) { - msg = kiwi_fe_copy_msg(msg, data, size); - od_write(&server->io, msg); - retstatus = OD_SKIP; + if (route->rule->pool->reserve_prepared_statement && forwarded != 1) { + // machine_iov_add_pointer(client->server->relay.iov, data, size); + // retstatus = OD_SKIP; } /* update server stats */ od_stat_query_start(&server->stats_state); @@ -1596,9 +1571,9 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, } static inline od_frontend_status_t -od_frontend_remote_process_server(od_server_t *server, od_client_t *client) +od_frontend_remote_process_server(od_server_t *server, od_client_t *client, int waitread) { - od_frontend_status_t status = od_relay_step(&server->relay); + od_frontend_status_t status = od_relay_step(&server->relay, waitread, 1); int rc; od_instance_t *instance = client->global->instance; @@ -1756,7 +1731,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) server = client->server; /* attach */ - status = od_relay_step(&client->relay); + status = od_relay_step(&client->relay, 0, 0); if (status == OD_ATTACH) { /* Check for replication lag and reject query if too big */ od_frontend_status_t catchup_status = @@ -1785,6 +1760,11 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) /* retry read operation after attach */ continue; + } else if (status == OD_WAIT_SYNC) { + // while (machine_cond_wait_no_change(server->relay.src->on_read, 60000) != 0) { + // machine_sleep(1); + // // wtf? + // } } else if (status != OD_OK) { break; } @@ -1792,7 +1772,14 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) if (server == NULL) continue; - status = od_frontend_remote_process_server(server, client); + if (status == OD_WAIT_SYNC) { + while (!od_server_synchronized(server)) { + status = od_frontend_remote_process_server(server, client, 1); + } + } else { + status = od_frontend_remote_process_server(server, client, 0); + } + if (status != OD_OK) { break; } @@ -1843,6 +1830,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context, case OD_STOP: /* fallthrough */ case OD_OK: + case OD_WAIT_SYNC: /* graceful disconnect or kill */ if (instance->config.log_session) { od_log(&instance->logger, context, client, server, diff --git a/sources/relay.h b/sources/relay.h index 61c66b1c6..3564c642c 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -168,6 +168,7 @@ static inline od_frontend_status_t od_relay_on_packet_msg(od_relay_t *relay, switch (status) { case OD_OK: + case OD_WAIT_SYNC: /* fallthrough */ case OD_DETACH: rc = machine_iov_add(relay->iov, msg); @@ -193,6 +194,7 @@ static inline od_frontend_status_t od_relay_on_packet(od_relay_t *relay, switch (status) { case OD_OK: + case OD_WAIT_SYNC: /* fallthrough */ case OD_DETACH: rc = machine_iov_add_pointer(relay->iov, data, size); @@ -284,7 +286,7 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) return OD_OK; } -static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) +static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay, int stepserv) { char *current = od_readahead_pos_read(&relay->src->readahead); char *end = od_readahead_pos(&relay->src->readahead); @@ -292,6 +294,11 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) int progress; od_frontend_status_t rc; rc = od_relay_process(relay, &progress, current, end - current); + if (stepserv) { + if (rc == OD_WAIT_SYNC) { + rc = OD_OK; + } + } current += progress; od_readahead_pos_read_advance(&relay->src->readahead, progress); if (rc != OD_OK) { @@ -354,11 +361,20 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) return OD_OK; } -static inline od_frontend_status_t od_relay_step(od_relay_t *relay) +static inline od_frontend_status_t od_relay_step(od_relay_t *relay, int waitread, int stepserv) { /* on read event */ - int rc; - if (machine_cond_try(relay->src->on_read)) { + od_frontend_status_t retstatus; + retstatus = OD_OK; + int rc1; + if (waitread) { + rc1 = machine_cond_wait(relay->src->on_read, 10000000) == 0; + } else { + rc1 = machine_cond_try(relay->src->on_read); + } + if (rc1) { + od_frontend_status_t rc; + if (relay->dst == NULL) { /* signal to retry on read logic */ machine_cond_signal(relay->src->on_read); @@ -369,10 +385,23 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) if (rc != OD_OK) return rc; - rc = od_relay_pipeline(relay); + rc = od_relay_pipeline(relay, stepserv); - if (rc != OD_OK) + switch (rc) { + case OD_OK: + break; + case OD_WAIT_SYNC: + if (stepserv) { + break; + } + /* signal to retry on read logic */ + machine_cond_signal(relay->src->on_read); + + retstatus = rc; + break; + default: return rc; + } if (machine_iov_pending(relay->iov)) { /* try to optimize write path and handle it right-away */ @@ -383,10 +412,11 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } if (relay->dst == NULL) - return OD_OK; + return retstatus; /* on write event */ if (machine_cond_try(relay->dst->on_write)) { + int rc; rc = od_relay_write(relay); if (rc != OD_OK) return rc; @@ -408,7 +438,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } } - return OD_OK; + return retstatus; } static inline od_frontend_status_t od_relay_flush(od_relay_t *relay) diff --git a/sources/server.h b/sources/server.h index c5bc5123e..b200141d0 100644 --- a/sources/server.h +++ b/sources/server.h @@ -32,6 +32,8 @@ struct od_server { int deploy_sync; od_stat_state_t stats_state; + machine_cond_t *cond; + uint64_t sync_request; uint64_t sync_reply; @@ -78,6 +80,7 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts) server->deploy_sync = 0; server->sync_request = 0; server->sync_reply = 0; + server->cond = machine_cond_create(); server->init_time_us = machine_time_us(); server->error_connect = NULL; server->offline = 0; @@ -119,6 +122,10 @@ static inline void od_server_free(od_server_t *server) { od_relay_free(&server->relay); od_io_free(&server->io); + + if (server->cond) + machine_cond_free(server->cond); + if (server->prep_stmts) { od_hashmap_free(server->prep_stmts); } diff --git a/sources/status.h b/sources/status.h index ac4f29a66..8ae13c2a3 100644 --- a/sources/status.h +++ b/sources/status.h @@ -16,6 +16,7 @@ typedef enum { OD_WAIT_SYNC, OD_READ_FULL, OD_STOP, + OD_BREAK_PIPE, OD_EOOM, OD_EATTACH, OD_EATTACH_TOO_MANY_CONNECTIONS, @@ -45,6 +46,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status) return "OD_WAIT_SYNC"; case OD_STOP: return "OD_STOP"; + case OD_BREAK_PIPE: + return "OD_BREAK_PIPE"; case OD_EOOM: return "OD_EOOM"; case OD_READ_FULL: diff --git a/third_party/machinarium/sources/cond.c b/third_party/machinarium/sources/cond.c index 2dc5e34a0..a0ce59761 100644 --- a/third_party/machinarium/sources/cond.c +++ b/third_party/machinarium/sources/cond.c @@ -54,5 +54,18 @@ MACHINE_API int machine_cond_wait(machine_cond_t *obj, uint32_t time_ms) mm_errno_set(EINPROGRESS); return -1; } - return mm_cond_wait(cond, time_ms); + return mm_cond_wait(cond, time_ms, 0); +} + + + +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms) +{ + mm_cond_t *cond = mm_cast(mm_cond_t *, obj); + mm_errno_set(0); + if (cond->call.type != MM_CALL_NONE) { + mm_errno_set(EINPROGRESS); + return -1; + } + return mm_cond_wait(cond, time_ms, 1); } diff --git a/third_party/machinarium/sources/cond.h b/third_party/machinarium/sources/cond.h index 029c9cf43..f3e258194 100644 --- a/third_party/machinarium/sources/cond.h +++ b/third_party/machinarium/sources/cond.h @@ -41,15 +41,19 @@ static inline int mm_cond_try(mm_cond_t *cond) return signal; } -static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms) +static inline int mm_cond_wait(mm_cond_t *cond, uint32_t time_ms, int nochange) { if (cond->signal) { - cond->signal = 0; + if (!nochange) { + cond->signal = 0; + } return 0; } mm_call(&cond->call, MM_CALL_COND, time_ms); if (cond->call.status != 0) return -1; + + cond->signal = 0; return 0; } diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index abf9ecd17..5c3c3bf67 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -123,6 +123,8 @@ MACHINE_API int machine_cond_try(machine_cond_t *); MACHINE_API int machine_cond_wait(machine_cond_t *, uint32_t time_ms); +MACHINE_API int machine_cond_wait_no_change(machine_cond_t *obj, uint32_t time_ms); + /* msg */ MACHINE_API machine_msg_t *machine_msg_create(int reserve);