diff --git a/docker/bin/setup b/docker/bin/setup index 211daff86..aebb54700 100755 --- a/docker/bin/setup +++ b/docker/bin/setup @@ -33,8 +33,6 @@ host tsa_db user_ro 127.0.0.1/32 trust host tsa_db user_rw 127.0.0.1/32 trust EOF -sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/14/main/postgresql.conf - rm -fr /var/lib/postgresql/14/main/ rm -fr /var/lib/postgresql/14/repl @@ -43,6 +41,7 @@ cp -r /etc/postgresql/14/main /etc/postgresql/14/repl # create primary sudo -u postgres /usr/lib/postgresql/14/bin/initdb -D /var/lib/postgresql/14/main/ +sed -i 's/max_connections = 100/max_connections = 2000/g' /var/lib/postgresql/14/main/postgresql.conf sudo -u postgres /usr/lib/postgresql/14/bin/pg_ctl -D /var/lib/postgresql/14/main/ start # create replica diff --git a/docker/ody-integration-test/pkg/client-server.go b/docker/ody-integration-test/pkg/client-server.go index eadf6cfae..778f260fc 100644 --- a/docker/ody-integration-test/pkg/client-server.go +++ b/docker/ody-integration-test/pkg/client-server.go @@ -26,6 +26,7 @@ func usrReadResultWhilesigusr2Test( if err != nil { return err } + defer db.Close() t, err := db.Beginx() if err != nil { return err @@ -61,6 +62,7 @@ func select42(ctx context.Context, ch chan error, wg *sync.WaitGroup) { fmt.Println(err) return } + defer db.Close() if _, err := db.Query("Select 42"); err != nil { ch <- err @@ -265,7 +267,6 @@ func sigusr2Test( func odyClientServerInteractionsTestSet(ctx context.Context) error { - if err := usrReadResultWhilesigusr2Test(ctx); err != nil { err = fmt.Errorf("usrReadResultWhilesigusr2: %w", err) fmt.Println(err) diff --git a/docker/ody-integration-test/pkg/cores.go b/docker/ody-integration-test/pkg/cores.go index 82467629e..2d11a7f4e 100644 --- a/docker/ody-integration-test/pkg/cores.go +++ b/docker/ody-integration-test/pkg/cores.go @@ -9,6 +9,7 @@ import ( _ "os" "os/exec" "strconv" + "sync" "syscall" "time" ) @@ -19,7 +20,9 @@ const procName = "odyssey" const signal = syscall.SIGTERM const testCount = 100 -func bunchProcess(ctx context.Context) { +func bunchProcess(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + _, err := exec.CommandContext(ctx, "pgbench", "--builtin", "select-only", "-c", "40", @@ -49,13 +52,17 @@ func SigTermAfterHighLoad(ctx context.Context) error { return err } - go bunchProcess(ctx) + var wg sync.WaitGroup + wg.Add(1) + go bunchProcess(ctx, &wg) time.Sleep(timeSleep * time.Second) if _, err := signalToProc(signal, procName); err != nil { fmt.Println(err.Error()) } + + wg.Wait() } files, err := ioutil.ReadDir("/var/cores") diff --git a/docker/ody-integration-test/pkg/showerrs.go b/docker/ody-integration-test/pkg/showerrs.go index cf2b24f67..544b08ae0 100644 --- a/docker/ody-integration-test/pkg/showerrs.go +++ b/docker/ody-integration-test/pkg/showerrs.go @@ -57,6 +57,7 @@ func showErrors(ctx context.Context) error { if err != nil { return err } + defer db.Close() if mp, err := getErrs(ctx, db); err != nil { return err @@ -108,6 +109,7 @@ func showErrorsAfterPgRestart(ctx context.Context) error { if err != nil { return err } + defer db.Close() cor_cnt := 10 for i := 0; i < cor_cnt; i++ { diff --git a/docker/ody-integration-test/pkg/util.go b/docker/ody-integration-test/pkg/util.go index 492a1a05d..1ac0a9bb6 100644 --- a/docker/ody-integration-test/pkg/util.go +++ b/docker/ody-integration-test/pkg/util.go @@ -166,8 +166,11 @@ func OdysseyIsAlive(ctx context.Context) error { } func waitOnOdysseyAlive(ctx context.Context, timeout time.Duration) error { - for ok := false; !ok && timeout > 0; ok = OdysseyIsAlive(ctx) == nil { + for OdysseyIsAlive(ctx) != nil { timeout -= time.Second + if timeout < 0 { + return fmt.Errorf("timeout expired") + } time.Sleep(time.Second) fmt.Printf("waiting for od up: remamining time %d\n", timeout/time.Second) } diff --git a/sources/frontend.c b/sources/frontend.c index 1179ab89d..40195c61f 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -1551,6 +1551,22 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, od_dbg_printf_on_dvl_lvl( 1, "client %s polling replica for catchup with timeout %d\n", client->id.id, timeout); + + /* + * Ensure heartbeet is initialized at least once. + * Heartbeat might be 0 after reload\restart. + */ + int absent_heartbeat_checks = 0; + while (route->last_heartbeat == 0) { + machine_sleep(ODYSSEY_CATCHUP_RECHECK_INTERVAL); + if (absent_heartbeat_checks++ > + (timeout * 1000 / ODYSSEY_CATCHUP_RECHECK_INTERVAL)) { + od_debug(&instance->logger, "catchup", client, NULL, + "No heartbeat for route detected\n"); + return OD_ECATCHUP_TIMEOUT; + } + } + for (int check = 1; check <= route->rule->catchup_checks; ++check) { od_dbg_printf_on_dvl_lvl(1, "current cached time %d\n", machine_timeofday_sec()); @@ -1566,9 +1582,9 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, "client %s replication %d lag is over catchup timeout %d\n", client->id.id, lag, timeout); /* - * TBD: Consider configuring `ODYSSEY_CATCHUP_RECHECK_INTERVAL` in - * frontend rule. - */ + * TBD: Consider configuring `ODYSSEY_CATCHUP_RECHECK_INTERVAL` in + * frontend rule. + */ if (check < route->rule->catchup_checks) { machine_sleep(ODYSSEY_CATCHUP_RECHECK_INTERVAL); } @@ -1685,7 +1701,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) od_server_t *server = NULL; od_instance_t *instance = client->global->instance; - /* Consult lag polling logic and immudiately close connection with + /* Consult lag polling logic and immudiately close connection with * error, if lag polling policy says so. */ @@ -2322,4 +2338,4 @@ void od_frontend(void *arg) od_router_unroute(router, client); /* close frontend connection */ od_frontend_close(client); -} +} \ No newline at end of file