-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasyncio_wait.py
53 lines (42 loc) · 1.56 KB
/
asyncio_wait.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
async def start_sniffing_and_capturing_traffic(file_path: str, device_id: str):
    """Capture the device's network traffic into a pcap file until cancelled.

    The capture calls are invoked synchronously (nothing in them is awaited),
    so they run on an executor thread. Running them directly inside the
    coroutine would block the event loop, and a surrounding
    ``asyncio.wait_for`` timeout or ``Task.cancel()`` could never take effect.

    Args:
        file_path: Path of the pcap file to write. It is opened in binary
            write mode, so an existing file is overwritten.
        device_id: Serial passed to ``LockdownClient`` to select the device.

    Raises:
        asyncio.CancelledError: If the awaiting task is cancelled. The worker
            thread is then asked to stop; it ends (and closes the file) once
            the packet source hands over its next packet.
        Exception: Any error raised while connecting to the device or writing
            the file is re-raised to the awaiting caller.
    """
    # Local import so this block does not depend on the file-level imports.
    import threading

    stop_requested = threading.Event()

    def _packets_until_stopped(lockdown):
        """Yield captured packets until a stop has been requested."""
        # NOTE(review): assumes watch() returns an iterable of packets, since
        # its result is what write_to_pcap consumes -- confirm against the
        # PcapdService version in use.
        for packet in PcapdService(lockdown=lockdown).watch():
            if stop_requested.is_set():
                return
            yield packet

    def _capture() -> None:
        """Connect to the device and stream its packets into the pcap file."""
        lockdown = LockdownClient(serial=device_id)
        with open(file_path, 'wb') as fo:
            PcapdService.write_to_pcap(fo, _packets_until_stopped(lockdown))

    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(None, _capture)
    finally:
        # Reached on normal completion, error, and cancellation alike. A
        # running thread cannot be interrupted from outside, so the flag is
        # the only way to make the capture loop wind down.
        stop_requested.set()
def main() -> None:
    """Run local instance of network sniffer.

    Reads ``--device-id``, ``--pcap-file`` and ``--duration`` from the command
    line and captures the device's traffic into the pcap file. The capture is
    stopped once ``duration`` seconds (the ``asyncio.wait_for`` timeout) have
    elapsed.
    """

    def _configure_logging(log_level: int = logging.INFO) -> None:
        """Configure console logging.

        Args:
            log_level: Log level.

        Returns:
            None.
        """
        root = logging.getLogger()
        root.setLevel(log_level)
        root.addHandler(rich_logging.RichHandler(rich_tracebacks=True))

    async def _capture_for_duration(
        device_id: str, pcap_file: str, duration: int
    ) -> None:
        """Run the capture coroutine, giving up after ``duration`` seconds."""
        try:
            # wait_for must run inside the event loop; it wraps the coroutine
            # in a task itself and cancels that task when the timeout expires,
            # so no explicit create_task()/cancel() is needed.
            await asyncio.wait_for(
                start_sniffing_and_capturing_traffic(
                    file_path=pcap_file, device_id=device_id
                ),
                timeout=duration,
            )
        except asyncio.TimeoutError:
            # Hitting the time limit is the expected way for a run to end.
            logging.getLogger(__name__).info(
                "Capture stopped after %s seconds.", duration
            )

    _configure_logging()

    parser = argparse.ArgumentParser(
        description="Run network sniffer on target device for determined time."
    )
    parser.add_argument(
        "--device-id", type=str, required=True, help="The ID of the device."
    )
    # The option used to be declared as "--pcp-file" while the code read
    # args.pcap_file, which raised AttributeError. The old spelling is kept
    # as an alias so existing command lines keep working.
    parser.add_argument(
        "--pcap-file",
        "--pcp-file",
        dest="pcap_file",
        type=str,
        required=True,
        help="The name of pcap file.",
    )
    parser.add_argument(
        "--duration", type=int, required=True, help="The duration."
    )
    args = parser.parse_args()

    device_id: str = args.device_id
    duration: int = args.duration
    pcap_file: str = args.pcap_file

    # asyncio.run owns the event loop for the whole capture. Tasks cannot be
    # created before a loop is running, so all scheduling happens inside
    # _capture_for_duration.
    asyncio.run(_capture_for_duration(device_id, pcap_file, duration))
# Script entry point: start the sniffer only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()