-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathusb_scanner.py
111 lines (89 loc) · 3.61 KB
/
usb_scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python3
import os
import os.path
import threading
import time
import sys
import subprocess
from pyudev import Context, Monitor, MonitorObserver
def loading_indicator(event):
"""Displays a loading indicator."""
sys.stdout.write("Scanning")
sys.stdout.flush()
while not event.is_set():
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
print() # Move to the new line at the end
def on_device_event(device):
"""Callback function to be called when a device is added."""
if device.action == 'add' and device.get('ID_FS_TYPE') in ['vfat', 'ntfs', 'exfat', 'ext4']:
dev_path = device.get('DEVNAME')
fs_type = device.get('ID_FS_TYPE')
print(f"Device Added: {dev_path}, FS Type: {fs_type}")
# Wait for the device to be mounted.
time.sleep(2)
# Find the mount point.
mount_point = None
result = subprocess.run(
['df', '--output=target', dev_path], capture_output=True, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
# The second line contains the mount point.
mount_point = lines[1]
if mount_point:
print(f"USB Device Added: {dev_path} at {mount_point}")
scan_device(mount_point)
else:
print(
f"Could not find the mount point for {dev_path}. Skipping scan.")
def scan_device(dev_path):
"""Function to scan the device using ClamAV"""
command = ['sudo', 'clamscan', '-r', '--remove', dev_path]
log_file_name = 'clamscan_log.txt'
log_file_path = os.path.join(dev_path, log_file_name)
backup_log_file_path = os.path.join(os.path.expanduser('~'), 'Desktop', log_file_name)
stop_event = threading.Event()
loading_thread = threading.Thread(target=loading_indicator, args=(stop_event,))
loading_thread.start()
try:
result = subprocess.run(command, capture_output=True, text=True)
stop_event.set()
loading_thread.join()
# Output the result to the terminal
print(result.stdout)
# Try to write the result to the log file on the drive
try:
with open(log_file_path, 'a') as log_file:
log_file.write(result.stdout)
print(f"Scan Completed! Log saved to {log_file_path}.")
except Exception as e:
print(f"Failed to write log to {log_file_path}, Error: {str(e)}")
# Attempt to write to the backup location on Desktop
try:
with open(backup_log_file_path, 'a') as backup_log_file:
backup_log_file.write(result.stdout)
print(f"Backup log saved to {backup_log_file_path}.")
except Exception as e:
print(f"Failed to write backup log to {backup_log_file_path}, Error: {str(e)}")
print("This terminal can now be closed.")
input() # Wait for the user to press Enter
except Exception as e:
stop_event.set()
loading_thread.join()
print(f"Error occurred while scanning: {str(e)}")
def main():
"""Main function to setup device monitor"""
context = Context()
monitor = Monitor.from_netlink(context)
monitor.filter_by(subsystem='block')
observer = MonitorObserver(
monitor, callback=on_device_event, name='observer')
observer.daemon = False
observer.start()
print("Monitoring USB ports. Plug in a USB device to test...")
for device in iter(monitor.poll, None):
on_device_event(device)
if __name__ == "__main__":
main()