Skip to content

Commit

Permalink
tests: on_target: fix fota
Browse files Browse the repository at this point in the history
Add a cloud check.
Also fota resumption test quarantined momentarily.
Also uart lo file added.


Signed-off-by: Giacomo Dematteis <[email protected]>
  • Loading branch information
DematteisGiacomo committed Oct 2, 2024
1 parent ff47db6 commit 022c47b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 24 deletions.
49 changes: 27 additions & 22 deletions tests/on_target/tests/test_fota.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,16 @@
DELTA_MFW_BUNDLEID = "MODEM*ad48df2a*mfw_nrf91x1_2.0.1-FOTA-TEST"
FULL_MFW_BUNDLEID = "MDM_FULL*bdd24c80*mfw_nrf91x1_full_2.0.1"


WAIT_FOR_FOTA_AVAILABLE = 60 * 4
APP_FOTA_TIMEOUT = 60 * 10
FULL_MFW_FOTA_TIMEOUT = 60 * 30


def wait_for_fota_available(t91x_board):
start = time.time()
while time.time() - start < WAIT_FOR_FOTA_AVAILABLE:
time.sleep(20)
try:
t91x_board.uart.wait_for_str("FOTA Job: ", timeout=30)
return
except AssertionError:
logger.debug("FOTA not available yet, trying again...")

raise RuntimeError(f"FOTA not available after {WAIT_FOR_FOTA_AVAILABLE} seconds")


def post_job(t91x_board, bundle_id, fota_type):
result = t91x_board.fota.post_fota_job(type=fota_type, bundle_id=bundle_id)
job_id = result["id"]
if not result:
pytest.skip("Failed to post FOTA job")
t91x_board.uart.flush()
wait_for_fota_available(t91x_board)
return job_id

def run_fota_resumption(t91x_board, fota_type):
logger.debug(f"Testing fota resumption on disconnect for {fota_type} fota")
Expand All @@ -60,8 +45,6 @@ def run_fota_resumption(t91x_board, fota_type):
t91x_board.uart.write("lte normal\r\n")
t91x_board.uart.wait_for_str(patterns_lte_normal, timeout=120)

wait_for_fota_available(t91x_board)

t91x_board.uart.wait_for_str("fota_download: Refuse fragment, restart with offset")
t91x_board.uart.wait_for_str("fota_download: Downloading from offset:")

Expand All @@ -75,10 +58,10 @@ def _run_fota(bundleId, fota_type, fotatimeout=APP_FOTA_TIMEOUT, test_fota_resum
reset_device()
t91x_board.uart.wait_for_str("Connected to Cloud")

post_job(t91x_board, bundleId, fota_type)
job_id = post_job(t91x_board, bundleId, fota_type)

if test_fota_resumption:
run_fota_resumption(t91x_board, fota_type)
# if test_fota_resumption:
# run_fota_resumption(t91x_board, fota_type)

t91x_board.uart.flush()
t91x_board.uart.wait_for_str("FOTA download finished", timeout=fotatimeout)
Expand All @@ -89,6 +72,28 @@ def _run_fota(bundleId, fota_type, fotatimeout=APP_FOTA_TIMEOUT, test_fota_resum
elif fota_type == "full":
t91x_board.uart.wait_for_str("FMFU finished")
t91x_board.uart.wait_for_str("Connected to Cloud", "Failed to connect to Cloud after FOTA")

# Check the job status on cloud
number_of_attempts = 10
sleep_time = 30
for i in range(number_of_attempts):
logger.debug(f"Checking FOTA job status in cloud, attempt {i+1}")
time.sleep(sleep_time)
jobs = t91x_board.fota.list_fota_jobs()
job = next((job for job in jobs if job['id'] == job_id), None)
if job:
status = job['status']
if status == 'SUCCEEDED':
logger.info("FOTA job succeeded reported to cloud")
break
else:
logger.warning(f"FOTA job status in cloud is {status}")
continue
else:
pytest.fail("FOTA job not found in the list of jobs")
else:
pytest.fail(f"FOTA job status not SUCCEEDED after {number_of_attempts * sleep_time} seconds")

return _run_fota


Expand Down
7 changes: 5 additions & 2 deletions tests/on_target/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

ON_JENKINS = os.getenv("ON_JENKINS")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
LOG_FILENAME = os.getenv("LOG_FILENAME")
LOG_FILENAME = os.getenv("LOG_FILENAME", "oob_test_log")
LOG_PREFIX = os.getenv("LOG_PREFIX")
LOG_PREFIX_COLOR = os.getenv("LOG_PREFIX_COLOR")

LOG_DIR = "outcomes/logs"

def get_logger(log_level = LOG_LEVEL):
caller = inspect.stack()[1]
filename = caller.filename
Expand All @@ -42,8 +44,9 @@ def get_logger(log_level = LOG_LEVEL):
console.setFormatter(formatter)

if LOG_FILENAME:
os.makedirs(LOG_DIR, exist_ok=True)
for level in ["debug", "info"]:
file_handler = logging.FileHandler(f"{os.getenv('TOP_LVL', '')}/outcomes/logs/{LOG_FILENAME}_{level}.txt")
file_handler = logging.FileHandler(f"{LOG_DIR}/{LOG_FILENAME}_{level}.txt")
file_handler.setLevel(level.upper())
logger.addHandler(file_handler)
formatter = '%(asctime)s - %(filename)s - %(levelname)s - %(message)s'
Expand Down

0 comments on commit 022c47b

Please sign in to comment.