Commit

Merge pull request #4028 from taosdata/feature/crash_gen
Enhanced crash_gen tool
guanshengliang authored Oct 28, 2020
2 parents 6d69044 + eceae10 commit c380e8f
Showing 4 changed files with 58 additions and 12 deletions.
4 changes: 3 additions & 1 deletion tests/pytest/crash_gen.sh
@@ -70,10 +70,12 @@ if [[ $1 == '--valgrind' ]]; then
         $CRASH_GEN_EXEC $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
 elif [[ $1 == '--helgrind' ]]; then
     shift
+    HELGRIND_OUT=helgrind.out
+    HELGRIND_ERR=helgrind.err
     valgrind \
         --tool=helgrind \
         $PYTHON_EXEC \
-        $CRASH_GEN_EXEC $@
+        $CRASH_GEN_EXEC $@ > $HELGRIND_OUT 2> $HELGRIND_ERR
 else
     $PYTHON_EXEC $CRASH_GEN_EXEC $@
 fi
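
The change mirrors the --valgrind branch above: helgrind's stdout and stderr are now captured into helgrind.out and helgrind.err rather than scrolling past on the terminal. As a rough illustration, the same capture pattern in Python (the command line shown is illustrative, not taken from the repo):

    import subprocess

    # Run a target under helgrind and persist its output for later inspection,
    # mirroring "> $HELGRIND_OUT 2> $HELGRIND_ERR" in the shell script.
    with open("helgrind.out", "w") as out, open("helgrind.err", "w") as err:
        subprocess.run(
            ["valgrind", "--tool=helgrind", "python3", "crash_gen/crash_gen.py"],
            stdout=out,
            stderr=err,
            check=False,  # helgrind reports races in its log; don't raise on exit code
        )
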
43 changes: 36 additions & 7 deletions tests/pytest/crash_gen/crash_gen.py
@@ -1226,6 +1226,11 @@ def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
             "To be implemented by child classes, class name: {}".format(
                 self.__class__.__name__))
 
+    def _isServiceStable(self):
+        if not gSvcMgr:
+            return True  # we don't run the service, so let's assume it's stable
+        return gSvcMgr.isStable()  # otherwise, let's examine the service
+
     def _isErrAcceptable(self, errno, msg):
         if errno in [
             0x05,  # TSDB_CODE_RPC_NOT_READY
@@ -1263,7 +1268,7 @@ def _isErrAcceptable(self, errno, msg):
             return True
         elif msg.find("duplicated column names") != -1:  # also alter table tag issues
             return True
-        elif gSvcMgr and (not gSvcMgr.isStable()):  # We are managing the service, and ...
+        elif not self._isServiceStable():  # We are managing the service, and ...
             Logging.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
             return True
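
Factoring the check into _isServiceStable() keeps the error filter correct in both modes: client-only runs, where gSvcMgr is None, and managed-service runs. A condensed sketch of the pattern, with stand-in names:

    # Hypothetical stand-in for the service manager; None means client-only mode.
    class SvcMgrStub:
        def __init__(self, stable: bool):
            self._stable = stable
        def isStable(self) -> bool:
            return self._stable

    gSvcMgr = None  # or SvcMgrStub(stable=False) while a restart is in flight

    def is_service_stable() -> bool:
        if not gSvcMgr:
            return True               # nothing we manage can be mid-restart
        return gSvcMgr.isStable()

    def is_err_acceptable(errno: int) -> bool:
        if errno == 0x05:             # e.g. TSDB_CODE_RPC_NOT_READY
            return True
        return not is_service_stable()  # tolerate errors during start/stop

    print(is_err_acceptable(0x99))    # False: stable service, unexpected error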

@@ -1641,15 +1646,39 @@ def getEndState(cls):
     def canBeginFrom(cls, state: AnyState):
         return state.canReadData()
 
+    # def _canRestartService(self):
+    #     if not gSvcMgr:
+    #         return True  # always
+    #     return gSvcMgr.isActive()  # only if it's running; TODO: race condition here
+
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
         sTable = self._db.getFixedSuperTable()
 
-        # 1 in 5 chance, simulate a broken connection.
-        if random.randrange(5) == 0:  # TODO: break connection in all situations
-            wt.getDbConn().close()
-            wt.getDbConn().open()
-            print("_r", end="", flush=True)
+        # 1 in 20 chance: simulate a broken connection, but only if the service is stable (not restarting)
+        if random.randrange(20) == 0:  # and self._canRestartService():  # TODO: break connection in all situations
+            # Logging.info("Attempting to reconnect to server")  # TODO: change to DEBUG
+            Progress.emit(Progress.SERVICE_RECONNECT_START)
+            try:
+                wt.getDbConn().close()
+                wt.getDbConn().open()
+            except ConnectionError as err:  # may fail
+                if not gSvcMgr:
+                    Logging.error("Failed to reconnect in client-only mode")
+                    raise  # Not OK if we are running in client-only mode
+                if gSvcMgr.isRunning():  # possible race condition here, but low probability
+                    Logging.error("Failed to reconnect when managed server is running")
+                    raise  # Not OK if we are running normally
+
+                Progress.emit(Progress.SERVICE_RECONNECT_FAILURE)
+                # Logging.info("Ignoring DB reconnect error")
+
+            # print("_r", end="", flush=True)
+            Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)
+            # The above might have taken a lot of time; the service might be running
+            # by now, causing the error below to be handled incorrectly due to a timing issue
+            return  # TODO: fix server-restart status race condition
+
 
         dbc = wt.getDbConn()
         dbName = self._db.getName()
         for rTbName in sTable.getRegTables(dbc, dbName):  # regular tables
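
The reworked task now exercises reconnection deliberately: it closes and reopens the connection with 1-in-20 odds, tolerates a reconnect failure only while the managed service is down or restarting, and emits progress tokens so the event shows up in the console trace. A self-contained sketch of that fault-injection pattern (all names hypothetical):

    import random

    class FlakyConn:
        """Toy connection whose open() sometimes fails, like a restarting server."""
        def close(self):
            pass
        def open(self):
            if random.randrange(4) == 0:
                raise ConnectionError("server not reachable")

    def maybe_reconnect(conn: FlakyConn, svc_running: bool) -> str:
        if random.randrange(20) != 0:      # same 1-in-20 odds as the patch
            return "skipped"
        try:
            conn.close()
            conn.open()
        except ConnectionError:
            if svc_running:                # server should be up: escalate
                raise
            return "failure-ignored"       # service restarting: tolerate it
        return "reconnected"

    print(maybe_reconnect(FlakyConn(), svc_running=False))
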
8 changes: 7 additions & 1 deletion tests/pytest/crash_gen/misc.py
@@ -163,11 +163,17 @@ class Progress:
     BEGIN_THREAD_STEP = 1
     END_THREAD_STEP = 2
     SERVICE_HEART_BEAT = 3
+    SERVICE_RECONNECT_START = 4
+    SERVICE_RECONNECT_SUCCESS = 5
+    SERVICE_RECONNECT_FAILURE = 6
     tokens = {
         STEP_BOUNDARY: '.',
         BEGIN_THREAD_STEP: '[',
         END_THREAD_STEP: '] ',
-        SERVICE_HEART_BEAT: '.Y.'
+        SERVICE_HEART_BEAT: '.Y.',
+        SERVICE_RECONNECT_START: '<r.',
+        SERVICE_RECONNECT_SUCCESS: '.r>',
+        SERVICE_RECONNECT_FAILURE: '.xr>',
     }
 
     @classmethod
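
The three new constants and tokens extend the one-line console trace: a reconnect attempt renders as '<r.' followed by '.r>' on success or '.xr>' on failure. A minimal sketch of how such a token map drives the trace (the emit() body here is assumed, not copied from misc.py):

    class Progress:
        STEP_BOUNDARY = 0
        SERVICE_RECONNECT_START = 4
        SERVICE_RECONNECT_SUCCESS = 5
        SERVICE_RECONNECT_FAILURE = 6
        tokens = {
            STEP_BOUNDARY: '.',
            SERVICE_RECONNECT_START: '<r.',
            SERVICE_RECONNECT_SUCCESS: '.r>',
            SERVICE_RECONNECT_FAILURE: '.xr>',
        }

        @classmethod
        def emit(cls, event):
            # no newline: events accumulate into a single progress line
            print(cls.tokens[event], end='', flush=True)

    Progress.emit(Progress.SERVICE_RECONNECT_START)
    Progress.emit(Progress.SERVICE_RECONNECT_SUCCESS)  # trace so far: "<r..r>"
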
15 changes: 12 additions & 3 deletions tests/pytest/crash_gen/service_manager.py
@@ -280,16 +280,18 @@ def stop(self):
             # process still alive, let's interrupt it
             print("Terminate running process, send SIG_INT and wait...")
             # sub process should end, then IPC queue should end, causing IO thread to end
-            self.subProcess.send_signal(signal.SIGINT)
+            # sig = signal.SIGINT
+            sig = signal.SIGKILL
+            self.subProcess.send_signal(sig)  # SIGINT or SIGKILL
             self.subProcess.wait(20)
             retCode = self.subProcess.returncode  # should always be there
             # May throw subprocess.TimeoutExpired exception above, therefore
             # the process is guaranteed to have ended by now
             self.subProcess = None
             if retCode != 0:  # != (- signal.SIGINT):
                 Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG {}, retCode={}".format(sig, retCode))
             else:
                 Logging.info("TSP.stop(): sub proc successfully terminated with SIG {}".format(sig))
             return - retCode
 
 class ServiceManager:
@@ -395,6 +397,13 @@ def isActive(self):
             return True
         return False
 
+    def isRunning(self):
+        for ti in self._tInsts:
+            if not ti.getStatus().isRunning():
+                return False
+        return True
+
+
     # def isRestarting(self):
     #     """
     #     Determine if the service/cluster is being "restarted", i.e., at least
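
Note the asymmetry with the existing isActive(): isActive() is true when at least one instance is up, while the new isRunning() demands that every instance is up, which is the right question to ask before treating a reconnect failure as fatal. A small sketch contrasting the two aggregates (stand-in classes, not the repo's actual instance types):

    class Status:
        def __init__(self, running: bool):
            self._running = running
        def isRunning(self) -> bool:
            return self._running

    class Instance:
        def __init__(self, running: bool):
            self._status = Status(running)
        def getStatus(self) -> Status:
            return self._status

    class Manager:
        def __init__(self, instances):
            self._tInsts = instances
        def isActive(self):   # any instance up
            return any(ti.getStatus().isRunning() for ti in self._tInsts)
        def isRunning(self):  # all instances up, as in the patch
            return all(ti.getStatus().isRunning() for ti in self._tInsts)

    mgr = Manager([Instance(True), Instance(False)])
    print(mgr.isActive(), mgr.isRunning())  # True False
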
