diff --git a/sources/frontend.c b/sources/frontend.c index 859f15e2a..7bba5a609 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -769,6 +769,11 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, is_ready_for_query = 1; od_backend_ready(server, data, size); + /* exactly one RFQ! */ + if (od_server_in_sync_point(server)) { + retstatus = OD_SKIP; + } + if (is_deploy) server->deploy_sync--; @@ -979,7 +984,7 @@ static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, return msg; } -static int od_frontend_deploy_prepared_stmt( +static od_frontend_status_t od_frontend_deploy_prepared_stmt( od_server_t *server, od_relay_t *relay, char *ctx, machine_msg_t *msg /* to adcance or to write? */ ) @@ -1041,14 +1046,14 @@ static int od_frontend_deploy_prepared_stmt( // advance? // machine_iov_add(relay->iov, pmsg); - return 1; + return OD_OK; } else { int *refcnt; refcnt = value_ptr->data; *refcnt = 1 + *refcnt; od_stat_parse_reuse(&route->stats); - return 0; + return OD_OK; } } @@ -1769,25 +1774,61 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) break; } } + + if (status != OD_OK) { + break; + } + // deploy here assert(server->parse_msg != NULL); /* fill internals structs in */ - od_frontend_deploy_prepared_stmt(server, &server->relay, - "sync-point", - server->parse_msg); + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "sync-point", + server->parse_msg) != OD_OK) { + status = OD_ESERVER_WRITE; + break; + } server->parse_msg = NULL; - if (od_backend_request_sync_point(server) == - NOT_OK_RESPONSE) { - od_error(&instance->logger, "sync-point", - client, server, "write error: %s", - od_io_error(&client->io)); - return OD_ESERVER_WRITE; + machine_msg_t *msg; + msg = kiwi_fe_write_sync(NULL); + if (msg == NULL) { + status = OD_ESERVER_WRITE; + break; + } + rc = od_write(&server->io, msg); + if (rc == -1) { + status = OD_ESERVER_WRITE; + break; + } + + /* enter sync piont mode */ + server->sync_point = 1; + od_server_sync_request(server, 1); + + while (1) { + if (od_server_synchronized(server)) { + break; + } + // await here + + od_log(&instance->logger, "sync-point", client, + server, "process await"); + status = od_frontend_remote_process_server( + server, client, true); + + if (status != OD_OK) { + break; + } + } + + server->sync_point = 0; + if (status != OD_OK) { + break; } - /* Ugly hack here */ machine_msg_t *pmsg; pmsg = kiwi_be_write_parse_complete(NULL); if (pmsg == NULL) { diff --git a/sources/server.h b/sources/server.h index 262eca280..86cfa0535 100644 --- a/sources/server.h +++ b/sources/server.h @@ -54,6 +54,7 @@ struct od_server { /* allocated prepared statements ids */ od_hashmap_t *prep_stmts; + int sync_point; od_global_t *global; int offline; @@ -80,6 +81,7 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts) server->deploy_sync = 0; server->sync_request = 0; server->sync_reply = 0; + server->sync_point = 0; server->parse_msg = NULL; server->init_time_us = machine_time_us(); server->error_connect = NULL; @@ -143,6 +145,11 @@ static inline int od_server_in_deploy(od_server_t *server) return server->deploy_sync > 0; } +static inline int od_server_in_sync_point(od_server_t *server) +{ + return server->sync_point > 0; +} + static inline int od_server_synchronized(od_server_t *server) { assert(server->sync_request >= server->sync_reply);