Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test fix #546

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docker/bin/setup
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ host tsa_db user_ro 127.0.0.1/32 trust
host tsa_db user_rw 127.0.0.1/32 trust
EOF

sed -i 's/max_connections = 100/max_connections = 2000/g' /etc/postgresql/14/main/postgresql.conf


rm -fr /var/lib/postgresql/14/main/
rm -fr /var/lib/postgresql/14/repl
Expand All @@ -43,6 +41,7 @@ cp -r /etc/postgresql/14/main /etc/postgresql/14/repl

# create primary
sudo -u postgres /usr/lib/postgresql/14/bin/initdb -D /var/lib/postgresql/14/main/
sed -i 's/max_connections = 100/max_connections = 2000/g' /var/lib/postgresql/14/main/postgresql.conf
sudo -u postgres /usr/lib/postgresql/14/bin/pg_ctl -D /var/lib/postgresql/14/main/ start

# create replica
Expand Down
3 changes: 2 additions & 1 deletion docker/ody-integration-test/pkg/client-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func usrReadResultWhilesigusr2Test(
if err != nil {
return err
}
defer db.Close()
t, err := db.Beginx()
if err != nil {
return err
Expand Down Expand Up @@ -61,6 +62,7 @@ func select42(ctx context.Context, ch chan error, wg *sync.WaitGroup) {
fmt.Println(err)
return
}
defer db.Close()

if _, err := db.Query("Select 42"); err != nil {
ch <- err
Expand Down Expand Up @@ -265,7 +267,6 @@ func sigusr2Test(

func odyClientServerInteractionsTestSet(ctx context.Context) error {


if err := usrReadResultWhilesigusr2Test(ctx); err != nil {
err = fmt.Errorf("usrReadResultWhilesigusr2: %w", err)
fmt.Println(err)
Expand Down
11 changes: 9 additions & 2 deletions docker/ody-integration-test/pkg/cores.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "os"
"os/exec"
"strconv"
"sync"
"syscall"
"time"
)
Expand All @@ -19,7 +20,9 @@ const procName = "odyssey"
const signal = syscall.SIGTERM
const testCount = 100

func bunchProcess(ctx context.Context) {
func bunchProcess(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

_, err := exec.CommandContext(ctx, "pgbench",
"--builtin", "select-only",
"-c", "40",
Expand Down Expand Up @@ -49,13 +52,17 @@ func SigTermAfterHighLoad(ctx context.Context) error {
return err
}

go bunchProcess(ctx)
var wg sync.WaitGroup
wg.Add(1)
go bunchProcess(ctx, &wg)

time.Sleep(timeSleep * time.Second)

if _, err := signalToProc(signal, procName); err != nil {
fmt.Println(err.Error())
}

wg.Wait()
}

files, err := ioutil.ReadDir("/var/cores")
Expand Down
2 changes: 2 additions & 0 deletions docker/ody-integration-test/pkg/showerrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func showErrors(ctx context.Context) error {
if err != nil {
return err
}
defer db.Close()

if mp, err := getErrs(ctx, db); err != nil {
return err
Expand Down Expand Up @@ -108,6 +109,7 @@ func showErrorsAfterPgRestart(ctx context.Context) error {
if err != nil {
return err
}
defer db.Close()
cor_cnt := 10

for i := 0; i < cor_cnt; i++ {
Expand Down
5 changes: 4 additions & 1 deletion docker/ody-integration-test/pkg/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,11 @@ func OdysseyIsAlive(ctx context.Context) error {
}

func waitOnOdysseyAlive(ctx context.Context, timeout time.Duration) error {
for ok := false; !ok && timeout > 0; ok = OdysseyIsAlive(ctx) == nil {
for OdysseyIsAlive(ctx) != nil {
timeout -= time.Second
if timeout < 0 {
return fmt.Errorf("timeout expired")
}
time.Sleep(time.Second)
fmt.Printf("waiting for od up: remamining time %d\n", timeout/time.Second)
}
Expand Down
26 changes: 21 additions & 5 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,22 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client,
od_dbg_printf_on_dvl_lvl(
1, "client %s polling replica for catchup with timeout %d\n",
client->id.id, timeout);

/*
* Ensure heartbeet is initialized at least once.
* Heartbeat might be 0 after reload\restart.
*/
int absent_heartbeat_checks = 0;
while (route->last_heartbeat == 0) {
machine_sleep(ODYSSEY_CATCHUP_RECHECK_INTERVAL);
if (absent_heartbeat_checks++ >
(timeout * 1000 / ODYSSEY_CATCHUP_RECHECK_INTERVAL)) {
od_debug(&instance->logger, "catchup", client, NULL,
"No heartbeat for route detected\n");
return OD_ECATCHUP_TIMEOUT;
}
}

for (int check = 1; check <= route->rule->catchup_checks; ++check) {
od_dbg_printf_on_dvl_lvl(1, "current cached time %d\n",
machine_timeofday_sec());
Expand All @@ -1566,9 +1582,9 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client,
"client %s replication %d lag is over catchup timeout %d\n",
client->id.id, lag, timeout);
/*
* TBD: Consider configuring `ODYSSEY_CATCHUP_RECHECK_INTERVAL` in
* frontend rule.
*/
* TBD: Consider configuring `ODYSSEY_CATCHUP_RECHECK_INTERVAL` in
* frontend rule.
*/
if (check < route->rule->catchup_checks) {
machine_sleep(ODYSSEY_CATCHUP_RECHECK_INTERVAL);
}
Expand Down Expand Up @@ -1685,7 +1701,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
od_server_t *server = NULL;
od_instance_t *instance = client->global->instance;

/* Consult lag polling logic and immudiately close connection with
/* Consult lag polling logic and immudiately close connection with
* error, if lag polling policy says so.
*/

Expand Down Expand Up @@ -2322,4 +2338,4 @@ void od_frontend(void *arg)
od_router_unroute(router, client);
/* close frontend connection */
od_frontend_close(client);
}
}
Loading