MDB-33721: extracted wal_receiver info, fix finish_iteration position and cascade replication #59

Open · wants to merge 12 commits into base: main
3 changes: 2 additions & 1 deletion .gitignore
@@ -3,7 +3,7 @@
.tox/
*.egg-info/
htmlcov/
./Dockerfile
/Dockerfile*
docker/zookeeper/zookeeper*.tar.gz
test_ssh_key*
.idea
@@ -12,3 +12,4 @@ logs/
venv/
.python-version
.mypy_cache
build/
22 changes: 22 additions & 0 deletions development.md
@@ -0,0 +1,22 @@
# Local development
## Install dependencies
```shell
sudo apt install tox python3 python3-venv
```

## Test all features
```shell
make check
```

## Test specific feature
```shell
TEST_ARGS='-i archive.feature' make check
```

## Debug
```shell
export DEBUG=true

TEST_ARGS='-i cascade.feature -t @fail_replication_source' make check
```
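As a note on the `DEBUG` switch above: behave reads per-run setup from an `environment.py` hook, so a flag like this is typically consumed roughly as in the sketch below. The hook body is an illustrative assumption, not this repository's actual test harness.

```python
# environment.py -- hypothetical behave hook; not this repo's actual harness.
import logging
import os


def before_all(context):
    # DEBUG=true in the environment switches the test run to verbose logging.
    debug = os.environ.get('DEBUG', 'false').lower() == 'true'
    logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
    context.debug_enabled = debug
```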
2 changes: 2 additions & 0 deletions requirements.txt
@@ -3,3 +3,5 @@ psycopg2-binary==2.9.10
lockfile==0.12.2
python-daemon==3.1.2
pyyaml==6.0.2
behave==1.2.6
docker==7.1.0
2 changes: 1 addition & 1 deletion src/__init__.py
@@ -156,7 +156,7 @@ def init_logging(config):
"""
level = getattr(logging, config.get('global', 'log_level').upper())
logging.getLogger('kazoo').setLevel(logging.WARN)
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s:\t%(message)s')
logging.basicConfig(level=level, format='%(asctime)s %(levelname)-7s:\t%(message)s')


def start(config):
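The only change here is the `-7s` width in the levelname field, which pads level names to a fixed width so they stay column-aligned. A minimal standalone check of the new format string:

```python
import logging

# Same format string as the new line above; -7s pads the level name so
# INFO, WARNING, etc. land in the same column.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-7s:\t%(message)s')
logging.info('Role: replica')
logging.warning('Retrying timeout expired.')
```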
2 changes: 1 addition & 1 deletion src/helpers.py
@@ -192,7 +192,7 @@ def wrapper(*args, **kwargs):
else:
current_sleep = min(sleep_time, retrying_end - time.time())
if current_sleep > 0:
logging.info(f'Waiting {current_sleep} for {event_name}')
logging.debug(f'Waiting {current_sleep:.2f} for {event_name}')
time.sleep(current_sleep)
sleep_time = 1.1 * sleep_time + 0.1 * random.random()
logging.warning('Retrying timeout expired.')
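For context, the wrapper being touched here polls for an event until a deadline, growing the sleep slightly each round with a bit of jitter. A self-contained sketch of that retry pattern, reconstructed from the visible fragment (the real helper's signature and its handling of `limit=-1` as "no deadline" are assumptions):

```python
import logging
import random
import time


def await_for_value(check, limit, event_name):
    """Poll check() until it returns a truthy value or `limit` seconds elapse.

    Reconstructed sketch of the retry pattern above; names and the
    limit < 0 == "no deadline" behaviour are assumptions.
    """
    retrying_end = time.time() + limit
    sleep_time = 1.0
    while True:
        value = check()
        if value:
            return value
        if limit < 0:
            current_sleep = sleep_time
        else:
            current_sleep = min(sleep_time, retrying_end - time.time())
            if current_sleep <= 0:
                break
        logging.debug(f'Waiting {current_sleep:.2f} for {event_name}')
        time.sleep(current_sleep)
        # Grow the delay a little each round and add jitter to avoid lockstep retries.
        sleep_time = 1.1 * sleep_time + 0.1 * random.random()
    logging.warning('Retrying timeout expired.')
    return None
```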
84 changes: 38 additions & 46 deletions src/main.py
@@ -34,7 +34,7 @@ class pgconsul(object):
DESTRUCTIVE_OPERATIONS = ['rewind']

def __init__(self, **kwargs):
logging.debug('Initializing main class.')
logging.info('Initializing main class.')
self.config = kwargs.get('config')
self._cmd_manager = CommandManager(self.config)
self._should_run = True
@@ -270,14 +270,14 @@ def run_iteration(self, my_prio):
if not terminal_state:
logging.debug('Database is starting up or shutting down')
role = self.db.get_role()
logging.debug('Role: %s', str(role))
logging.info('Role: %s ----------', str(role))

db_state = self.db.get_state()
self.notifier.notify()
logging.debug(db_state)
logging.debug('db_state: {}'.format(db_state))
try:
zk_state = self.zk.get_state()
logging.debug(zk_state)
logging.debug('zk_state: {}'.format(zk_state))
helpers.write_status_file(db_state, zk_state, self.config.get('global', 'working_dir'))
self.update_maintenance_status(role, db_state.get('primary_fqdn'))
self._zk_alive_refresh(role, db_state, zk_state)
@@ -299,10 +299,12 @@ def run_iteration(self, my_prio):
logging.error("Upper exception was for replica")
self.handle_detached_replica(db_state)
self.re_init_zk()
self.finish_iteration(timer)
else:
self.re_init_zk()

self.finish_iteration(timer)
return

stream_from = self.config.get('global', 'stream_from')
if role is None:
self.dead_iter(db_state, zk_state, is_in_terminal_state=terminal_state)
@@ -332,7 +334,7 @@ def run_iteration(self, my_prio):
self.finish_iteration(timer)

def finish_iteration(self, timer):
logging.debug('Finished iteration.')
logging.info('Finished iteration')
timer.sleep(self.config.getfloat('global', 'iteration_timeout'))

def release_lock_and_return_to_cluster(self):
@@ -356,15 +358,7 @@ def single_node_primary_iter(self, db_state, zk_state):

self.zk.write(self.zk.TIMELINE_INFO_PATH, db_state['timeline'])

pooler_port_available, pooler_service_running = self.db.pgpooler('status')
if pooler_service_running and not pooler_port_available:
logging.warning('Service alive, but pooler not accepting connections, restarting.')
self.db.pgpooler('stop')
self.db.pgpooler('start')
elif not pooler_service_running:
logging.debug('Here we should open for load.')
self.db.pgpooler('start')

self.db.ensure_pooler_started()
self.db.ensure_archiving_wal()

# Enable async replication
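Both this function and `primary_iter` below now delegate the previously inlined pooler health check to `db.ensure_pooler_started()`. The helper itself is outside this diff, but judging from the removed lines it presumably encapsulates roughly the following (a sketch reconstructed from the removed inline logic, not the PR's actual implementation):

```python
import logging


class Db:
    """Stub showing what ensure_pooler_started() presumably wraps; hypothetical."""

    def pgpooler(self, action):
        # Placeholder; the real object drives the pooler service and, for
        # 'status', returns (port_available, service_running).
        raise NotImplementedError

    def ensure_pooler_started(self):
        port_available, service_running = self.pgpooler('status')
        if service_running and not port_available:
            logging.warning('Service alive, but pooler not accepting connections, restarting.')
            self.pgpooler('stop')
            self.pgpooler('start')
        elif not service_running:
            logging.debug('Pooler service is not running; starting it to open for load.')
            self.pgpooler('start')
```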
@@ -410,8 +404,6 @@ def primary_iter(self, db_state, zk_state):

self._reset_simple_primary_switch_try()

self.checks['primary_switch'] = 0

# release replication source locks
self._acquire_replication_source_slot_lock(None)

@@ -444,15 +436,7 @@ def primary_iter(self, db_state, zk_state):

self._drop_stale_switchover(db_state)

pooler_port_available, pooler_service_running = self.db.pgpooler('status')
if pooler_service_running and not pooler_port_available:
logging.warning('Service alive, but pooler not accepting connections, restarting.')
self.db.pgpooler('stop')
self.db.pgpooler('start')
elif not pooler_service_running:
logging.debug('Here we should open for load.')
self.db.pgpooler('start')

self.db.ensure_pooler_started()
# Ensure that wal archiving is enabled. It can be disabled earlier due to
# some zk connectivity issues.
self.db.ensure_archiving_wal()
@@ -465,12 +449,13 @@ def primary_iter(self, db_state, zk_state):
logging.debug('Checking ha replics for aliveness')
alive_hosts = self.zk.get_alive_hosts(timeout=3, catch_except=False)
ha_replics = {replica for replica in ha_replics_config if replica in alive_hosts}
logging.debug('alive_hosts: {}, ha_replics: {}'.format(alive_hosts, ha_replics))
except Exception:
logging.exception('Fail to get replica status')
ha_replics = ha_replics_config
if len(ha_replics) != len(ha_replics_config):
logging.debug(
'Some of the replics is unavailable, config replics % alive replics %s',
'Some of the replics is unavailable, config replics %s alive replics %s',
str(ha_replics_config),
str(ha_replics),
)
@@ -550,7 +535,7 @@ def handle_detached_replica(self, db_state):
zk_write_delay = now - self.last_zk_host_stat_write
if zk_write_delay < close_detached_replica_after:
logging.debug(
f'Replica ZK write delay {zk_write_delay} within '
f'Replica ZK write delay {zk_write_delay:.2f} within '
f'{close_detached_replica_after} seconds; keeping replica open'
)
return
@@ -636,18 +621,18 @@ def replica_return(self, db_state, zk_state):
my_hostname = helpers.get_hostname()
self.write_host_stat(my_hostname, db_state)
holder = zk_state['lock_holder']

self.checks['failover'] = 0
limit = self.config.getfloat('replica', 'recovery_timeout')

# Try to resume WAL replaying, it can be paused earlier
logging.debug('ACTION. Replica is returning. So we resume WAL replay to {}'.format(holder))
self.db.pg_wal_replay_resume()

if not self._check_archive_recovery(holder, limit) and not self._wait_for_streaming(holder, limit):
# Wal receiver is not running and
# postgresql isn't in archive recovery
# We should try to restart
logging.warning('We should try switch primary one more time here.')
logging.warning('We should try to switch primary to {} again'.format(holder))
return self._return_to_cluster(holder, 'replica', is_dead=False)

def _get_streaming_replica_from_replics_info(self, fqdn, replics_info):
@@ -698,10 +683,11 @@ def non_ha_replica_iter(self, db_state, zk_state):
stream_from, zk_state.get(self.zk.REPLICS_INFO_PATH)
)
wal_receiver_info = self._zk_get_wal_receiver_info(stream_from)
logging.debug('wal_receiver_info: {}'.format(wal_receiver_info))
replication_source_streams = bool(
wal_receiver_info and wal_receiver_info[0].get('status') == 'streaming'
wal_receiver_info and wal_receiver_info.get('status') == 'streaming'
)
logging.error(replication_source_replica_info)
logging.error('replication_source_replica_info: {}'.format(replication_source_replica_info))

if replication_source_is_dead:
# Replication source is dead. We need to stream from the primary until the source becomes alive and starts streaming from the primary again.
@@ -742,7 +728,6 @@ def non_ha_replica_iter(self, db_state, zk_state):
'My replication source %s seems alive, but it is not streaming. Waiting for it to start streaming from the primary.',
stream_from,
)
self.checks['primary_switch'] = 0
self.start_pooler()
self._reset_simple_primary_switch_try()
self._handle_slots()
@@ -844,8 +829,6 @@ def replica_iter(self, db_state, zk_state):

return self.replica_return(db_state, zk_state)

self.checks['primary_switch'] = 0

self.start_pooler()
self._reset_simple_primary_switch_try()

@@ -1021,6 +1004,7 @@ def _verify_timeline(self, db_state, zk_state, without_leader_lock=False):
return True

def _reset_simple_primary_switch_try(self):
self.checks['primary_switch'] = 0
simple_primary_switch_path = self.zk.get_simple_primary_switch_try_path(get_hostname())
if self.zk.noexcept_get(simple_primary_switch_path) != 'no':
self.zk.noexcept_write(simple_primary_switch_path, 'no', need_lock=False)
@@ -1050,7 +1034,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
primary_switch_checks = self.config.getint('replica', 'primary_switch_checks')
need_restart = self.config.getboolean('replica', 'primary_switch_restart')

logging.info('Starting simple primary switch.')
logging.info('ACTION. Starting simple primary switch to {}'.format(new_primary))
Collaborator:

Here we are just beginning a sequence of steps to perform the switch; it is not a simple action that should be logged as intent with ACTION.

if self.checks['primary_switch'] >= primary_switch_checks:
self._set_simple_primary_switch_try()
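One way to read the convention suggested in the comment above (an assumption, not documented project policy): plain log lines mark the start of a multi-step sequence, while the `ACTION` prefix is reserved for a single concrete operation inside it. For example:

```python
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)-7s:\t%(message)s')

# 'pg-host-2' is a hypothetical hostname used only for illustration.
logging.info('Starting simple primary switch to %s', 'pg-host-2')  # sequence begins
logging.info('ACTION. Resuming WAL replay')                        # one concrete step inside it
```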

@@ -1084,7 +1068,8 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
#
# The easy way succeeded.
#
logging.info('Simple primary switch succeeded.')
logging.info('ACTION. Simple switch primary to {} succeeded'.format(new_primary))
Collaborator:

Same as on R1037, it is not an ACTION, but a branch of execution that ended here.

self._reset_simple_primary_switch_try()
return True
else:
return False
@@ -1228,13 +1213,13 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Return to cluster (try stupid method, if it fails we try rewind)
"""
logging.info('Starting returning to cluster.')
logging.info('Starting return to cluster. New primary: {}'.format(new_primary))
if self.checks['primary_switch'] >= 0:
self.checks['primary_switch'] += 1
else:
self.checks['primary_switch'] = 1
logging.debug("primary_switch checks is %d", self.checks['primary_switch'])

logging.debug('primary switch checks is {}'.format(self.checks['primary_switch']))
self._acquire_replication_source_slot_lock(new_primary)
failover_state = self.zk.noexcept_get(self.zk.FAILOVER_INFO_PATH)
if failover_state is not None and failover_state not in ('finished', 'promoting', 'checkpointing'):
@@ -1262,11 +1247,18 @@ def _return_to_cluster(self, new_primary, role, is_dead=False):
# rewinding and failed. So only hard way possible in this case.
#
last_op = self.zk.noexcept_get('%s/%s/op' % (self.zk.MEMBERS_PATH, helpers.get_hostname()))
logging.info('Last op is: %s' % str(last_op))
if role != 'primary' and not self.is_op_destructive(last_op) and not self._is_simple_primary_switch_tried():
logging.info('Trying to do a simple primary switch.')
tried = self._is_simple_primary_switch_tried()
if role == 'primary' or self.is_op_destructive(last_op) or tried:
logging.info('Could not do a simple primary switch')
logging.debug('Possible reasons: Role: %s, Last op is destructive: %s, Simple primary switch tried: %s',
role, self.is_op_destructive(last_op), tried
)
else:
logging.info('Trying to do a simple primary switch: {}'.format(new_primary))
result = self._try_simple_primary_switch_with_lock(limit, new_primary, is_dead)
logging.info('Primary switch count: %s finish with result: %s', self.checks['primary_switch'], result)
if not result:
logging.error('ACTION-FAILED. Could not simple switch to primary: %s, attempts: %s',
new_primary, self.checks['primary_switch'])
return None

#
@@ -1607,7 +1599,7 @@ def _check_archive_recovery(self, new_primary, limit):

def check_recovery_start():
if self._check_postgresql_streaming(new_primary):
logging.debug('PostgreSQL is already streaming from primary')
logging.debug('PostgreSQL is already streaming from {}'.format(new_primary))
return True

# we can get here with another role or
Expand Down Expand Up @@ -1667,7 +1659,7 @@ def _check_postgresql_streaming(self, primary):
return False

if replica_infos is not None and (pgconsul._is_caught_up(replica_infos) and self.db.check_walreceiver()):
logging.debug('PostgreSQL has started streaming from primary.')
logging.debug('PostgreSQL has started streaming from {}'.format(primary))
return True

return None
@@ -1678,7 +1670,7 @@ def _wait_for_streaming(self, primary, limit=-1):
With limit=-1 the loop here can be infinite.
"""
check_streaming = functools.partial(self._check_postgresql_streaming, primary)
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from {}'.format(primary))

def _wait_for_lock(self, lock, limit=-1):
"""