diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9e3b51..fc2eae5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,4 +19,3 @@ jobs: max-line-length: "128" path: "." plugins: "flake8-bugbear" - diff --git a/pandora-box.py b/pandora-box.py index 89bc0b6..11554a4 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -77,8 +77,9 @@ # ----------------------------------------------------------- -class scanThread (threading.Thread): +class scanThread(threading.Thread): """Scanning thread""" + def __init__(self): threading.Thread.__init__(self) self.pandora = pypandora.PyPandora(root_url=pandora_root_url) @@ -107,11 +108,10 @@ def scan(self, file): # f'[{human_readable_size(file_size)}] ' # f'Thread-{id} ') - file_scan_start_time = time.time() + start_time = time.time() if is_fake_scan: status = "SKIPPED" else: - # do not scan files bigger than 1G if file_size > (1024 * 1024 * 1024): status = "TOO BIG" @@ -134,20 +134,17 @@ def scan(self, file): time.sleep(0.1) loop += 1 - file_scan_end_time = time.time() + end_time = time.time() # log the result - log( - f'Scan {file_name} ' - f'[{human_readable_size(file_size)}] ' - '-> ' - f'{status} ({(file_scan_end_time - file_scan_start_time):.1f}s)') + log(f"Scan {file_name} " f"[{human_readable_size(file_size)}] " "-> " f"{status} ({(end_time - start_time):.1f}s)") logging.info( f'boxname="{boxname}", ' f'file="{file_name}", ' f'size="{file_size}", ' f'status="{status}"", ' - f'duration="{int(file_scan_end_time - file_scan_start_time)}"') + f'duration="{int(end_time - start_time)}"' + ) # Get lock queue_lock.acquire() @@ -172,13 +169,12 @@ def scan(self, file): except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'error="{str(ex)}"', exc_info=True) + logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) # ---------------------------------------------------------- + def config(): global is_fake_scan, has_usb_auto_mount, pandora_root_url global has_quarantine, quarantine_folder, has_curses, maxThreads @@ -186,25 +182,26 @@ def config(): # intantiate a ConfirParser config_parser = configparser.ConfigParser() # read the config file - config_parser.read('pandora-box.ini') + config_parser.read("pandora-box.ini") # set values - is_fake_scan = config_parser['DEFAULT']['FAKE_SCAN'].lower() == "true" - has_usb_auto_mount = config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower() == "true" - pandora_root_url = config_parser['DEFAULT']['PANDORA_ROOT_URL'] + is_fake_scan = config_parser["DEFAULT"]["FAKE_SCAN"].lower() == "true" + has_usb_auto_mount = config_parser["DEFAULT"]["USB_AUTO_MOUNT"].lower() == "true" + pandora_root_url = config_parser["DEFAULT"]["PANDORA_ROOT_URL"] # Quarantine - has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower() == "true" - quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER'] + has_quarantine = config_parser["DEFAULT"]["QUARANTINE"].lower() == "true" + quarantine_folder = config_parser["DEFAULT"]["QUARANTINE_FOLDER"] # Curses - has_curses = config_parser['DEFAULT']['CURSES'].lower() == "true" + has_curses = config_parser["DEFAULT"]["CURSES"].lower() == "true" # MaxThreads - maxThreads = int(config_parser['DEFAULT']['THREADS']) + maxThreads = int(config_parser["DEFAULT"]["THREADS"]) # ---------------------------------------------------------- + def human_readable_size(size, decimal_places=1): - """ Convert size to human readble string """ - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + """Convert size to human readble string""" + for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024.0: return f"{size:.{decimal_places}f}{unit}" size /= 1024.0 @@ -215,8 +212,9 @@ def human_readable_size(size, decimal_places=1): # Image Screen # ----------------------------------------------------------- + def display_image(status): - """ Display image on screen """ + """Display image on screen""" if not has_curses: if status == "WAIT": image = "images/key*.png" @@ -235,8 +233,7 @@ def display_image(status): # display image if "*" in image: # slide show - os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} " - "/dev/null >/dev/null &") + os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image}" "/dev/null >/dev/null &") else: # only one image os.system(f"fim -qa {image} /dev/null >/dev/null &") @@ -319,7 +316,7 @@ def init_bar(): def update_bar(progress, flush=False): global last_update_time """Update progress bar""" - if (flush or ((time.time() - last_update_time) >= 1)): + if flush or ((time.time() - last_update_time) >= 1): last_update_time = time.time() if has_curses: if progress == 0: @@ -366,10 +363,8 @@ def print_screen(): print_serial("") init_bar() update_bar(0, flush=True) - log('Ready.', flush=True) - logging.info( - f'boxname="{boxname}", ' - "pandora-box-start") + log("Ready.", flush=True) + logging.info(f'boxname="{boxname}", ' "pandora-box-start") def end_curses(): @@ -386,6 +381,7 @@ def end_curses(): # Logging windows # ----------------------------------------------------------- + def initlog(): """Inititalize logging function""" global log_win @@ -393,10 +389,7 @@ def initlog(): log_win = curses.newwin(curses.LINES - 20, curses.COLS, 20, 0) log_win.border(0) logging.basicConfig( - filename='/var/log/pandora-box.log', - level=logging.INFO, - format='%(asctime)s - %(message)s', - datefmt='%m/%d/%y %H:%M' + filename="/var/log/pandora-box.log", level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%m/%d/%y %H:%M" ) @@ -426,12 +419,12 @@ def log_update(flush=False): """Update the log screen""" global last_update_time # do not refresh the screen too often - if (flush or ((time.time() - last_update_time) >= 1)): + if flush or ((time.time() - last_update_time) >= 1): last_update_time = time.time() log_win.clear() log_win.border(0) for i in range(min(curses.LINES - 22, len(logs))): - log_win.addstr(i + 1, 1, logs[i][:curses.COLS - 2], curses.color_pair(3)) + log_win.addstr(i + 1, 1, logs[i][: curses.COLS - 2], curses.color_pair(3)) log_win.refresh() @@ -439,10 +432,11 @@ def log_update(flush=False): # Device # ----------------------------------------------------------- + def mount_device(): """Mount USB device""" global mount_point - log('Mount device', flush=True) + log("Mount device", flush=True) if has_usb_auto_mount: mount_point = None loop = 0 @@ -454,7 +448,7 @@ def mount_device(): mount_point = partition.mountpoint loop += 1 if mount_device is None: - log('No partition mounted', flush=True) + log("No partition mounted", flush=True) else: mount_point = "/media/box" if not os.path.exists("/media/box"): @@ -499,13 +493,14 @@ def log_device_info(dev): f'model_id="{dev.get("ID_MODEL_ID")}", ' f'serial_short="dev.get("ID_SERIAL_SHORT")", ' f'serial="dev.get("ID_SERIAL")"' - ) + ) # ----------------------------------------------------------- # pandora # ----------------------------------------------------------- + def scan(): """Scan devce with pypandora""" global pandora, qfolder @@ -517,10 +512,7 @@ def scan(): statvfs = os.statvfs(mount_point) except Exception as ex: log(f"error={ex}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'error="{str(ex)}"', - exc_info=True) + logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) if not has_curses: display_image("ERROR") return "ERROR" @@ -574,19 +566,23 @@ def scan(): t.join() update_bar(100, flush=True) - log("Scan done in %.1fs, %d files scanned, %d files infected" % - ((time.time() - scan_start_time), file_count, len(infected_files)), - flush=True) + log( + "Scan done in %.1fs, %d files scanned, %d files infected" + % ((time.time() - scan_start_time), file_count, len(infected_files)), + flush=True, + ) logging.info( f'boxname="{boxname}", ' f'duration="{int(time.time() - scan_start_time)}", ' f'files_scanned="{file_count}", ' - f'files_infected="{len(infected_files)}"') + f'files_infected="{len(infected_files)}"' + ) return "CLEAN" # -------------------------------------- + def wait(): """Wait for insert of remove of USB device""" # handle error - first unmount the device @@ -604,18 +600,14 @@ def wait(): return device_removed() except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'error="{str(ex)}"', exc_info=True) + logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) return "STOP" def device_inserted(dev): global device log("Device inserted", flush=True) - logging.info( - f'boxname="{boxname}", ' - "device-inserted") + logging.info(f'boxname="{boxname}", ' "device-inserted") device = dev log_device_info(device) if not has_curses: @@ -632,9 +624,7 @@ def device_inserted(dev): def device_removed(): global device log("Device removed", flush=True) - logging.info( - f'boxname="{boxname}", ' - "device-removed") + logging.info(f'boxname="{boxname}", ' "device-removed") device = None if not has_curses: display_image("WAIT") @@ -651,11 +641,12 @@ def device_removed(): # -------------------------------------- + def mount(): - """ Mount device """ + """Mount device""" global mount_point mount_device() - log(f'Partition mounted at {mount_point}', flush=True) + log(f"Partition mounted at {mount_point}", flush=True) if mount_point is None: # no partition if not has_curses: @@ -665,9 +656,7 @@ def mount(): os.statvfs(mount_point) except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'error="{str(ex)}"', exc_info=True) + logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) if not has_curses: display_image("WAIT") return "WAIT" @@ -676,8 +665,9 @@ def mount(): # -------------------------------------- + def error(): - """ Display error message """ + """Display error message""" if not has_curses: display_image("ERROR") return "WAIT" @@ -700,9 +690,9 @@ def mouseClickThread(): while not enterEvent.is_set(): buf = mouse.read(3) if not (buf is None): - if ((buf[0] & 0x1) == 1): + if (buf[0] & 0x1) == 1: down = True - if (((buf[0] & 0x1) == 0) and down): + if ((buf[0] & 0x1) == 0) and down: break time.sleep(0.1) mouse.close() @@ -717,7 +707,7 @@ def enterKeyThread(): while not mouseEvent.is_set(): input = sys.stdin.readline() - if (len(input) > 0): + if len(input) > 0: break time.sleep(0.1) @@ -739,14 +729,13 @@ def waitMouseOrEnter(): # -------------------------------------- + def clean(): """Remove infected files""" if len(infected_files) > 0: # display message log(f"{len(infected_files)} infected files detecetd:") - logging.info( - f'boxname="{boxname}", ' - f"infeted_files={len(infected_files)}") + logging.info(f'boxname="{boxname}", ' f"infeted_files={len(infected_files)}") if not has_curses: display_image("BAD") @@ -757,10 +746,10 @@ def clean(): log(file) cnt = cnt + 1 if cnt >= 10: - log('...') + log("...") break # wait for clean - log('PRESS KEY TO CLEAN', flush=True) + log("PRESS KEY TO CLEAN", flush=True) waitMouseOrEnter() @@ -773,32 +762,25 @@ def clean(): try: os.remove(file) log(f"{file} removed") - logging.info( - f'boxname="{boxname}", ' - f'removed="{file}"') + logging.info(f'boxname="{boxname}", ' f'removed="{file}"') files_removed += 1 except Exception as ex: log(f"could not remove: {str(ex)}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'not_removed="{file}, ' - f'error="{str(ex)}"', exc_info=True) + logging.info(f'boxname="{boxname}", ' f'not_removed="{file}, ' f'error="{str(ex)}"', exc_info=True) has_error = True umount_device() - logging.info( - f'boxname="{boxname}", ' - f'cleaned="{files_removed}/{len(infected_files)}"') + logging.info(f'boxname="{boxname}", ' f'cleaned="{files_removed}/{len(infected_files)}"') if not has_error: if has_curses: - log('Device cleaned !', flush=True) + log("Device cleaned !", flush=True) else: display_image("OK") else: if has_curses: - log('Device not cleaned !', flush=True) + log("Device not cleaned !", flush=True) else: display_image("WAIT") else: @@ -809,6 +791,7 @@ def clean(): # -------------------------------------- + def move_to_script_folder(): """Move to pandora-box folder""" abspath = os.path.abspath(__file__) @@ -818,6 +801,7 @@ def move_to_script_folder(): # -------------------------------------- + def startup(): """Start Pandora-box""" global logo @@ -830,7 +814,7 @@ def startup(): # Initilize log initlog() # Read logo - with open('pandora-box.txt', mode='r', encoding='utf-8') as file1: + with open("pandora-box.txt", mode="r", encoding="utf-8") as file1: logo = file1.readlines() # Print logo screen print_screen() @@ -840,6 +824,7 @@ def startup(): # -------------------------------------- + def loop(state): """Main event loop""" match state: @@ -861,8 +846,9 @@ def loop(state): # -------------------------------------- + def get_lock(process_name): - """ Get a lock to check that Pandora-box is not already running """ + """Get a lock to check that Pandora-box is not already running""" # Without holding a reference to our socket somewhere it gets garbage # collected when the function exits @@ -873,16 +859,17 @@ def get_lock(process_name): # in the abstract namespace instead of being created # on the file system itself. # Works only in Linux - get_lock._lock_socket.bind('\0' + process_name) + get_lock._lock_socket.bind("\0" + process_name) except socket.error: - print('Pandora-box is already running !', file=sys.stderr) - os.execvp('/usr/bin/bash', ['/usr/bin/bash', '--norc']) + print("Pandora-box is already running !", file=sys.stderr) + os.execvp("/usr/bin/bash", ["/usr/bin/bash", "--norc"]) sys.exit() # -------------------------------------- + def main(_): """Main entry point""" try: @@ -892,13 +879,11 @@ def main(_): except Exception as ex: print({str(ex)}) log(f"Unexpected error: {str(ex)}", flush=True) - logging.info( - f'boxname="{boxname}", ' - f'error="{str(ex)}"', exc_info=True) + logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) finally: end_curses() if __name__ == "__main__": - get_lock('pandora-box') + get_lock("pandora-box") curses.wrapper(main)