Skip to content
This repository has been archived by the owner on Oct 7, 2024. It is now read-only.

Commit

Permalink
#40 Do not try to start dead broker several times
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed Aug 12, 2016
1 parent 8f7a1a2 commit 398531a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
5 changes: 5 additions & 0 deletions bubuku/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def run(self, current_actions) -> bool:
def can_run_at_exit(self) -> bool:
return False

def on_remove(self):
pass


class Check(object):
def __init__(self, check_interval_s=5):
Expand Down Expand Up @@ -98,9 +101,11 @@ def _run_changes(self, running_changes: dict, ip: str) -> list:
def _release_changes_lock(self, changes_to_remove):
if changes_to_remove:
for change_name in changes_to_remove:
removed_change = self.changes[change_name][0]
del self.changes[change_name][0]
if not self.changes[change_name]:
del self.changes[change_name]
removed_change.on_remove()
with self.zk.lock():
for name in changes_to_remove:
self.zk.unregister_change(name)
Expand Down
16 changes: 14 additions & 2 deletions bubuku/features/restart_if_dead.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@


class StartBrokerChange(Change):
def __init__(self, broker: BrokerManager, zk: BukuExhibitor):
def __init__(self, broker: BrokerManager, zk: BukuExhibitor, processed_callback):
self.broker = broker
self.zk = zk
self.stopped = False
self.processed_callback = processed_callback

def get_name(self) -> str:
return 'start'
Expand All @@ -36,6 +37,10 @@ def run(self, current_actions) -> bool:
return True
return False

def on_remove(self):
if self.processed_callback:
self.processed_callback()

def __str__(self):
return 'StartBrokerChange({})'.format(self.get_name())

Expand All @@ -45,12 +50,19 @@ def __init__(self, broker: BrokerManager, zk: BukuExhibitor):
super().__init__()
self.broker = broker
self.zk = zk
self.need_check = True

def check(self) -> Change:
if not self.need_check:
return None
if self.broker.is_running_and_registered():
return None
_LOG.info('Oops! Broker is dead, triggering restart')
return StartBrokerChange(self.broker, self.zk)
self.need_check = False
return StartBrokerChange(self.broker, self.zk, self.on_check_removed)

def on_check_removed(self):
self.need_check = True

def __str__(self):
return 'CheckBrokerStopped'

0 comments on commit 398531a

Please sign in to comment.