Skip to content

Commit

Permalink
Revise prototype code.
Browse files Browse the repository at this point in the history
  • Loading branch information
WFA-lliu committed Oct 24, 2023
1 parent 2210268 commit e232c47
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions fakesniff.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def __init__(self) -> None:
self.patt["capi"] = dict()
self.patt["capi"]["*"] = self.__invoke
self.patt["capi"]["sniffer_control_start"] = self.__restore
#self.patt["capi"]["sniffer_decrypt_trace"] = self.__silence
self.patt["capi"]["sniffer_decrypt_trace"] = self.__invoke_ul
self.patt["capi"]["sniffer_control_stop"] = self.__silence
self.patt["capi"]["sniffer_control_upload"] = self.__silence
self.patt["capi"]["sniffer_get_info"] = self.__silence
self.patt["capi_ret"] = self.__returned_check
self.cfg = dict()
self.cfg["tmpdir"] = "tmp"
self.cfg["uldir"] = None
self.cfg["dec_idx"] = int(0)
self.cfg["reuse"] = True
self.cfg["dispose"] = False
Expand Down Expand Up @@ -136,22 +137,37 @@ def __restore(self, argv: list) -> bool:
pass
return ret

def __invoke_ul(self, argv: list) -> bool:
logging.debug("INVOKE_UL: " + argv[0])
dir_param_key = "destpath"
dir_param_idx = [item.lower() for item in argv].index(dir_param_key.lower())
dir = argv[dir_param_idx + 1]
if self.cfg["uldir"] is not None and os.path.isdir(self.cfg["uldir"]):
path = self.cfg["uldir"]
if not path.endswith(os.path.sep):
path += os.path.sep
path += os.path.basename(dir)
os.makedirs(path, mode = 0o777, exist_ok = True)
argv[dir_param_idx + 1] = path
return self.__invoke(argv)

def __invoke(self, argv: list) -> bool:
logging.debug("INVOKE: " + argv[0])
#last state depends on the CAPI invocation result
invoke_tmo: int = 10
invoke_running_tmo: int = 5
invoke_result_tmo: int = 45
ret: bool = False
try:
if self.cfg["object_invoke"] is None:
self.cfg["object_invoke"] = Telnet()
self.cfg["object_invoke"].open(host = self.cfg["handle_invoke"].split(":")[0], port = int(self.cfg["handle_invoke"].split(":")[1]))
capi = self.patt["deli_arg"].join(argv) + "\r\n"
self.cfg["object_invoke"].write(bytes(capi, "UTF-8"))
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_tmo)
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_running_tmo)
rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"])
if rsp[0] == "status" and rsp[1] == "RUNNING":
#status running shall be hidden
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_tmo)
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_result_tmo)
rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"])
if len(rsp) >= 2:
self.status["invoked"] = argv[0]
Expand Down Expand Up @@ -214,7 +230,7 @@ def __returned_check(self, argv: list) -> bool:
self.status["verdict"][verdict] += 1
return True

def interpret(self, dir: str = "", fn: str = "", suff: str = "", handle: str = "127.0.0.1:9999", handle_restore: str = "127.0.0.1:69", handle_invoke: str = "127.0.0.1:9999") -> tuple:
def interpret(self, dir: str = "", fn: str = "", suff: str = "", handle: str = "127.0.0.1:9999", handle_restore: str = "127.0.0.1:69", handle_invoke: str = "127.0.0.1:9999", uldir: str = None) -> tuple:
path = dir
if not path.endswith(os.path.sep):
path += os.path.sep
Expand All @@ -226,6 +242,7 @@ def interpret(self, dir: str = "", fn: str = "", suff: str = "", handle: str = "
self.patt["handle"] = handle
self.cfg["handle_restore"] = handle_restore
self.cfg["handle_invoke"] = handle_invoke
self.cfg["uldir"] = uldir
os.makedirs(self.cfg["tmpdir"], mode = 0o777, exist_ok = True)
ret: bool = True
if FakeSniff.is_valid_ip(self.patt["handle"].split(":")[0]) is False:
Expand Down Expand Up @@ -390,6 +407,12 @@ def is_valid_ip(ip) -> bool:
default="fakesniff6-report.txt",
type=str,
help="filename of report after interpreted under auto-mode")
my_parser.add_argument("-u",
"--uploading",
metavar="uploading",
default=None,
type=str,
help="directory for uploading")

args = my_parser.parse_args()

Expand All @@ -411,7 +434,7 @@ def is_valid_ip(ip) -> bool:
if len(hdl.split(":")) >= 2:
handle_restore = args.oriented + ":" + str(69)
handle_invoke = args.oriented + ":" + hdl.split(":")[1]
(ret, stat) = fs.interpret(dir = f, fn = filename, suff = args.suffix, handle = hdl, handle_restore = handle_restore, handle_invoke = handle_invoke)
(ret, stat) = fs.interpret(dir = f, fn = filename, suff = args.suffix, handle = hdl, handle_restore = handle_restore, handle_invoke = handle_invoke, uldir = args.uploading)
print("dir: \"%s\"; fn: \"%s\"; suffix: \"%s\"; state: %s; statistics: %s" % (f, filename, args.suffix, "true" if ret is True else "false", repr(stat)), file = rpt)
print("dir: \"%s\"; fn: \"%s\"; suffix: \"%s\"; state: %s; statistics: %s" % (f, filename, args.suffix, "true" if ret is True else "false", repr(stat)))
fs.reset()
Expand All @@ -423,7 +446,7 @@ def is_valid_ip(ip) -> bool:
handle_restore = args.oriented + ":" + str(69)
handle_invoke = args.oriented + ":" + args.interpreted.split(":")[1]
fs = FakeSniff()
(ret, stat) = fs.interpret(dir = args.directory, fn = args.filename, suff = args.suffix, handle = args.interpreted, handle_restore = handle_restore, handle_invoke = handle_invoke)
(ret, stat) = fs.interpret(dir = args.directory, fn = args.filename, suff = args.suffix, handle = args.interpreted, handle_restore = handle_restore, handle_invoke = handle_invoke, uldir = args.uploading)
print("state: " + repr(ret) + ";" + "statistics: " + repr(stat))

sys.exit(0 if ret is True else 255)
Expand Down

0 comments on commit e232c47

Please sign in to comment.