diff --git a/docker/Dockerfile b/docker/Dockerfile index fc4df8868..b451ca900 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -69,6 +69,7 @@ COPY --from=base /prep_stmts/pkg/pstmts-test /pstmts-test COPY --from=base /config-validation/pkg/config-validation /config-validation COPY ./docker/scram /scram COPY ./docker/hba /hba +COPY ./docker/rule-address /rule-address COPY ./docker/auth_query /auth_query COPY ./docker/ldap /ldap COPY ./docker/lagpolling /lagpolling diff --git a/docker/auth_query/test_auth_query.sh b/docker/auth_query/test_auth_query.sh index c7b7fc102..6e179228d 100755 --- a/docker/auth_query/test_auth_query.sh +++ b/docker/auth_query/test_auth_query.sh @@ -28,5 +28,4 @@ PGPASSWORD=passwd psql -h localhost -p 6432 -U auth_query_user_md5 -c "SELECT 1" exit 1 } - ody-stop diff --git a/docker/bin/setup b/docker/bin/setup index 09ae7a47f..bb3ece249 100755 --- a/docker/bin/setup +++ b/docker/bin/setup @@ -49,7 +49,7 @@ sudo -u postgres /usr/bin/pg_basebackup -D /var/lib/postgresql/14/repl -R -h loc sudo -u postgres /usr/lib/postgresql/14/bin/pg_ctl -D /var/lib/postgresql/14/repl/ -o '-p 5433' start # Create databases -for database_name in db scram_db ldap_db auth_query_db db1 hba_db tsa_db group_db; do +for database_name in db scram_db ldap_db auth_query_db db1 hba_db tsa_db group_db addr_db; do sudo -u postgres createdb $database_name >> "$SETUP_LOG" 2>&1 || { echo "ERROR: 'createdb $database_name' failed, examine the log" cat "$SETUP_LOG" @@ -135,6 +135,14 @@ psql -h localhost -p 5432 -U postgres -c "create user user_allow password 'corr exit 1 } +# Create users +psql -h localhost -p 5432 -U postgres -c "create user user_addr_correct password 'correct_password'; create user user_addr_incorrect password 'correct_password'; create user user_addr_default password 'correct_password'; create user user_addr_empty password 'correct_password'; create user user_addr_hostname_localhost password 'correct_password';" >> $SETUP_LOG 2>&1 || { + echo "ERROR: users creation failed, examine the log" + cat "$SETUP_LOG" + cat "$PG_LOG" + exit 1 +} + for i in `seq 0 9` do # Create tables diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 373755185..238f759db 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -6,8 +6,11 @@ cd /test_dir/test && /usr/bin/odyssey_test setup -# group +# group tests /group/test_group.sh + +# odyssey rule-address test +/rule-address/test.sh if [ $? -eq 1 ] then exit 1 @@ -81,4 +84,4 @@ ody-start /ody-integration-test ody-stop -teardown +teardown \ No newline at end of file diff --git a/docker/prep_stmts/.gitignore b/docker/prep_stmts/.gitignore new file mode 100644 index 000000000..666e094e2 --- /dev/null +++ b/docker/prep_stmts/.gitignore @@ -0,0 +1,2 @@ +pstst +psmst diff --git a/docker/prep_stmts/pstmts.conf b/docker/prep_stmts/pstmts.conf index c38f404cb..957524405 100644 --- a/docker/prep_stmts/pstmts.conf +++ b/docker/prep_stmts/pstmts.conf @@ -60,7 +60,7 @@ database "db1" { pool_timeout 0 pool_ttl 60 pool_discard no - pool_smart_discard yes + pool_smart_discard no pool_cancel yes pool_rollback yes pool_reserve_prepared_statement yes diff --git a/docker/prep_stmts/pstst b/docker/prep_stmts/pstst deleted file mode 100755 index 550577eab..000000000 Binary files a/docker/prep_stmts/pstst and /dev/null differ diff --git a/docker/rule-address/addr.conf b/docker/rule-address/addr.conf new file mode 100644 index 000000000..c57e75bb0 --- /dev/null +++ b/docker/rule-address/addr.conf @@ -0,0 +1,64 @@ +listen { + host "*" + port 6432 +} + +storage "postgres_server" { + type "remote" + host "127.0.0.1" + port 5432 +} + +database "addr_db" { + user "user_addr_correct" "127.0.0.0/24" { + authentication "clear_text" + password "correct_password" + storage "postgres_server" + pool "session" + } + + user "user_addr_incorrect" "255.0.0.0/24" { + authentication "clear_text" + password "correct_password" + storage "postgres_server" + pool "session" + } + + user "user_addr_default" default { + authentication "clear_text" + password "correct_password" + storage "postgres_server" + pool "session" + } + + user "user_addr_empty" { + authentication "clear_text" + password "correct_password" + storage "postgres_server" + pool "session" + } + + user "user_addr_hostname_localhost" "localhost" { + authentication "clear_text" + password "correct_password" + storage "postgres_server" + pool "session" + } +} + +daemonize yes +pid_file "/var/run/odyssey.pid" + +unix_socket_dir "/tmp" +unix_socket_mode "0644" + +locks_dir "/tmp" + +log_format "%p %t %l [%i %s] (%c) %m\n" +log_file "/var/log/odyssey.log" +log_to_stdout no +log_config yes +log_debug yes +log_session yes +log_stats no +log_query yes diff --git a/docker/rule-address/test.sh b/docker/rule-address/test.sh new file mode 100755 index 000000000..165051285 --- /dev/null +++ b/docker/rule-address/test.sh @@ -0,0 +1,79 @@ +#!/bin/bash -x + +set -ex + +/usr/bin/odyssey /rule-address/addr.conf + +PGPASSWORD=correct_password psql -h localhost -p 6432 -U user_addr_correct -c "SELECT 1" addr_db > /dev/null 2>&1 || { + echo "ERROR: failed auth with correct addr, correct password and plain password in config" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=incorrect_password psql -h localhost -p 6432 -U user_addr_correct -c "SELECT 1" addr_db > /dev/null 2>&1 && { + echo "ERROR: successfully auth with correct addr, but incorrect password" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=correct_password psql -h localhost -p 6432 -U user_addr_incorrect -c "SELECT 1" addr_db > /dev/null 2>&1 && { + echo "ERROR: successfully auth with incorrect addr" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=correct_password psql -h localhost -p 6432 -U user_addr_default -c "SELECT 1" addr_db > /dev/null 2>&1 || { + echo "ERROR: failed auth with correct addr, correct password and plain password in config" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=incorrect_password psql -h localhost -p 6432 -U user_addr_default -c "SELECT 1" addr_db > /dev/null 2>&1 && { + echo "ERROR: successfully auth with correct addr, but incorrect password" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=correct_password psql -h localhost -p 6432 -U user_addr_empty -c "SELECT 1" addr_db > /dev/null 2>&1 || { + echo "ERROR: failed auth with correct addr, correct password and plain password in config" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=incorrect_password psql -h localhost -p 6432 -U user_addr_empty -c "SELECT 1" addr_db > /dev/null 2>&1 && { + echo "ERROR: successfully auth with correct addr, but incorrect password" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=correct_password psql -h localhost -p 6432 -U user_addr_hostname_localhost -c "SELECT 1" addr_db > /dev/null 2>&1 || { + echo "ERROR: failed auth with correct addr, correct password and plain password in config" + + cat /var/log/odyssey.log + + exit 1 +} + +PGPASSWORD=incorrect_password psql -h localhost -p 6432 -U user_addr_hostname_localhost -c "SELECT 1" addr_db > /dev/null 2>&1 && { + echo "ERROR: successfully auth with correct addr, but incorrect password" + + cat /var/log/odyssey.log + + exit 1 +} + +ody-stop diff --git a/sources/CMakeLists.txt b/sources/CMakeLists.txt index d57375695..a4456d0eb 100644 --- a/sources/CMakeLists.txt +++ b/sources/CMakeLists.txt @@ -24,6 +24,7 @@ set(od_src deploy.c reset.c frontend.c + backend_sync.c backend.c instance.c main.c @@ -47,6 +48,7 @@ set(od_src storage.c murmurhash.c hashmap.c + address.c hba.c hba_reader.c hba_rule.c diff --git a/sources/address.c b/sources/address.c new file mode 100644 index 000000000..30b5ff3eb --- /dev/null +++ b/sources/address.c @@ -0,0 +1,242 @@ +#include +#include +#include + +/* +* Odyssey. +* +* Scalable PostgreSQL connection pooler. +*/ + +od_address_range_t od_address_range_create_default() +{ + od_address_range_t address_range = { .string_value = strdup("all"), + .string_value_len = strlen("all"), + .is_default = 1 }; + return address_range; +} + +void od_address_range_copy(od_address_range_t *src, od_address_range_t *dst) +{ + dst->string_value = strndup(src->string_value, src->string_value_len); + dst->string_value_len = src->string_value_len; + dst->addr = src->addr; + dst->mask = src->mask; + dst->is_default = src->is_default; + dst->is_hostname = src->is_hostname; +} + +int od_address_range_read_prefix(od_address_range_t *address_range, + char *prefix) +{ + char *end = NULL; + long len = strtol(prefix, &end, 10); + if (*prefix == '\0' || *end != '\0') { + return -1; + } + if (address_range->addr.ss_family == AF_INET) { + if (len > 32) + return -1; + struct sockaddr_in *addr = + (struct sockaddr_in *)&address_range->mask; + uint32 mask; + if (len > 0) + mask = 0xffffffffUL << (32 - (int)len); + else + mask = 0; + addr->sin_addr.s_addr = od_bswap32(mask); + return 0; + } else if (address_range->addr.ss_family == AF_INET6) { + if (len > 128) + return -1; + struct sockaddr_in6 *addr = + (struct sockaddr_in6 *)&address_range->mask; + int i; + for (i = 0; i < 16; i++) { + if (len <= 0) + addr->sin6_addr.s6_addr[i] = 0; + else if (len >= 8) + addr->sin6_addr.s6_addr[i] = 0xff; + else { + addr->sin6_addr.s6_addr[i] = + (0xff << (8 - (int)len)) & 0xff; + } + len -= 8; + } + return 0; + } + + return -1; +} + +int od_address_read(struct sockaddr_storage *dest, const char *addr) +{ + int rc; + rc = inet_pton(AF_INET, addr, &((struct sockaddr_in *)dest)->sin_addr); + if (rc > 0) { + dest->ss_family = AF_INET; + return 0; + } + if (inet_pton(AF_INET6, addr, + &((struct sockaddr_in6 *)dest)->sin6_addr) > 0) { + dest->ss_family = AF_INET6; + return 0; + } + return -1; +} + +static bool od_address_ipv4eq(struct sockaddr_in *a, struct sockaddr_in *b) +{ + return (a->sin_addr.s_addr == b->sin_addr.s_addr); +} + +static bool od_address_ipv6eq(struct sockaddr_in6 *a, struct sockaddr_in6 *b) +{ + int i; + for (i = 0; i < 16; i++) + if (a->sin6_addr.s6_addr[i] != b->sin6_addr.s6_addr[i]) + return false; + return true; +} + +bool od_address_equals(struct sockaddr *firstAddress, + struct sockaddr *secondAddress) +{ + if (firstAddress->sa_family == secondAddress->sa_family) { + if (firstAddress->sa_family == AF_INET) { + if (od_address_ipv4eq( + (struct sockaddr_in *)firstAddress, + (struct sockaddr_in *)secondAddress)) + return true; + } else if (firstAddress->sa_family == AF_INET6) { + if (od_address_ipv6eq( + (struct sockaddr_in6 *)firstAddress, + (struct sockaddr_in6 *)secondAddress)) + return true; + } + } + return false; +} + +bool od_address_range_equals(od_address_range_t *first, + od_address_range_t *second) +{ + if (first->is_hostname == second->is_hostname) + return pg_strcasecmp(first->string_value, + second->string_value) == 0; + + return od_address_equals((struct sockaddr *)&first->addr, + (struct sockaddr *)&second->addr) && + od_address_equals((struct sockaddr *)&first->mask, + (struct sockaddr *)&second->mask); +} + +static bool od_address_hostname_match(const char *pattern, + const char *actual_hostname) +{ + if (pattern[0] == '.') /* suffix match */ + { + size_t plen = strlen(pattern); + size_t hlen = strlen(actual_hostname); + if (hlen < plen) + return false; + return (pg_strcasecmp(pattern, + actual_hostname + (hlen - plen)) == 0); + } else + return (pg_strcasecmp(pattern, actual_hostname) == 0); +} + +/* + * Check to see if a connecting IP matches a given host name. + */ +static bool od_address_check_hostname(struct sockaddr_storage *client_sa, + const char *hostname) +{ + struct addrinfo *gai_result, *gai; + int ret; + bool found; + + char client_hostname[NI_MAXHOST]; + + ret = getnameinfo(client_sa, sizeof(*client_sa), client_hostname, + sizeof(client_hostname), NULL, 0, NI_NAMEREQD); + + if (ret != 0) + return false; + + /* Now see if remote host name matches this pg_hba line */ + if (!od_address_hostname_match(hostname, client_hostname)) + return false; + + /* Lookup IP from host name and check against original IP */ + ret = getaddrinfo(client_hostname, NULL, NULL, &gai_result); + if (ret != 0) + return false; + + found = false; + for (gai = gai_result; gai; gai = gai->ai_next) { + found = od_address_equals(gai->ai_addr, + (struct sockaddr *)client_sa); + if (found) { + break; + } + } + + if (gai_result) + freeaddrinfo(gai_result); + + return found; +} + +bool od_address_validate(od_address_range_t *address_range, + struct sockaddr_storage *sa) +{ + if (address_range->is_hostname) + return od_address_check_hostname(sa, + address_range->string_value); + + if (address_range->addr.ss_family != sa->ss_family) + return false; + + if (sa->ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)sa; + struct sockaddr_in *addr = + (struct sockaddr_in *)&address_range->addr; + struct sockaddr_in *mask = + (struct sockaddr_in *)&address_range->mask; + in_addr_t client_addr = sin->sin_addr.s_addr; + in_addr_t client_net = mask->sin_addr.s_addr & client_addr; + return (client_net ^ addr->sin_addr.s_addr) == 0; + } else if (sa->ss_family == AF_INET6) { + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa; + struct sockaddr_in6 *addr = + (struct sockaddr_in6 *)&address_range->addr; + struct sockaddr_in6 *mask = + (struct sockaddr_in6 *)&address_range->mask; + for (int i = 0; i < 16; ++i) { + uint8_t client_net_byte = mask->sin6_addr.s6_addr[i] & + sin->sin6_addr.s6_addr[i]; + if (client_net_byte ^ addr->sin6_addr.s6_addr[i]) { + return false; + } + } + return true; + } + + return false; +} + +int od_address_hostname_validate(char *hostname) +{ + regex_t regex; + char *valid_rfc952_hostname_regex = + "^(\\.?(([a-zA-Z]|[a-zA-Z][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z]|[A-Za-z][A-Za-z0-9\\-]*[A-Za-z0-9]))$"; + int reti = regcomp(®ex, valid_rfc952_hostname_regex, REG_EXTENDED); + if (reti) + return -1; + reti = regexec(®ex, hostname, 0, NULL, 0); + if (reti == 0) + return 0; + else + return 1; +} diff --git a/sources/address.h b/sources/address.h new file mode 100644 index 000000000..f1ec0fb03 --- /dev/null +++ b/sources/address.h @@ -0,0 +1,37 @@ +#ifndef ODYSSEY_ADDRESS_H +#define ODYSSEY_ADDRESS_H + +/* + * Odyssey. + * + * Scalable PostgreSQL connection pooler. + */ + +typedef struct od_address_range od_address_range_t; + +struct od_address_range { + char *string_value; + int string_value_len; + struct sockaddr_storage addr; + struct sockaddr_storage mask; + int is_hostname; + int is_default; +}; + +od_address_range_t od_address_range_create_default(); + +void od_address_range_copy(od_address_range_t *, od_address_range_t *); + +int od_address_range_read_prefix(od_address_range_t *, char *); + +int od_address_read(struct sockaddr_storage *, const char *); + +bool od_address_equals(struct sockaddr *, struct sockaddr *); + +bool od_address_range_equals(od_address_range_t *, od_address_range_t *); + +bool od_address_validate(od_address_range_t *, struct sockaddr_storage *); + +int od_address_hostname_validate(char *); + +#endif /* ODYSSEY_ADDRESS_H */ \ No newline at end of file diff --git a/sources/auth_query.c b/sources/auth_query.c index 922671dcc..51c914dfe 100644 --- a/sources/auth_query.c +++ b/sources/auth_query.c @@ -178,9 +178,17 @@ int od_auth_query(od_client_t *client, char *peer) kiwi_var_set(&auth_client->startup.database, KIWI_VAR_UNDEF, rule->auth_query_db, strlen(rule->auth_query_db) + 1); + /* set io from client */ + od_io_t auth_client_io = auth_client->io; + auth_client->io = client->io; + /* route */ od_router_status_t status; status = od_router_route(router, auth_client); + + /* return io auth_client back */ + auth_client->io = auth_client_io; + if (status != OD_ROUTER_OK) { od_debug(&instance->logger, "auth_query", auth_client, NULL, "failed to route internal auth query client: %s", diff --git a/sources/backend.c b/sources/backend.c index bd7236044..e4555bbc6 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -100,6 +100,7 @@ int od_backend_ready(od_server_t *server, char *data, uint32_t size) } /* update server sync reply state */ + od_server_sync_reply(server); return 0; } @@ -171,6 +172,7 @@ static inline int od_backend_startup(od_server_t *server, /* update request count and sync state */ od_server_sync_request(server, 1); + assert(server->client); while (1) { msg = od_read(&server->io, UINT32_MAX); @@ -695,11 +697,14 @@ int od_backend_update_parameter(od_server_t *server, char *context, char *data, } int od_backend_ready_wait(od_server_t *server, char *context, int count, - uint32_t time_ms) + uint32_t time_ms, uint32_t ignore_errors) { od_instance_t *instance = server->global->instance; int ready = 0; - for (;;) { + int query_rc; + query_rc = 0; + + for (; !od_server_synchronized(server);) { machine_msg_t *msg; msg = od_read(&server->io, time_ms); if (msg == NULL) { @@ -722,26 +727,28 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count, machine_msg_data(msg), machine_msg_size(msg), 1); + machine_msg_free(msg); if (rc == -1) { - machine_msg_free(msg); return -1; } } else if (type == KIWI_BE_ERROR_RESPONSE) { od_backend_error(server, context, machine_msg_data(msg), machine_msg_size(msg)); machine_msg_free(msg); - continue; + if (!ignore_errors) { + query_rc = -1; + } } else if (type == KIWI_BE_READY_FOR_QUERY) { od_backend_ready(server, machine_msg_data(msg), machine_msg_size(msg)); + machine_msg_free(msg); ready++; - if (ready == count) { - machine_msg_free(msg); - return 0; - } + } else { + machine_msg_free(msg); } - machine_msg_free(msg); } + + return query_rc; /* never reached */ } @@ -771,18 +778,19 @@ od_retcode_t od_backend_query_send(od_server_t *server, char *context, /* update server sync state */ od_server_sync_request(server, 1); + assert(server->client); return OK_RESPONSE; } od_retcode_t od_backend_query(od_server_t *server, char *context, char *query, char *param, int len, uint32_t timeout, - uint32_t count) + uint32_t count, uint32_t ignore_errors) { if (od_backend_query_send(server, context, query, param, len) == NOT_OK_RESPONSE) { return NOT_OK_RESPONSE; } - od_retcode_t rc = - od_backend_ready_wait(server, context, count, timeout); + od_retcode_t rc = od_backend_ready_wait(server, context, count, timeout, + ignore_errors); return rc; } diff --git a/sources/backend.h b/sources/backend.h index f2c1ddd8d..b583a528d 100644 --- a/sources/backend.h +++ b/sources/backend.h @@ -16,11 +16,11 @@ void od_backend_error(od_server_t *, char *, char *, uint32_t); int od_backend_update_parameter(od_server_t *, char *, char *, uint32_t, int); int od_backend_ready(od_server_t *, char *, uint32_t); -int od_backend_ready_wait(od_server_t *, char *, int, uint32_t); +int od_backend_ready_wait(od_server_t *, char *, int, uint32_t, uint32_t); od_retcode_t od_backend_query_send(od_server_t *server, char *context, char *query, char *param, int len); od_retcode_t od_backend_query(od_server_t *, char *, char *, char *, int, - uint32_t, uint32_t); + uint32_t, uint32_t, uint32_t); #endif /* ODYSSEY_BACKEND_H */ diff --git a/sources/backend_sync.c b/sources/backend_sync.c new file mode 100644 index 000000000..1a557c1b3 --- /dev/null +++ b/sources/backend_sync.c @@ -0,0 +1,34 @@ +/* + * Odyssey. + * + * Scalable PostgreSQL connection pooler. + */ + +#include +#include +#include + +int od_backend_request_sync_point(od_server_t *server) +{ + od_instance_t *instance = server->global->instance; + int rc; + + machine_msg_t *msg; + msg = kiwi_fe_write_sync(NULL); + if (msg == NULL) { + return -1; + } + rc = od_write(&server->io, msg); + if (rc == -1) { + od_error(&instance->logger, "sync-point", server->client, + server, "write error: %s", od_io_error(&server->io)); + return NOT_OK_RESPONSE; + } + + /* update server sync state */ + od_server_sync_request(server, 1); + + return od_backend_ready_wait(server, "sync-point", 1 /*count*/, + UINT32_MAX /* timeout */, + 0 /*ignore error?*/); +} \ No newline at end of file diff --git a/sources/backend_sync.h b/sources/backend_sync.h new file mode 100644 index 000000000..b22665d8f --- /dev/null +++ b/sources/backend_sync.h @@ -0,0 +1,12 @@ +#ifndef ODYSSEY_BACKEND_SYNC_H +#define ODYSSEY_BACKEND_SYNC_H + +/* + * Odyssey. + * + * Scalable PostgreSQL connection pooler. + */ + +int od_backend_request_sync_point(od_server_t *); + +#endif /* ODYSSEY_BACKEND_SYNC_H */ diff --git a/sources/client.h b/sources/client.h index 68e49b2b2..1252d3b7b 100644 --- a/sources/client.h +++ b/sources/client.h @@ -43,6 +43,8 @@ struct od_client { uint64_t time_setup; uint64_t time_last_active; + bool is_watchdog; + kiwi_be_startup_t startup; kiwi_vars_t vars; kiwi_key_t key; @@ -52,7 +54,7 @@ struct od_client { void *route; char peer[OD_CLIENT_MAX_PEERLEN]; - // desc preparet statements ids + /* desc preparet statements ids */ od_hashmap_t *prep_stmt_ids; /* passwd from config rule */ diff --git a/sources/config_reader.c b/sources/config_reader.c index d55b43ec7..f34b7813a 100644 --- a/sources/config_reader.c +++ b/sources/config_reader.c @@ -405,6 +405,38 @@ static bool od_config_reader_is(od_config_reader_t *reader, int id) return true; } +static inline bool od_config_reader_symbol_is(od_config_reader_t *reader, + char symbol) +{ + od_token_t token; + int rc; + rc = od_parser_next(&reader->parser, &token); + od_parser_push(&reader->parser, &token); + if (rc != OD_PARSER_SYMBOL) + return false; + if (token.value.num != (int64_t)symbol) + return false; + return true; +} + +static bool od_config_reader_keyword_is(od_config_reader_t *reader, + od_keyword_t *keyword) +{ + od_token_t token; + int rc; + rc = od_parser_next(&reader->parser, &token); + od_parser_push(&reader->parser, &token); + if (rc != OD_PARSER_KEYWORD) + return false; + od_keyword_t *match; + match = od_keyword_match(od_config_keywords, &token); + if (keyword == NULL) + return false; + if (keyword != match) + return false; + return true; +} + bool od_config_reader_keyword(od_config_reader_t *reader, od_keyword_t *keyword) { od_token_t token; @@ -775,18 +807,18 @@ static int od_config_reader_storage(od_config_reader_t *reader, /* name */ if (!od_config_reader_string(reader, &storage->name)) - return NOT_OK_RESPONSE; + goto error; if (od_rules_storage_match(reader->rules, storage->name) != NULL) { od_config_reader_error(reader, NULL, "duplicate storage definition: %s", storage->name); - return NOT_OK_RESPONSE; + goto error; } od_rules_storage_add(reader->rules, storage); /* { */ if (!od_config_reader_symbol(reader, '{')) - return NOT_OK_RESPONSE; + goto error; for (;;) { od_token_t token; @@ -795,51 +827,53 @@ static int od_config_reader_storage(od_config_reader_t *reader, switch (rc) { case OD_PARSER_KEYWORD: break; - case OD_PARSER_EOF: + case OD_PARSER_EOF: { od_config_reader_error(reader, &token, "unexpected end of config file"); - return NOT_OK_RESPONSE; + goto error; + } case OD_PARSER_SYMBOL: /* } */ if (token.value.num == '}') { return OK_RESPONSE; } /* fall through */ - default: + default: { od_config_reader_error( reader, &token, "incorrect or unexpected parameter"); - return NOT_OK_RESPONSE; + goto error; + } } od_keyword_t *keyword; keyword = od_keyword_match(od_config_keywords, &token); if (keyword == NULL) { od_config_reader_error(reader, &token, "unknown parameter"); - return NOT_OK_RESPONSE; + goto error; } switch (keyword->id) { /* type */ case OD_LTYPE: if (!od_config_reader_string(reader, &storage->type)) - return NOT_OK_RESPONSE; + goto error; continue; /* host */ case OD_LHOST: if (od_config_reader_storage_host(reader, storage) != OK_RESPONSE) - return NOT_OK_RESPONSE; + goto error; continue; /* port */ case OD_LPORT: if (!od_config_reader_number(reader, &storage->port)) - return NOT_OK_RESPONSE; + goto error; continue; /* target_session_attrs */ case OD_LTARGET_SESSION_ATTRS: if (!od_config_reader_string(reader, &tmp)) { - return NOT_OK_RESPONSE; + goto error; } if (strcmp(tmp, "read-write") == 0) { @@ -852,7 +886,7 @@ static int od_config_reader_storage(od_config_reader_t *reader, storage->target_session_attrs = OD_TARGET_SESSION_ATTRS_RO; } else { - return NOT_OK_RESPONSE; + goto error; } free(tmp); @@ -863,57 +897,62 @@ static int od_config_reader_storage(od_config_reader_t *reader, case OD_LTLS: if (!od_config_reader_string(reader, &storage->tls_opts->tls)) - return NOT_OK_RESPONSE; + goto error; continue; /* tls_ca_file */ case OD_LTLS_CA_FILE: if (!od_config_reader_string( reader, &storage->tls_opts->tls_ca_file)) - return NOT_OK_RESPONSE; + goto error; continue; /* tls_key_file */ case OD_LTLS_KEY_FILE: if (!od_config_reader_string( reader, &storage->tls_opts->tls_key_file)) - return NOT_OK_RESPONSE; + goto error; continue; /* tls_cert_file */ case OD_LTLS_CERT_FILE: if (!od_config_reader_string( reader, &storage->tls_opts->tls_cert_file)) - return NOT_OK_RESPONSE; + goto error; continue; /* tls_protocols */ case OD_LTLS_PROTOCOLS: if (!od_config_reader_string( reader, &storage->tls_opts->tls_protocols)) - return NOT_OK_RESPONSE; + goto error; continue; /* server_max_routing */ case OD_LSERVERS_MAX_ROUTING: if (!od_config_reader_number( reader, &storage->server_max_routing)) - return NOT_OK_RESPONSE; + goto error; continue; /* watchdog */ case OD_LWATCHDOG: storage->watchdog = od_storage_watchdog_allocate(reader->global); - if (storage->watchdog == NULL) { - return NOT_OK_RESPONSE; - } + if (storage->watchdog == NULL) + goto error; if (od_config_reader_watchdog(reader, storage->watchdog, extentions) == NOT_OK_RESPONSE) - return NOT_OK_RESPONSE; + goto error; continue; - default: + default: { od_config_reader_error(reader, &token, "unexpected parameter"); - return NOT_OK_RESPONSE; + goto error; + } } } /* unreach */ +error: + if (storage->watchdog) { + od_storage_watchdog_free(storage->watchdog); + } + od_rules_storage_free(storage); return NOT_OK_RESPONSE; } @@ -1730,33 +1769,111 @@ static int od_config_reader_route(od_config_reader_t *reader, char *db_name, } user_name_len = strlen(user_name); + /* address and mask or default */ + char *addr_str = NULL; + char *mask_str = NULL; + + od_address_range_t address_range; + address_range = od_address_range_create_default(); + address_range.string_value = NULL; + address_range.string_value_len = 0; + address_range.is_default = 0; + address_range.is_hostname = 0; + + if (od_config_reader_is(reader, OD_PARSER_STRING)) { + if (!od_config_reader_string(reader, + &address_range.string_value)) + return NOT_OK_RESPONSE; + } else { + bool is_default_keyword; + is_default_keyword = od_config_reader_keyword_is( + reader, &od_config_keywords[OD_LDEFAULT]); + + if (!is_default_keyword && + !od_config_reader_symbol_is(reader, '{')) + return NOT_OK_RESPONSE; + + if (is_default_keyword) + od_config_reader_keyword( + reader, &od_config_keywords[OD_LDEFAULT]); + + address_range = od_address_range_create_default(); + if (address_range.string_value == NULL) + return NOT_OK_RESPONSE; + } + + if (address_range.is_default == 0) { + addr_str = strdup(address_range.string_value); + mask_str = strchr(addr_str, '/'); + if (mask_str) + *mask_str++ = 0; + + if (od_address_read(&address_range.addr, addr_str) == + NOT_OK_RESPONSE) { + int is_valid_hostname = od_address_hostname_validate( + address_range.string_value); + if (is_valid_hostname == -1) { + od_config_reader_error( + reader, NULL, + "could not compile regex"); + return NOT_OK_RESPONSE; + } else if (is_valid_hostname == 0) { + address_range.is_hostname = 1; + } else { + od_config_reader_error(reader, NULL, + "invalid address"); + return NOT_OK_RESPONSE; + } + } else if (mask_str) { + if (od_address_range_read_prefix(&address_range, + mask_str) == -1) { + od_config_reader_error( + reader, NULL, + "invalid network prefix length"); + return NOT_OK_RESPONSE; + } + } else { + od_config_reader_error(reader, NULL, + "expected network mask"); + return NOT_OK_RESPONSE; + } + } + /* ensure rule does not exists and add new rule */ od_rule_t *rule; - rule = od_rules_match(reader->rules, db_name, user_name, db_is_default, - user_is_default, 0); + rule = od_rules_match(reader->rules, db_name, user_name, &address_range, + db_is_default, user_is_default, 0); if (rule) { od_errorf(reader->error, "route '%s.%s': is redefined", db_name, user_name); free(user_name); return NOT_OK_RESPONSE; } + rule = od_rules_add(reader->rules); if (rule == NULL) { free(user_name); return NOT_OK_RESPONSE; } + rule->user_is_default = user_is_default; rule->user_name_len = user_name_len; rule->user_name = strdup(user_name); free(user_name); if (rule->user_name == NULL) return NOT_OK_RESPONSE; + rule->db_is_default = db_is_default; rule->db_name_len = db_name_len; rule->db_name = strdup(db_name); if (rule->db_name == NULL) return NOT_OK_RESPONSE; + address_range.string_value_len = strlen(address_range.string_value); + rule->address_range = address_range; + + free(addr_str); + /* { */ if (!od_config_reader_symbol(reader, '{')) return NOT_OK_RESPONSE; @@ -1844,8 +1961,9 @@ static inline int od_config_reader_watchdog(od_config_reader_t *reader, /* ensure rule does not exists and add new rule */ od_rule_t *rule; + od_address_range_t address_range = od_address_range_create_default(); rule = od_rules_match(reader->rules, watchdog->route_db, - watchdog->route_usr, 0, 0, 1); + watchdog->route_usr, &address_range, 0, 0, 1); if (rule) { od_errorf(reader->error, "route '%s.%s': is redefined", watchdog->route_db, watchdog->route_usr); @@ -1867,6 +1985,8 @@ static inline int od_config_reader_watchdog(od_config_reader_t *reader, if (rule->db_name == NULL) return NOT_OK_RESPONSE; + rule->address_range = address_range; + /* { */ if (!od_config_reader_symbol(reader, '{')) return NOT_OK_RESPONSE; diff --git a/sources/dns.c b/sources/dns.c index b171765d8..76865ea17 100644 --- a/sources/dns.c +++ b/sources/dns.c @@ -11,8 +11,8 @@ #include #include -static int od_getsockaddrname(struct sockaddr *sa, char *buf, int size, - int add_addr, int add_port) +int od_getsockaddrname(struct sockaddr *sa, char *buf, int size, int add_addr, + int add_port) { char addr[128]; if (sa->sa_family == AF_INET) { diff --git a/sources/dns.h b/sources/dns.h index 542647fd6..44956a8a8 100644 --- a/sources/dns.h +++ b/sources/dns.h @@ -7,6 +7,7 @@ * Scalable PostgreSQL connection pooler. */ +int od_getsockaddrname(struct sockaddr *, char *, int, int, int); int od_getaddrname(struct addrinfo *, char *, int, int, int); int od_getpeername(machine_io_t *, char *, int, int, int); int od_getsockname(machine_io_t *, char *, int, int, int); diff --git a/sources/frontend.c b/sources/frontend.c index 72e113dc6..39c8ba164 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -229,6 +229,8 @@ od_frontend_attach(od_client_t *client, char *context, server->id.id_prefix, (int)sizeof(server->id.id_prefix), server->id.id); + assert(od_server_synchronized(server)); + /* connect to server, if necessary */ if (server->io.io) { return OD_OK; @@ -279,7 +281,7 @@ od_frontend_attach_and_deploy(od_client_t *client, char *context) return OD_ESERVER_WRITE; /* set number of replies to discard */ - client->server->deploy_sync = rc; + server->deploy_sync = rc; od_server_sync_request(server, server->deploy_sync); return OD_OK; @@ -726,6 +728,9 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, od_server_t *server = client->server; od_route_t *route = client->route; od_instance_t *instance = client->global->instance; + od_frontend_status_t retstatus; + + retstatus = OD_OK; kiwi_be_type_t type = *data; if (instance->config.log_debug) @@ -764,6 +769,11 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, is_ready_for_query = 1; od_backend_ready(server, data, size); + /* exactly one RFQ! */ + if (od_server_in_sync_point(server)) { + retstatus = OD_SKIP; + } + if (is_deploy) server->deploy_sync--; @@ -786,14 +796,14 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, case KIWI_BE_PARSE_COMPLETE: if (route->rule->pool->reserve_prepared_statement) { // skip msg - is_deploy = 1; + retstatus = OD_SKIP; } default: break; } /* discard replies during configuration deploy */ - if (is_deploy) + if (is_deploy || retstatus == OD_SKIP) return OD_SKIP; if (route->id.physical_rep || route->id.logical_rep) { @@ -803,7 +813,8 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, return OD_DETACH; } } else { - if (is_ready_for_query && od_server_synchronized(server)) { + if (is_ready_for_query && od_server_synchronized(server) && + server->parse_msg == NULL) { switch (route->rule->pool->pool) { case OD_RULE_POOL_STATEMENT: return OD_DETACH; @@ -822,7 +833,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, } } - return OD_OK; + return retstatus; } static inline od_retcode_t od_frontend_log_query(od_instance_t *instance, @@ -948,31 +959,110 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance, // 8 hex #define OD_HASH_LEN 9 -static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, - int opname_start_offset, - int operator_name_len, - od_hash_t body_hash) +static inline machine_msg_t * +od_frontend_rewrite_msg(char *data, int size, int opname_start_offset, + int operator_name_len, char *opname, int opnamelen) { machine_msg_t *msg = - machine_msg_create(size - operator_name_len + OD_HASH_LEN); + machine_msg_create(size - operator_name_len + opnamelen); char *rewrite_data = machine_msg_data(msg); // packet header memcpy(rewrite_data, data, opname_start_offset); // prefix for opname - od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x", - body_hash); + od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname); // rest of msg - memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN, + memcpy(rewrite_data + opname_start_offset + opnamelen, data + opname_start_offset + operator_name_len, size - opname_start_offset - operator_name_len); // set proper size to package kiwi_header_set_size((kiwi_header_t *)rewrite_data, - size - operator_name_len + OD_HASH_LEN); + size - operator_name_len + opnamelen); return msg; } +static od_frontend_status_t od_frontend_deploy_prepared_stmt( + od_server_t *server, od_relay_t *relay, char *ctx, char *data, + int size /* to adcance or to write? */, od_hash_t body_hash, + char *opname, int opnamelen) +{ + od_route_t *route = server->route; + od_instance_t *instance = server->global->instance; + od_client_t *client = server->client; + + od_hashmap_elt_t desc; + desc.data = data; + desc.len = size; + + od_debug(&instance->logger, ctx, client, server, + "statement: %.*s, hash: %08x", desc.len, desc.data, body_hash); + + int refcnt = 0; + od_hashmap_elt_t value; + value.data = &refcnt; + value.len = sizeof(int); + od_hashmap_elt_t *value_ptr = &value; + + // send parse msg if needed + if (od_hashmap_insert(server->prep_stmts, body_hash, &desc, + &value_ptr) == 0) { + od_debug(&instance->logger, ctx, client, server, + "deploy %.*s operator %.*s to server", desc.len, + desc.data, opnamelen, opname); + // rewrite msg + // allocate prepered statement under name equal to body hash + + od_stat_parse(&route->stats); + + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen, + desc.data, desc.len); + if (pmsg == NULL) { + return OD_ESERVER_WRITE; + } + + if (instance->config.log_query || route->rule->log_query) { + od_frontend_log_parse(instance, client, "rewrite parse", + machine_msg_data(pmsg), + machine_msg_size(pmsg)); + } + + od_stat_parse(&route->stats); + // msg deallocated here + od_dbg_printf_on_dvl_lvl(1, "relay %p write msg %c\n", relay, + *(char *)machine_msg_data(pmsg)); + + od_write(&server->io, pmsg); + // advance? + // machine_iov_add(relay->iov, pmsg); + + return OD_OK; + } else { + int *refcnt; + refcnt = value_ptr->data; + *refcnt = 1 + *refcnt; + + od_stat_parse_reuse(&route->stats); + return OD_OK; + } +} + +static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg( + od_server_t *server, od_relay_t *relay, char *ctx, + machine_msg_t *msg /* to adcance or to write? */ +) +{ + char *data = machine_msg_data(msg); + int size = machine_msg_size(msg); + + od_hash_t body_hash = od_murmur_hash(data, size); + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size, + body_hash, opname, OD_HASH_LEN); +} + static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, char *data, int size) { @@ -990,14 +1080,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, configuration */ od_server_t *server = client->server; assert(server != NULL); + assert(server->parse_msg == NULL); if (instance->config.log_debug) od_debug(&instance->logger, "remote client", client, server, "%s", kiwi_fe_type_to_string(type)); od_frontend_status_t retstatus = OD_OK; - machine_msg_t *msg; - msg = NULL; bool forwarded = 0; switch (type) { case KIWI_FE_COPY_DONE: @@ -1031,16 +1120,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ECLIENT_READ; } if (type == KIWI_FE_DESCRIBE_PORTAL) { - break; // skip this, we obly need to rewrite statement + break; // skip this, we only need to rewrite statement } assert(client->prep_stmt_ids); retstatus = OD_SKIP; - int opname_start_offset = - kiwi_be_describe_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; - } od_hashmap_elt_t key; key.len = operator_name_len; @@ -1062,63 +1146,18 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hash_t body_hash = od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite describe", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - char opname[OD_HASH_LEN]; od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - int refcnt = 0; - od_hashmap_elt_t value; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - - // send parse msg if needed - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before describe", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - - msg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); - } - - od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "describe", - NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } - } else { - int *refcnt; - refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { + return OD_ESERVER_WRITE; } + machine_msg_t *msg; msg = kiwi_fe_write_describe(NULL, 'S', opname, OD_HASH_LEN); @@ -1134,14 +1173,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } // msg if deallocated automaictly - rc = od_write(&server->io, msg); + machine_iov_add(relay->iov, msg); + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", relay, + *(char *)machine_msg_data(msg)); forwarded = 1; - if (rc == -1) { - od_error(&instance->logger, "describe", NULL, - server, "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } } break; case KIWI_FE_PARSE: @@ -1151,7 +1187,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, if (route->rule->pool->reserve_prepared_statement) { /* skip client parse msg */ - retstatus = OD_SKIP; + retstatus = OD_REQ_SYNC; kiwi_prepared_statement_t desc; int rc; rc = kiwi_be_read_parse_dest(data, size, &desc); @@ -1176,17 +1212,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hashmap_elt_t *value_ptr = &value; - int opname_start_offset = - kiwi_be_parse_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; + server->parse_msg = + machine_msg_create(desc.description_len); + if (server->parse_msg == NULL) { + return OD_ESERVER_WRITE; } - - od_hash_t body_hash = - od_murmur_hash(data + opname_start_offset + - desc.operator_name_len, - size - opname_start_offset - - desc.operator_name_len); + memcpy(machine_msg_data(server->parse_msg), + desc.description, desc.description_len); assert(client->prep_stmt_ids); if (od_hashmap_insert(client->prep_stmt_ids, keyhash, @@ -1194,128 +1226,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, if (value_ptr->len != desc.description_len || strncmp(desc.description, value_ptr->data, value_ptr->len) != 0) { - value_ptr->len = desc.description_len; - value_ptr->data = desc.description; - - /* redeploy - * previous + /* + * Raise error: * client allocated prepared stmt with same name */ - char buf[OD_HASH_LEN]; - od_snprintf(buf, OD_HASH_LEN, "%08x", - body_hash); - - msg = kiwi_fe_write_close( - NULL, 'S', buf, OD_HASH_LEN); - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "parse", NULL, server, - "write error: %s", - od_io_error( - &server->io)); - return OD_ESERVER_WRITE; - } - msg = kiwi_fe_write_parse_description( - NULL, buf, OD_HASH_LEN, - desc.description, - desc.description_len); - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - } else { - char buf[OD_HASH_LEN]; - od_snprintf(buf, OD_HASH_LEN, "%08x", - body_hash); - msg = kiwi_fe_write_parse_description( - NULL, buf, OD_HASH_LEN, - desc.description, - desc.description_len); - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - } - } else { - char buf[OD_HASH_LEN]; - od_snprintf(buf, OD_HASH_LEN, "%08x", - body_hash); - msg = kiwi_fe_write_parse_description( - NULL, buf, OD_HASH_LEN, - desc.description, desc.description_len); - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - } - - key.len = desc.description_len; - key.data = desc.description; - - int refcnt = 0; - value.data = &refcnt; - value.len = sizeof(int); - - value_ptr = &value; - - if (od_hashmap_insert(server->prep_stmts, body_hash, - &key, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse initial deploy", client, - server, - "deploy %.*s operator hash %u to server", - key.len, key.data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); - } - - // stat backend parse msg - od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "parse", - NULL, server, - "write error: %s", - od_io_error(&server->io)); return OD_ESERVER_WRITE; } - } else { - int *refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; - - if (instance->config.log_query || - route->rule->log_query) { - od_stat_parse_reuse(&route->stats); - od_log(&instance->logger, "parse", - client, server, - "stmt already exists, simply report its ok"); - } - machine_msg_free(msg); - } - - machine_msg_t *pmsg; - pmsg = kiwi_be_write_parse_complete(NULL); - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - rc = od_write(&client->io, pmsg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "parse", client, - NULL, "write error: %s", - od_io_error(&client->io)); - return OD_ESERVER_WRITE; } } break; @@ -1337,12 +1253,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ECLIENT_READ; } - int opname_start_offset = - kiwi_be_bind_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; - } - od_hashmap_elt_t key; key.len = operator_name_len; key.data = operator_name; @@ -1364,66 +1274,28 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hash_t body_hash = od_murmur_hash(desc->data, desc->len); - - od_debug(&instance->logger, "rewrite bind", client, - server, "statement: %.*s, hash: %08x", - desc->len, desc->data, body_hash); - - od_hashmap_elt_t value; - int refcnt = 1; - value.data = &refcnt; - value.len = sizeof(int); - od_hashmap_elt_t *value_ptr = &value; - char opname[OD_HASH_LEN]; od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); - if (od_hashmap_insert(server->prep_stmts, body_hash, - desc, &value_ptr) == 0) { - od_debug( - &instance->logger, - "rewrite parse before bind", client, - server, - "deploy %.*s operator hash %u to server", - desc->len, desc->data, keyhash); - // rewrite msg - // allocate prepered statement under name equal to body hash - - msg = kiwi_fe_write_parse_description( - NULL, opname, OD_HASH_LEN, desc->data, - desc->len); - - if (msg == NULL) { - return OD_ESERVER_WRITE; - } - - if (instance->config.log_query || - route->rule->log_query) { - od_frontend_log_parse( - instance, client, - "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); - } + /* fill internals structs in, send parse if needed */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "parse before bind", + desc->data, desc->len, body_hash, opname, + OD_HASH_LEN) != OD_OK) { + return OD_ESERVER_WRITE; + } - od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "rewrite parse", NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } - } else { - int *refcnt = value_ptr->data; - *refcnt = 1 + *refcnt; + int opname_start_offset = + kiwi_be_bind_opname_offset(data, size); + if (opname_start_offset < 0) { + return OD_ECLIENT_READ; } + machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, - operator_name_len, - body_hash); + operator_name_len, opname, + OD_HASH_LEN); if (msg == NULL) { return OD_ESERVER_WRITE; @@ -1437,15 +1309,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - rc = od_write(&server->io, msg); - forwarded = 1; + machine_iov_add(relay->iov, msg); - if (rc == -1) { - od_error(&instance->logger, "rewrite bind", - NULL, server, "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", relay, + *(char *)machine_msg_data(msg)); + forwarded = 1; } break; case KIWI_FE_EXECUTE: @@ -1514,12 +1383,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } /* If the retstatus is not SKIP */ - if (route->rule->pool->reserve_prepared_statement && forwarded != 1 && - msg != NULL) { - msg = kiwi_fe_copy_msg(msg, data, size); - od_write(&server->io, msg); - retstatus = OD_SKIP; - } /* update server stats */ od_stat_query_start(&server->stats_state); return retstatus; @@ -1596,9 +1459,10 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, } static inline od_frontend_status_t -od_frontend_remote_process_server(od_server_t *server, od_client_t *client) +od_frontend_remote_process_server(od_server_t *server, od_client_t *client, + bool await_read) { - od_frontend_status_t status = od_relay_step(&server->relay); + od_frontend_status_t status = od_relay_step(&server->relay, await_read); int rc; od_instance_t *instance = client->global->instance; @@ -1754,9 +1618,10 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) break; server = client->server; + bool sync_req = 0; /* attach */ - status = od_relay_step(&client->relay); + status = od_relay_step(&client->relay, false); if (status == OD_ATTACH) { /* Check for replication lag and reject query if too big */ od_frontend_status_t catchup_status = @@ -1785,6 +1650,8 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) /* retry read operation after attach */ continue; + } else if (status == OD_REQ_SYNC) { + sync_req = 1; } else if (status != OD_OK) { break; } @@ -1792,7 +1659,97 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) if (server == NULL) continue; - status = od_frontend_remote_process_server(server, client); + status = od_frontend_remote_process_server(server, client, + false); + if (status != OD_OK) { + /* should not return this here */ + assert(status != OD_REQ_SYNC); + break; + } + + // are we requested to meet sync point? + + if (sync_req) { + od_log(&instance->logger, "sync-point", client, server, + "process, %d", od_server_synchronized(server)); + + while (1) { + if (od_server_synchronized(server)) { + break; + } + // await here + + od_log(&instance->logger, "sync-point", client, + server, "process await"); + status = od_frontend_remote_process_server( + server, client, true); + + if (status != OD_OK) { + break; + } + } + + if (status != OD_OK) { + break; + } + + // deploy here + + assert(server->parse_msg != NULL); + + /* fill internals structs in */ + if (od_frontend_deploy_prepared_stmt_msg( + server, &server->relay, "sync-point", + server->parse_msg) != OD_OK) { + status = OD_ESERVER_WRITE; + break; + } + server->parse_msg = NULL; + + machine_msg_t *msg; + msg = kiwi_fe_write_sync(NULL); + if (msg == NULL) { + status = OD_ESERVER_WRITE; + break; + } + rc = od_write(&server->io, msg); + if (rc == -1) { + status = OD_ESERVER_WRITE; + break; + } + + /* enter sync piont mode */ + server->sync_point = 1; + od_server_sync_request(server, 1); + + while (1) { + if (od_server_synchronized(server)) { + break; + } + // await here + + od_log(&instance->logger, "sync-point", client, + server, "process await"); + status = od_frontend_remote_process_server( + server, client, true); + + if (status != OD_OK) { + break; + } + } + + server->sync_point = 0; + if (status != OD_OK) { + break; + } + + machine_msg_t *pmsg; + pmsg = kiwi_be_write_parse_complete(NULL); + if (pmsg == NULL) { + return OD_ECLIENT_WRITE; + } + machine_iov_add(server->relay.iov, pmsg); + } if (status != OD_OK) { break; } @@ -1959,6 +1916,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context, break; case OD_UNDEF: case OD_SKIP: + case OD_REQ_SYNC: case OD_ATTACH: /* fallthrough */ case OD_DETACH: @@ -2110,6 +2068,9 @@ void od_frontend(void *arg) goto cleanup; } + char peer[128]; + od_getpeername(client->io.io, peer, sizeof(peer), 1, 0); + if (instance->config.log_session) { od_log(&instance->logger, "startup", client, NULL, "route '%s.%s' to '%s.%s'", diff --git a/sources/hba.c b/sources/hba.c index ee54b25fb..4badc9f00 100644 --- a/sources/hba.c +++ b/sources/hba.c @@ -39,32 +39,6 @@ void od_hba_reload(od_hba_t *hba, od_hba_rules_t *rules) od_hba_unlock(hba); } -bool od_hba_validate_addr(od_hba_rule_t *rule, struct sockaddr_storage *sa) -{ - struct sockaddr_in *sin = (struct sockaddr_in *)sa; - struct sockaddr_in *rule_addr = (struct sockaddr_in *)&rule->addr; - struct sockaddr_in *rule_mask = (struct sockaddr_in *)&rule->mask; - in_addr_t client_addr = sin->sin_addr.s_addr; - in_addr_t client_net = rule_mask->sin_addr.s_addr & client_addr; - return (client_net ^ rule_addr->sin_addr.s_addr) == 0; -} - -bool od_hba_validate_addr6(od_hba_rule_t *rule, struct sockaddr_storage *sa) -{ - struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa; - struct sockaddr_in6 *rule_addr = (struct sockaddr_in6 *)&rule->addr; - struct sockaddr_in6 *rule_mask = (struct sockaddr_in6 *)&rule->mask; - for (int i = 0; i < 16; ++i) { - uint8_t client_net_byte = rule_mask->sin6_addr.s6_addr[i] & - sin->sin6_addr.s6_addr[i]; - if (client_net_byte ^ rule_addr->sin6_addr.s6_addr[i]) { - return false; - } - } - - return true; -} - bool od_hba_validate_name(char *client_name, od_hba_rule_name_t *name, char *client_other_name) { @@ -128,14 +102,9 @@ int od_hba_process(od_client_t *client) } else if (rule->connection_type == OD_CONFIG_HBA_HOSTNOSSL && client->startup.is_ssl_request) { continue; - } else if (sa.ss_family == AF_INET) { - if (rule->addr.ss_family != AF_INET || - !od_hba_validate_addr(rule, &sa)) { - continue; - } - } else if (sa.ss_family == AF_INET6) { - if (rule->addr.ss_family != AF_INET6 || - !od_hba_validate_addr6(rule, &sa)) { + } else if (sa.ss_family == AF_INET || + sa.ss_family == AF_INET6) { + if (!od_address_validate(&rule->address_range, &sa)) { continue; } } diff --git a/sources/hba_reader.c b/sources/hba_reader.c index 3a38b46b5..c9c5583c4 100644 --- a/sources/hba_reader.c +++ b/sources/hba_reader.c @@ -9,6 +9,7 @@ #include #include #include +#include enum { OD_LLOCAL, @@ -174,69 +175,6 @@ static int od_hba_reader_value(od_config_reader_t *reader, void **dest) } } -static int od_hba_reader_address(struct sockaddr_storage *dest, - const char *addr) -{ - int rc; - rc = inet_pton(AF_INET, addr, &((struct sockaddr_in *)dest)->sin_addr); - if (rc > 0) { - dest->ss_family = AF_INET; - return 0; - } - if (inet_pton(AF_INET6, addr, - &((struct sockaddr_in6 *)dest)->sin6_addr) > 0) { - dest->ss_family = AF_INET6; - return 0; - } - return -1; -} - -static inline uint32 od_hba_bswap32(uint32 x) -{ - return ((x << 24) & 0xff000000) | ((x << 8) & 0x00ff0000) | - ((x >> 8) & 0x0000ff00) | ((x >> 24) & 0x000000ff); -} - -int od_hba_reader_prefix(od_hba_rule_t *hba, char *prefix) -{ - char *end = NULL; - long len = strtoul(prefix, &end, 10); - if (*prefix == '\0' || *end != '\0') { - return -1; - } - if (hba->addr.ss_family == AF_INET) { - if (len > 32) - return -1; - struct sockaddr_in *addr = (struct sockaddr_in *)&hba->mask; - long mask; - if (len > 0) - mask = (0xffffffffUL << (32 - (int)len)) & 0xffffffffUL; - else - mask = 0; - addr->sin_addr.s_addr = od_hba_bswap32(mask); - return 0; - } else if (hba->addr.ss_family == AF_INET6) { - if (len > 128) - return -1; - struct sockaddr_in6 *addr = (struct sockaddr_in6 *)&hba->mask; - int i; - for (i = 0; i < 16; i++) { - if (len <= 0) - addr->sin6_addr.s6_addr[i] = 0; - else if (len >= 8) - addr->sin6_addr.s6_addr[i] = 0xff; - else { - addr->sin6_addr.s6_addr[i] = - (0xff << (8 - (int)len)) & 0xff; - } - len -= 8; - } - return 0; - } - - return -1; -} - static int od_hba_reader_name(od_config_reader_t *reader, struct od_hba_rule_name *name, bool is_db) { @@ -347,8 +285,8 @@ int od_hba_reader_parse(od_config_reader_t *reader) if (mask) *mask++ = 0; - if (od_hba_reader_address(&hba->addr, address) == - NOT_OK_RESPONSE) { + if (od_address_read(&hba->address_range.addr, + address) == NOT_OK_RESPONSE) { od_hba_reader_error(reader, "invalid IP address"); goto error; @@ -356,7 +294,8 @@ int od_hba_reader_parse(od_config_reader_t *reader) /* network mask */ if (mask) { - if (od_hba_reader_prefix(hba, mask) == -1) { + if (od_address_range_read_prefix( + &hba->address_range, mask) == -1) { od_hba_reader_error( reader, "invalid network prefix length"); @@ -371,8 +310,8 @@ int od_hba_reader_parse(od_config_reader_t *reader) "expected network mask"); goto error; } - if (od_hba_reader_address(&hba->mask, - address) == -1) { + if (od_address_read(&hba->address_range.mask, + address) == -1) { od_hba_reader_error( reader, "invalid network mask"); goto error; diff --git a/sources/hba_reader.h b/sources/hba_reader.h index 4a3e767b6..37c4a3d74 100644 --- a/sources/hba_reader.h +++ b/sources/hba_reader.h @@ -2,6 +2,5 @@ #define ODYSSEY_HBA_READER_H int od_hba_reader_parse(od_config_reader_t *reader); -int od_hba_reader_prefix(od_hba_rule_t *hba, char *prefix); #endif /* ODYSSEY_HBA_READER_H */ diff --git a/sources/hba_rule.h b/sources/hba_rule.h index 02900227f..acfbe512c 100644 --- a/sources/hba_rule.h +++ b/sources/hba_rule.h @@ -42,8 +42,7 @@ struct od_hba_rule { od_hba_rule_conn_type_t connection_type; od_hba_rule_name_t database; od_hba_rule_name_t user; - struct sockaddr_storage addr; - struct sockaddr_storage mask; + od_address_range_t address_range; od_hba_rule_auth_method_t auth_method; od_list_t link; }; diff --git a/sources/ldap.c b/sources/ldap.c index 0129b0668..a71300849 100644 --- a/sources/ldap.c +++ b/sources/ldap.c @@ -358,8 +358,7 @@ od_ldap_server_t *od_ldap_server_pull(od_logger_t *logger, od_rule_t *rule, od_debug(logger, "auth_ldap", NULL, NULL, "pulling ldap_server from ldap_pool"); if (rule->ldap_pool_ttl > 0) { - if ((int)time(NULL) - - ldap_server->idle_timestamp > + if (time(NULL) - ldap_server->idle_timestamp > rule->ldap_pool_ttl) { od_debug( logger, "auth_ldap", NULL, NULL, @@ -418,7 +417,6 @@ od_ldap_server_t *od_ldap_server_pull(od_logger_t *logger, od_rule_t *rule, rc = od_ldap_endpoint_wait(le, timeout); if (rc == -1) { - od_ldap_endpoint_unlock(le); return NULL; } @@ -476,7 +474,7 @@ static inline od_retcode_t od_ldap_server_attach(od_client_t *client) OD_SERVER_UNDEF); od_ldap_server_free(server); } else { - server->idle_timestamp = (int)time(NULL); + server->idle_timestamp = time(NULL); od_ldap_server_pool_set( client->rule->ldap_endpoint->ldap_search_pool, server, OD_SERVER_IDLE); @@ -523,7 +521,7 @@ od_retcode_t od_auth_ldap(od_client_t *cl, kiwi_password_t *tok) switch (ldap_rc) { case LDAP_SUCCESS: { - serv->idle_timestamp = (int)time(NULL); + serv->idle_timestamp = time(NULL); od_ldap_server_pool_set(cl->rule->ldap_endpoint->ldap_auth_pool, serv, OD_SERVER_IDLE); rc = OK_RESPONSE; @@ -532,7 +530,7 @@ od_retcode_t od_auth_ldap(od_client_t *cl, kiwi_password_t *tok) case LDAP_INVALID_SYNTAX: /* fallthrough */ case LDAP_INVALID_CREDENTIALS: { - serv->idle_timestamp = (int)time(NULL); + serv->idle_timestamp = time(NULL); od_ldap_server_pool_set(cl->rule->ldap_endpoint->ldap_auth_pool, serv, OD_SERVER_IDLE); rc = NOT_OK_RESPONSE; @@ -654,7 +652,7 @@ od_retcode_t od_ldap_endpoint_free(od_ldap_endpoint_t *le) } if (le->ldap_auth_pool) { - od_ldap_server_pool_free(le->ldap_search_pool); + od_ldap_server_pool_free(le->ldap_auth_pool); } pthread_mutex_destroy(&le->lock); diff --git a/sources/mdb_iamproxy.c b/sources/mdb_iamproxy.c index 9c50f6836..dce7c85fc 100644 --- a/sources/mdb_iamproxy.c +++ b/sources/mdb_iamproxy.c @@ -176,6 +176,7 @@ int mdb_iamproxy_authenticate_user( od_error(&instance->logger, "auth", client, NULL, "failed to send username to msg_username"); authentication_result = MDB_IAMPROXY_CONN_ERROR; + machine_msg_free(msg_username); goto free_io; } @@ -190,6 +191,7 @@ int mdb_iamproxy_authenticate_user( od_error(&instance->logger, "auth", client, NULL, "failed to write token to msg_token"); authentication_result = MDB_IAMPROXY_CONN_ERROR; + machine_msg_free(msg_token); goto free_io; } @@ -252,6 +254,7 @@ int mdb_iamproxy_authenticate_user( free_auth_status: machine_msg_free(auth_status); free_io: + machine_close(io); machine_io_free(io); free_end: /*RETURN RESULT*/ diff --git a/sources/misc.c b/sources/misc.c index 76ac9bfb0..354c83be7 100644 --- a/sources/misc.c +++ b/sources/misc.c @@ -11,6 +11,32 @@ #include #include +int pg_strcasecmp(const char *s1, const char *s2) +{ + for (;;) { + unsigned char ch1 = (unsigned char)*s1++; + unsigned char ch2 = (unsigned char)*s2++; + + if (ch1 != ch2) { + if (ch1 >= 'A' && ch1 <= 'Z') + ch1 += 'a' - 'A'; + else if (IS_HIGHBIT_SET(ch1) && isupper(ch1)) + ch1 = tolower(ch1); + + if (ch2 >= 'A' && ch2 <= 'Z') + ch2 += 'a' - 'A'; + else if (IS_HIGHBIT_SET(ch2) && isupper(ch2)) + ch2 = tolower(ch2); + + if (ch1 != ch2) + return (int)ch1 - (int)ch2; + } + if (ch1 == 0) + break; + } + return 0; +} + int pg_strncasecmp(const char *s1, const char *s2, size_t n) { while (n-- > 0) { diff --git a/sources/misc.h b/sources/misc.h index 19bf65324..533024aea 100644 --- a/sources/misc.h +++ b/sources/misc.h @@ -7,6 +7,7 @@ * Scalable PostgreSQL connection pooler. */ +extern int pg_strcasecmp(const char *s1, const char *s2); extern bool parse_bool(const char *value, bool *result); extern bool parse_bool_with_len(const char *value, size_t len, bool *result); diff --git a/sources/od_ldap.h b/sources/od_ldap.h index a339f49b6..4a90e9e68 100644 --- a/sources/od_ldap.h +++ b/sources/od_ldap.h @@ -13,7 +13,7 @@ typedef struct { od_global_t *global; void *route; - int idle_timestamp; + int64_t idle_timestamp; od_list_t link; } od_ldap_server_t; diff --git a/sources/odyssey.h b/sources/odyssey.h index f8c6961b7..8e9da95bd 100644 --- a/sources/odyssey.h +++ b/sources/odyssey.h @@ -52,6 +52,8 @@ #include "sources/pam.h" #endif +#include "sources/address.h" + #include "sources/storage.h" #include "sources/group.h" #include "sources/pool.h" diff --git a/sources/relay.h b/sources/relay.h index 61c66b1c6..7bee82431 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -177,6 +177,8 @@ static inline od_frontend_status_t od_relay_on_packet_msg(od_relay_t *relay, case OD_SKIP: status = OD_OK; /* fallthrough */ + case OD_REQ_SYNC: + /* fallthrough */ default: machine_msg_free(msg); break; @@ -196,9 +198,16 @@ static inline od_frontend_status_t od_relay_on_packet(od_relay_t *relay, /* fallthrough */ case OD_DETACH: rc = machine_iov_add_pointer(relay->iov, data, size); + + od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay, + *data); if (rc == -1) return OD_EOOM; break; + case OD_REQ_SYNC: + /* fallthrough */ + relay->packet_skip = 1; + break; case OD_SKIP: relay->packet_skip = 1; status = OD_OK; @@ -277,7 +286,10 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) if (relay->packet_skip) return OD_OK; + rc = machine_iov_add_pointer(relay->iov, data, to_parse); + + od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay, *data); if (rc == -1) return OD_EOOM; @@ -294,6 +306,9 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) rc = od_relay_process(relay, &progress, current, end - current); current += progress; od_readahead_pos_read_advance(&relay->src->readahead, progress); + if (rc == OD_REQ_SYNC) { + return OD_REQ_SYNC; + } if (rc != OD_OK) { if (rc == OD_UNDEF) rc = OD_OK; @@ -354,11 +369,17 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) return OD_OK; } -static inline od_frontend_status_t od_relay_step(od_relay_t *relay) +static inline od_frontend_status_t od_relay_step(od_relay_t *relay, + bool await_read) { /* on read event */ + od_frontend_status_t retstatus; + retstatus = OD_OK; int rc; - if (machine_cond_try(relay->src->on_read)) { + rc = await_read ? + (machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0) : + machine_cond_try(relay->src->on_read); + if (rc || od_relay_data_pending(relay)) { if (relay->dst == NULL) { /* signal to retry on read logic */ machine_cond_signal(relay->src->on_read); @@ -371,7 +392,9 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) rc = od_relay_pipeline(relay); - if (rc != OD_OK) + if (rc == OD_REQ_SYNC) { + retstatus = OD_REQ_SYNC; + } else if (rc != OD_OK) return rc; if (machine_iov_pending(relay->iov)) { @@ -383,7 +406,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } if (relay->dst == NULL) - return OD_OK; + return retstatus; /* on write event */ if (machine_cond_try(relay->dst->on_write)) { @@ -408,7 +431,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } } - return OD_OK; + return retstatus; } static inline od_frontend_status_t od_relay_flush(od_relay_t *relay) diff --git a/sources/reset.c b/sources/reset.c index 757a70107..33698ab06 100644 --- a/sources/reset.c +++ b/sources/reset.c @@ -75,11 +75,12 @@ int od_reset(od_server_t *server) wait_timeout, wait_try); wait_try++; rc = od_backend_ready_wait(server, "reset", 1, - wait_timeout); - if (rc == -1) + wait_timeout, + 1 /*ignore server errors*/); + if (rc == NOT_OK_RESPONSE) break; } - if (rc == -1) { + if (rc == NOT_OK_RESPONSE) { if (!machine_timedout()) goto error; @@ -104,13 +105,28 @@ int od_reset(od_server_t *server) wait_try_cancel++; rc = od_cancel(server->global, route->rule->storage, &server->key, &server->id); - if (rc == -1) + if (rc == NOT_OK_RESPONSE) goto error; continue; } assert(od_server_synchronized(server)); break; } + + /* Request one more sync point here. + * In `od_server_synchronized` we + * count number of sync/query msg send to connection + * and number of RFQ received, if this numbers are equal, + * we decide server connection as sync. However, this might be + * not true, if client-server relay advanced some extended proto + * msgs without sync. To safely execute discard queries, we need to + * advadance sync point first. + */ + + if (od_backend_request_sync_point(server) == NOT_OK_RESPONSE) { + goto error; + } + od_debug(&instance->logger, "reset", server->client, server, "synchronized"); @@ -119,11 +135,11 @@ int od_reset(od_server_t *server) if (route->rule->pool->rollback) { if (server->is_transaction) { char query_rlb[] = "ROLLBACK"; - rc = od_backend_query(server, "reset-rollback", - query_rlb, NULL, - sizeof(query_rlb), wait_timeout, - 1); - if (rc == -1) + rc = od_backend_query( + server, "reset-rollback", query_rlb, NULL, + sizeof(query_rlb), wait_timeout, 1, + 0 /*do not ignore server error messages*/); + if (rc == NOT_OK_RESPONSE) goto error; assert(!server->is_transaction); } @@ -132,9 +148,10 @@ int od_reset(od_server_t *server) /* send DISCARD ALL */ if (route->rule->pool->discard) { char query_discard[] = "DISCARD ALL"; - rc = od_backend_query(server, "reset-discard", query_discard, - NULL, sizeof(query_discard), wait_timeout, - 1); + rc = od_backend_query( + server, "reset-discard", query_discard, NULL, + sizeof(query_discard), wait_timeout, 1, + 0 /*do not ignore server error messages*/); if (rc == NOT_OK_RESPONSE) goto error; } @@ -144,18 +161,20 @@ int od_reset(od_server_t *server) route->rule->pool->discard_query == NULL) { char query_discard[] = "SET SESSION AUTHORIZATION DEFAULT;RESET ALL;CLOSE ALL;UNLISTEN *;SELECT pg_advisory_unlock_all();DISCARD PLANS;DISCARD SEQUENCES;DISCARD TEMP;"; - rc = od_backend_query(server, "reset-discard-smart", - query_discard, NULL, - sizeof(query_discard), wait_timeout, 1); + rc = od_backend_query( + server, "reset-discard-smart", query_discard, NULL, + sizeof(query_discard), wait_timeout, 1, + 0 /*do not ignore server error messages*/); if (rc == NOT_OK_RESPONSE) goto error; } if (route->rule->pool->discard_query != NULL) { - rc = od_backend_query(server, "reset-discard-smart-string", - route->rule->pool->discard_query, NULL, - strlen(route->rule->pool->discard_query) + - 1, - wait_timeout, 1); + rc = od_backend_query( + server, "reset-discard-smart-string", + route->rule->pool->discard_query, NULL, + strlen(route->rule->pool->discard_query) + 1, + wait_timeout, 1, + 0 /*do not ignore server error messages*/); if (rc == NOT_OK_RESPONSE) goto error; } diff --git a/sources/route_id.h b/sources/route_id.h index 4e42c21ad..8d9ae24ad 100644 --- a/sources/route_id.h +++ b/sources/route_id.h @@ -43,6 +43,7 @@ static inline int od_route_id_copy(od_route_id_t *dest, od_route_id_t *id) return -1; memcpy(dest->database, id->database, id->database_len); dest->database_len = id->database_len; + dest->user = malloc(id->user_len); if (dest->user == NULL) { free(dest->database); @@ -51,6 +52,7 @@ static inline int od_route_id_copy(od_route_id_t *dest, od_route_id_t *id) } memcpy(dest->user, id->user, id->user_len); dest->user_len = id->user_len; + dest->physical_rep = id->physical_rep; dest->logical_rep = id->logical_rep; return 0; diff --git a/sources/router.c b/sources/router.c index 0c0aa6f4a..3b1a0b6e9 100644 --- a/sources/router.c +++ b/sources/router.c @@ -103,7 +103,9 @@ static inline int od_drop_obsolete_rule_connections_cb(od_route_t *route, assert(rule); assert(obsolete_rule); if (strcmp(rule->user_name, obsolete_rule->usr_name) == 0 && - strcmp(rule->db_name, obsolete_rule->db_name) == 0) { + strcmp(rule->db_name, obsolete_rule->db_name) == 0 && + od_address_range_equals(&rule->address_range, + &obsolete_rule->address_range)) { od_route_kill_client_pool(route); return 0; } @@ -138,7 +140,8 @@ int od_router_reconfigure(od_router_t *router, od_rules_t *rules) od_rule_key_t *rk; rk = od_container_of(i, od_rule_key_t, link); od_log(&instance->logger, "reload config", NULL, NULL, - "added rule: %s %s", rk->usr_name, rk->db_name); + "added rule: %s %s %s", rk->usr_name, + rk->db_name, rk->address_range.string_value); } od_list_foreach(&deleted, i) @@ -146,8 +149,8 @@ int od_router_reconfigure(od_router_t *router, od_rules_t *rules) od_rule_key_t *rk; rk = od_container_of(i, od_rule_key_t, link); od_log(&instance->logger, "reload config", NULL, NULL, - "deleted rule: %s %s", rk->usr_name, - rk->db_name); + "deleted rule: %s %s %s", rk->usr_name, + rk->db_name, rk->address_range.string_value); } { @@ -350,14 +353,24 @@ od_router_status_t od_router_route(od_router_t *router, od_client_t *client) /* match latest version of route rule */ od_rule_t *rule = NULL; // initialize rule for (line 365) and flag '-Wmaybe-uninitialized' + + struct sockaddr_storage sa; + int salen; + struct sockaddr *saddr; + int rc; switch (client->type) { case OD_POOL_CLIENT_INTERNAL: rule = od_rules_forward(&router->rules, startup->database.value, - startup->user.value, 1); + startup->user.value, NULL, 1); break; case OD_POOL_CLIENT_EXTERNAL: + salen = sizeof(sa); + saddr = (struct sockaddr *)&sa; + rc = machine_getpeername(client->io.io, saddr, &salen); + if (rc == -1) + return OD_ROUTER_ERROR; rule = od_rules_forward(&router->rules, startup->database.value, - startup->user.value, 0); + startup->user.value, &sa, 0); break; case OD_POOL_CLIENT_UNDEF: // create that case for correct work of '-Wswitch' flag break; @@ -368,8 +381,9 @@ od_router_status_t od_router_route(od_router_t *router, od_client_t *client) return OD_ROUTER_ERROR_NOT_FOUND; } od_debug(&instance->logger, "routing", NULL, NULL, - "matching rule: %s %s with %s routing type to %s client", + "matching rule: %s %s %s with %s routing type to %s client", rule->db_name, rule->user_name, + rule->address_range.string_value, rule->pool->routing_type == NULL ? "client visible" : rule->pool->routing_type, client->type == OD_POOL_CLIENT_INTERNAL ? "internal" : @@ -420,7 +434,7 @@ od_router_status_t od_router_route(od_router_t *router, od_client_t *client) switch (ldap_rc) { case OK_RESPONSE: { od_ldap_endpoint_lock(rule->ldap_endpoint); - ldap_server->idle_timestamp = (int)time(NULL); + ldap_server->idle_timestamp = time(NULL); od_ldap_server_pool_set( rule->ldap_endpoint->ldap_search_pool, ldap_server, OD_SERVER_IDLE); @@ -439,7 +453,7 @@ od_router_status_t od_router_route(od_router_t *router, od_client_t *client) } case LDAP_INSUFFICIENT_ACCESS: { od_ldap_endpoint_lock(rule->ldap_endpoint); - ldap_server->idle_timestamp = (int)time(NULL); + ldap_server->idle_timestamp = time(NULL); od_ldap_server_pool_set( rule->ldap_endpoint->ldap_search_pool, ldap_server, OD_SERVER_IDLE); @@ -644,6 +658,10 @@ od_router_status_t od_router_attach(od_router_t *router, od_client_t *client, if (server == NULL) return OD_ROUTER_ERROR; od_id_generate(&server->id, "s"); + od_dbg_printf_on_dvl_lvl(1, "server %s%.*s has relay %p\n", + server->id.id_prefix, + (signed)sizeof(server->id.id), server->id.id, + &server->relay); server->global = client->global; server->route = route; @@ -658,6 +676,8 @@ od_router_status_t od_router_attach(od_router_t *router, od_client_t *client, server->idle_time = 0; server->key_client = client->key; + assert(od_server_synchronized(server)); + /* * XXX: this logic breaks some external solutions that use * PostgreSQL logical replication. Need to tests this and fix @@ -692,6 +712,9 @@ void od_router_detach(od_router_t *router, od_client_t *client) /* detach from current machine event loop */ od_server_t *server = client->server; + + assert(server != NULL); + assert(od_server_synchronized(server)); od_io_detach(&server->io); od_route_lock(route); diff --git a/sources/rules.c b/sources/rules.c index 6f240a28c..4447232fa 100644 --- a/sources/rules.c +++ b/sources/rules.c @@ -450,6 +450,8 @@ void od_rules_rule_free(od_rule_t *rule) free(rule->db_name); if (rule->user_name) free(rule->user_name); + if (rule->address_range.string_value) + free(rule->address_range.string_value); if (rule->password) free(rule->password); if (rule->auth) @@ -530,12 +532,17 @@ void od_rules_unref(od_rule_t *rule) } od_rule_t *od_rules_forward(od_rules_t *rules, char *db_name, char *user_name, + struct sockaddr_storage *user_addr, int pool_internal) { - od_rule_t *rule_db_user = NULL; - od_rule_t *rule_db_default = NULL; - od_rule_t *rule_default_user = NULL; - od_rule_t *rule_default_default = NULL; + od_rule_t *rule_db_user_default = NULL; + od_rule_t *rule_db_default_default = NULL; + od_rule_t *rule_default_user_default = NULL; + od_rule_t *rule_default_default_default = NULL; + od_rule_t *rule_db_user_addr = NULL; + od_rule_t *rule_db_default_addr = NULL; + od_rule_t *rule_default_user_addr = NULL; + od_rule_t *rule_default_default_addr = NULL; od_list_t *i; od_list_foreach(&rules->rules, i) @@ -555,33 +562,67 @@ od_rule_t *od_rules_forward(od_rules_t *rules, char *db_name, char *user_name, } } if (rule->db_is_default) { - if (rule->user_is_default) - rule_default_default = rule; - else if (strcmp(rule->user_name, user_name) == 0) - rule_default_user = rule; + if (rule->user_is_default) { + if (rule->address_range.is_default) + rule_default_default_default = rule; + else if (od_address_validate( + &rule->address_range, + user_addr)) + rule_default_default_addr = rule; + } else if (strcmp(rule->user_name, user_name) == 0) { + if (rule->address_range.is_default) + rule_default_user_default = rule; + else if (od_address_validate( + &rule->address_range, + user_addr)) + rule_default_user_addr = rule; + } } else if (strcmp(rule->db_name, db_name) == 0) { - if (rule->user_is_default) - rule_db_default = rule; - else if (strcmp(rule->user_name, user_name) == 0) - rule_db_user = rule; + if (rule->user_is_default) { + if (rule->address_range.is_default) + rule_db_default_default = rule; + else if (od_address_validate( + &rule->address_range, + user_addr)) + rule_db_default_addr = rule; + } else if (strcmp(rule->user_name, user_name) == 0) { + if (rule->address_range.is_default) + rule_db_user_default = rule; + else if (od_address_validate( + &rule->address_range, + user_addr)) + rule_db_user_addr = rule; + } } } - if (rule_db_user) - return rule_db_user; + if (rule_db_user_addr) + return rule_db_user_addr; + + if (rule_db_user_default) + return rule_db_user_default; + + if (rule_db_default_addr) + return rule_db_default_addr; + + if (rule_default_user_addr) + return rule_default_user_addr; + + if (rule_db_default_default) + return rule_db_default_default; - if (rule_db_default) - return rule_db_default; + if (rule_default_user_default) + return rule_default_user_default; - if (rule_default_user) - return rule_default_user; + if (rule_default_default_addr) + return rule_default_default_addr; - return rule_default_default; + return rule_default_default_default; } od_rule_t *od_rules_match(od_rules_t *rules, char *db_name, char *user_name, - int db_is_default, int user_is_default, - int pool_internal) + od_address_range_t *address_range, int db_is_default, + int user_is_default, int pool_internal) { od_list_t *i; od_list_foreach(&rules->rules, i) @@ -601,15 +642,25 @@ od_rule_t *od_rules_match(od_rules_t *rules, char *db_name, char *user_name, } if (strcmp(rule->db_name, db_name) == 0 && strcmp(rule->user_name, user_name) == 0 && + rule->address_range.is_default == + address_range->is_default && rule->db_is_default == db_is_default && - rule->user_is_default == user_is_default) - return rule; + rule->user_is_default == user_is_default) { + if (address_range->is_default == 0) { + if (od_address_range_equals(&rule->address_range, + address_range)) + return rule; + } else { + return rule; + } + } } return NULL; } -static inline od_rule_t *od_rules_match_active(od_rules_t *rules, char *db_name, - char *user_name) +static inline od_rule_t * +od_rules_match_active(od_rules_t *rules, char *db_name, char *user_name, + od_address_range_t *address_range) { od_list_t *i; od_list_foreach(&rules->rules, i) @@ -619,7 +670,9 @@ static inline od_rule_t *od_rules_match_active(od_rules_t *rules, char *db_name, if (rule->obsolete) continue; if (strcmp(rule->db_name, db_name) == 0 && - strcmp(rule->user_name, user_name) == 0) + strcmp(rule->user_name, user_name) == 0 && + od_address_range_equals(&rule->address_range, + address_range)) return rule; } return NULL; @@ -881,7 +934,9 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, rule_new = od_container_of(j, od_rule_t, link); if (strcmp(rule_old->user_name, rule_new->user_name) == 0 && - strcmp(rule_old->db_name, rule_new->db_name) == 0) { + strcmp(rule_old->db_name, rule_new->db_name) == 0 && + od_address_range_equals(&rule_old->address_range, + &rule_new->address_range)) { ok = 1; break; } @@ -897,6 +952,9 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, rk->db_name = strndup(rule_old->db_name, rule_old->db_name_len); + od_address_range_copy(&rule_old->address_range, + &rk->address_range); + od_list_append(deleted, &rk->link); } }; @@ -917,7 +975,9 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, rule_old = od_container_of(j, od_rule_t, link); if (strcmp(rule_old->user_name, rule_new->user_name) == 0 && - strcmp(rule_old->db_name, rule_new->db_name) == 0) { + strcmp(rule_old->db_name, rule_new->db_name) == 0 && + od_address_range_equals(&rule_old->address_range, + &rule_new->address_range)) { ok = 1; break; } @@ -933,6 +993,9 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, rk->db_name = strndup(rule_new->db_name, rule_new->db_name_len); + od_address_range_copy(&rule_new->address_range, + &rk->address_range); + od_list_append(added, &rk->link); } }; @@ -946,7 +1009,8 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, /* find and compare origin rule */ od_rule_t *origin; origin = od_rules_match_active(rules, rule->db_name, - rule->user_name); + rule->user_name, + &rule->address_range); if (origin) { if (od_rules_rule_compare(origin, rule)) { origin->mark = 0; @@ -964,6 +1028,10 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, origin->user_name_len); rk->db_name = strndup(origin->db_name, origin->db_name_len); + + od_address_range_copy(&origin->address_range, + &rk->address_range); + od_list_append(to_drop, &rk->link); } @@ -1007,13 +1075,13 @@ __attribute__((hot)) int od_rules_merge(od_rules_t *rules, od_rules_t *src, } int od_pool_validate(od_logger_t *logger, od_rule_pool_t *pool, char *db_name, - char *user_name) + char *user_name, char *address_range_string) { /* pooling mode */ if (!pool->type) { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': pooling mode is not set", db_name, - user_name); + "rule '%s.%s %s': pooling mode is not set", db_name, + user_name, address_range_string); return NOT_OK_RESPONSE; } if (strcmp(pool->type, "session") == 0) { @@ -1024,8 +1092,8 @@ int od_pool_validate(od_logger_t *logger, od_rule_pool_t *pool, char *db_name, pool->pool = OD_RULE_POOL_STATEMENT; } else { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': unknown pooling mode", db_name, - user_name); + "rule '%s.%s %s': unknown pooling mode", db_name, + user_name, address_range_string); return NOT_OK_RESPONSE; } @@ -1033,16 +1101,16 @@ int od_pool_validate(od_logger_t *logger, od_rule_pool_t *pool, char *db_name, if (!pool->routing_type) { od_debug( logger, "rules", NULL, NULL, - "rule '%s.%s': pool routing mode is not set, assuming \"client_visible\" by default", - db_name, user_name); + "rule '%s.%s %s': pool routing mode is not set, assuming \"client_visible\" by default", + db_name, user_name, address_range_string); } else if (strcmp(pool->routing_type, "internal") == 0) { pool->routing = OD_RULE_POOL_INTERVAL; } else if (strcmp(pool->routing_type, "client_visible") == 0) { pool->routing = OD_RULE_POOL_CLIENT_VISIBLE; } else { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': unknown pool routing mode", db_name, - user_name); + "rule '%s.%s %s': unknown pool routing mode", db_name, + user_name, address_range_string); return NOT_OK_RESPONSE; } @@ -1051,24 +1119,24 @@ int od_pool_validate(od_logger_t *logger, od_rule_pool_t *pool, char *db_name, pool->pool == OD_RULE_POOL_SESSION) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': prepared statements support in session pool makes no sence", - db_name, user_name); + "rule '%s.%s %s': prepared statements support in session pool makes no sence", + db_name, user_name, address_range_string); return NOT_OK_RESPONSE; } if (pool->reserve_prepared_statement && pool->discard) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': pool discard is forbidden when using prepared statements support", - db_name, user_name); + "rule '%s.%s %s': pool discard is forbidden when using prepared statements support", + db_name, user_name, address_range_string); return NOT_OK_RESPONSE; } if (pool->smart_discard && !pool->reserve_prepared_statement) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': pool smart discard is forbidden without using prepared statements support", - db_name, user_name); + "rule '%s.%s %s': pool smart discard is forbidden without using prepared statements support", + db_name, user_name, address_range_string); return NOT_OK_RESPONSE; } @@ -1076,8 +1144,8 @@ int od_pool_validate(od_logger_t *logger, od_rule_pool_t *pool, char *db_name, if (strcasestr(pool->discard_query, "DEALLOCATE ALL")) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': cannot support prepared statements when 'DEALLOCATE ALL' present in discard string", - db_name, user_name); + "rule '%s.%s %s': cannot support prepared statements when 'DEALLOCATE ALL' present in discard string", + db_name, user_name, address_range_string); return NOT_OK_RESPONSE; } } @@ -1099,20 +1167,23 @@ int od_rules_autogenerate_defaults(od_rules_t *rules, od_logger_t *logger) /* match storage and make a copy of in the user rules */ if (rule->auth_query != NULL && !od_rules_match(rules, rule->db_name, rule->user_name, - rule->db_is_default, rule->user_is_default, - 1)) { + &rule->address_range, rule->db_is_default, + rule->user_is_default, 1)) { need_autogen = true; break; } } - if (!need_autogen || - od_rules_match(rules, "default_db", "default_user", 1, 1, 1)) { + od_address_range_t default_address_range = + od_address_range_create_default(); + + if (!need_autogen || od_rules_match(rules, "default_db", "default_user", + &default_address_range, 1, 1, 1)) { return OK_RESPONSE; } - default_rule = - od_rules_match(rules, "default_db", "default_user", 1, 1, 0); + default_rule = od_rules_match(rules, "default_db", "default_user", + &default_address_range, 1, 1, 0); if (!default_rule) { od_log(logger, "config", NULL, NULL, "skipping default internal rule auto-generation: no default rule provided"); @@ -1149,6 +1220,8 @@ int od_rules_autogenerate_defaults(od_rules_t *rules, od_logger_t *logger) if (rule->db_name == NULL) return NOT_OK_RESPONSE; + rule->address_range = default_address_range; + /* force several default settings */ #define OD_DEFAULT_INTERNAL_POLL_SZ 0 rule->pool->type = strdup("transaction"); @@ -1266,9 +1339,11 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, /* match storage and make a copy of in the user rules */ if (rule->storage_name == NULL) { - od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': no rule storage is specified", - rule->db_name, rule->user_name); + od_error( + logger, "rules", NULL, NULL, + "rule '%s.%s %s': no rule storage is specified", + rule->db_name, rule->user_name, + rule->address_range.string_value); return NOT_OK_RESPONSE; } @@ -1276,8 +1351,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, storage = od_rules_storage_match(rules, rule->storage_name); if (storage == NULL) { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': no rule storage '%s' found", + "rule '%s.%s %s': no rule storage '%s' found", rule->db_name, rule->user_name, + rule->address_range.string_value, rule->storage_name); return NOT_OK_RESPONSE; } @@ -1288,7 +1364,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, } if (od_pool_validate(logger, rule->pool, rule->db_name, - rule->user_name) == NOT_OK_RESPONSE) { + rule->user_name, + rule->address_range.string_value) == + NOT_OK_RESPONSE) { return NOT_OK_RESPONSE; } @@ -1296,16 +1374,18 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, if (rule->user_role != OD_RULE_ROLE_UNDEF) { od_error( logger, "rules validate", NULL, NULL, - "rule '%s.%s': role set for non-local storage", - rule->db_name, rule->user_name); + "rule '%s.%s %s': role set for non-local storage", + rule->db_name, rule->user_name, + rule->address_range.string_value); return NOT_OK_RESPONSE; } } else { if (rule->user_role == OD_RULE_ROLE_UNDEF) { od_error( logger, "rules validate", NULL, NULL, - "rule '%s.%s': force stat role for local storage", - rule->db_name, rule->user_name); + "rule '%s.%s %s': force stat role for local storage", + rule->db_name, rule->user_name, + rule->address_range.string_value); rule->user_role = OD_RULE_ROLE_STAT; } } @@ -1314,8 +1394,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, if (!rule->auth) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': authentication mode is not defined", - rule->db_name, rule->user_name); + "rule '%s.%s %s': authentication mode is not defined", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } if (strcmp(rule->auth, "none") == 0) { @@ -1332,7 +1413,8 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, logger, "rules", NULL, NULL, "auth query and pam service auth method cannot be " "used simultaneously", - rule->db_name, rule->user_name); + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } #endif @@ -1348,8 +1430,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, ) { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': password is not set", - rule->db_name, rule->user_name); + "rule '%s.%s %s': password is not set", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } } else if (strcmp(rule->auth, "md5") == 0) { @@ -1357,8 +1440,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, if (rule->password == NULL && rule->auth_query == NULL) { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': password is not set", - rule->db_name, rule->user_name); + "rule '%s.%s %s': password is not set", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } } else if (strcmp(rule->auth, "scram-sha-256") == 0) { @@ -1366,8 +1450,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, if (rule->password == NULL && rule->auth_query == NULL) { od_error(logger, "rules", NULL, NULL, - "rule '%s.%s': password is not set", - rule->db_name, rule->user_name); + "rule '%s.%s %s': password is not set", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } } else if (strcmp(rule->auth, "cert") == 0) { @@ -1375,8 +1460,9 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, } else { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': has unknown authentication mode", - rule->db_name, rule->user_name); + "rule '%s.%s %s': has unknown authentication mode", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } @@ -1385,15 +1471,17 @@ int od_rules_validate(od_rules_t *rules, od_config_t *config, if (rule->auth_query_user == NULL) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': auth_query_user is not set", - rule->db_name, rule->user_name); + "rule '%s.%s %s': auth_query_user is not set", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } if (rule->auth_query_db == NULL) { od_error( logger, "rules", NULL, NULL, - "rule '%s.%s': auth_query_db is not set", - rule->db_name, rule->user_name); + "rule '%s.%s %s': auth_query_db is not set", + rule->db_name, rule->user_name, + rule->address_range.string_value); return -1; } } @@ -1495,8 +1583,8 @@ void od_rules_print(od_rules_t *rules, od_logger_t *logger) rule = od_container_of(i, od_rule_t, link); if (rule->obsolete) continue; - od_log(logger, "rules", NULL, NULL, "<%s.%s>", rule->db_name, - rule->user_name); + od_log(logger, "rules", NULL, NULL, "<%s.%s %s>", rule->db_name, + rule->user_name, rule->address_range.string_value); od_log(logger, "rules", NULL, NULL, " authentication %s", rule->auth); if (rule->auth_common_name_default) @@ -1654,12 +1742,12 @@ void od_rules_print(od_rules_t *rules, od_logger_t *logger) od_log(logger, "rules", NULL, NULL, " storage_user %s", rule->storage_user); - if (rule->catchup_checks) + if (rule->catchup_timeout) od_log(logger, "rules", NULL, NULL, " catchup timeout %d", rule->catchup_timeout); if (rule->catchup_checks) od_log(logger, "rules", NULL, NULL, - " catchup timeout %d", rule->catchup_checks); + " catchup checks %d", rule->catchup_checks); od_log(logger, "rules", NULL, NULL, " log_debug %s", diff --git a/sources/rules.h b/sources/rules.h index fcfd8a51c..12f218719 100644 --- a/sources/rules.h +++ b/sources/rules.h @@ -44,6 +44,7 @@ typedef struct od_rule_key od_rule_key_t; struct od_rule_key { char *usr_name; char *db_name; + od_address_range_t address_range; od_list_t link; }; @@ -73,6 +74,7 @@ struct od_rule { char *user_name; int user_name_len; int user_is_default; + od_address_range_t address_range; od_rule_role_type_t user_role; /* auth */ @@ -182,12 +184,13 @@ void od_rules_ref(od_rule_t *); void od_rules_unref(od_rule_t *); int od_rules_compare(od_rule_t *, od_rule_t *); -od_rule_t *od_rules_forward(od_rules_t *, char *, char *, int); +od_rule_t *od_rules_forward(od_rules_t *, char *, char *, + struct sockaddr_storage *, int); /* search rule with desored characteristik */ od_rule_t *od_rules_match(od_rules_t *rules, char *db_name, char *user_name, - int db_is_default, int user_is_default, - int pool_internal); + od_address_range_t *address_range, int db_is_default, + int user_is_default, int pool_internal); /* group */ od_group_t *od_rules_group_allocate(od_global_t *global); diff --git a/sources/server.h b/sources/server.h index c5bc5123e..86cfa0535 100644 --- a/sources/server.h +++ b/sources/server.h @@ -35,6 +35,8 @@ struct od_server { uint64_t sync_request; uint64_t sync_reply; + /* to swallow some internal msgs */ + machine_msg_t *parse_msg; int idle_time; kiwi_key_t key; @@ -52,6 +54,7 @@ struct od_server { /* allocated prepared statements ids */ od_hashmap_t *prep_stmts; + int sync_point; od_global_t *global; int offline; @@ -78,6 +81,8 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts) server->deploy_sync = 0; server->sync_request = 0; server->sync_reply = 0; + server->sync_point = 0; + server->parse_msg = NULL; server->init_time_us = machine_time_us(); server->error_connect = NULL; server->offline = 0; @@ -140,8 +145,14 @@ static inline int od_server_in_deploy(od_server_t *server) return server->deploy_sync > 0; } +static inline int od_server_in_sync_point(od_server_t *server) +{ + return server->sync_point > 0; +} + static inline int od_server_synchronized(od_server_t *server) { + assert(server->sync_request >= server->sync_reply); return server->sync_request == server->sync_reply; } diff --git a/sources/status.h b/sources/status.h index ac4f29a66..4e404aaab 100644 --- a/sources/status.h +++ b/sources/status.h @@ -11,6 +11,7 @@ typedef enum { OD_UNDEF, OD_OK, OD_SKIP, + OD_REQ_SYNC, OD_ATTACH, OD_DETACH, OD_WAIT_SYNC, @@ -37,6 +38,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status) return "OD_OK"; case OD_SKIP: return "OD_SKIP"; + case OD_REQ_SYNC: + return "OD_REQ_SYNC"; case OD_ATTACH: return "OD_ATTACH"; case OD_DETACH: diff --git a/sources/storage.c b/sources/storage.c index 82327cb6b..483de724b 100644 --- a/sources/storage.c +++ b/sources/storage.c @@ -259,6 +259,7 @@ void od_storage_watchdog_watch(void *arg) return; } + watchdog_client->is_watchdog = true; watchdog_client->global = global; watchdog_client->type = OD_POOL_CLIENT_INTERNAL; od_id_generate(&watchdog_client->id, "a"); diff --git a/sources/system.c b/sources/system.c index 41816ed1f..075bbb40b 100644 --- a/sources/system.c +++ b/sources/system.c @@ -94,6 +94,11 @@ static inline void od_system_server(void *arg) continue; } od_id_generate(&client->id, "c"); + + od_dbg_printf_on_dvl_lvl(1, "client %s%.*s has relay %p\n", + client->id.id_prefix, + (signed)sizeof(client->id.id), + client->id.id, &client->relay); rc = od_io_prepare(&client->io, client_io, instance->config.readahead); if (rc == -1) { diff --git a/sources/util.h b/sources/util.h index 1a4f6d76c..5847e4aee 100644 --- a/sources/util.h +++ b/sources/util.h @@ -98,4 +98,10 @@ static inline long od_memtol(char *data, size_t data_size, char **end_ptr, return result; } +static inline uint32 od_bswap32(uint32 x) +{ + return ((x << 24) & 0xff000000) | ((x << 8) & 0x00ff0000) | + ((x >> 8) & 0x0000ff00) | ((x >> 24) & 0x000000ff); +} + #endif /* ODYSSEY_UTIL_H */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index af25bb9ca..4c5b41954 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -65,6 +65,7 @@ set(od_test_src ../sources/util.h ../sources/build.h ../sources/debugprintf.h + ../sources/address.c ../sources/hba.c ../sources/hba_rule.c ../sources/hba_reader.c diff --git a/test/odyssey/test_hba_parse.c b/test/odyssey/test_hba_parse.c index 41102b1d4..31f485f5a 100644 --- a/test/odyssey/test_hba_parse.c +++ b/test/odyssey/test_hba_parse.c @@ -7,13 +7,15 @@ void test_od_hba_reader_prefix(sa_family_t net, char *prefix, char *value) od_hba_rule_t *hba = NULL; char buffer[INET6_ADDRSTRLEN]; hba = od_hba_rule_create(); - hba->addr.ss_family = net; - test(od_hba_reader_prefix(hba, prefix) == 0); + hba->address_range.addr.ss_family = net; + test(od_address_range_read_prefix(&hba->address_range, prefix) == 0); if (net == AF_INET) { - struct sockaddr_in *addr = (struct sockaddr_in *)&hba->mask; + struct sockaddr_in *addr = + (struct sockaddr_in *)&hba->address_range.mask; inet_ntop(net, &addr->sin_addr, buffer, sizeof(buffer)); } else { - struct sockaddr_in6 *addr = (struct sockaddr_in6 *)&hba->mask; + struct sockaddr_in6 *addr = + (struct sockaddr_in6 *)&hba->address_range.mask; inet_ntop(net, &addr->sin6_addr, buffer, sizeof(buffer)); } test(memcmp(value, buffer, strlen(buffer)) == 0);