From 8fce894a6211ce1bd3f9250148e5c95c7adc31a1 Mon Sep 17 00:00:00 2001 From: Leo Liu Date: Sat, 21 Oct 2023 23:58:31 +0800 Subject: [PATCH] Add prototype code. --- fakesniff.py | 345 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 fakesniff.py diff --git a/fakesniff.py b/fakesniff.py new file mode 100644 index 0000000..67f5a44 --- /dev/null +++ b/fakesniff.py @@ -0,0 +1,345 @@ +#! python3 +import os +import sys +import argparse +import logging +import re +import tarfile +import gzip +import shutil +from ptftplib.tftpclient import TFTPClient +from ptftplib import proto +from ptftplib import notify +from telnetlib import Telnet + +class FakeSniff(): + def __init__(self) -> None: + self.patt = dict() + self.patt["deli_req"] = "--->" + self.patt["deli_rsp"] = "<--" + self.patt["deli_arg"] = "," + self.patt["api_idx"] = int(0) + self.patt["ret_idx"] = int(1) + self.patt["abort"] = True + self.patt["capi"] = dict() + self.patt["capi"]["*"] = self.__invoke + self.patt["capi"]["sniffer_control_start"] = self.__restore + #self.patt["capi"]["sniffer_decrypt_trace"] = self.__silence + self.patt["capi"]["sniffer_control_stop"] = self.__silence + self.patt["capi"]["sniffer_control_upload"] = self.__silence + self.patt["capi"]["sniffer_get_info"] = self.__silence + self.patt["capi_ret"] = self.__returned_check + self.status = dict() + self.status["silenced"] = True + self.status["invoked"] = "" + self.status["returned"] = "status" + self.patt["deli_arg"] + "COMPLETE" + self.status["verdict"] = dict() + self.status["verdict"]["consistent"] = int(0) + self.status["verdict"]["inconsistent"] = int(0) + self.status["verdict"]["malformed"] = int(0) + self.status["verdict"]["omitted"] = int(0) + self.cfg = dict() + self.cfg["tmpdir"] = "tmp" + self.cfg["dec_idx"] = int(0) + self.cfg["reuse"] = True + self.cfg["dispose"] = False + self.cfg["object_restore"] = None + self.cfg["object_invoke"] = None + + def __silence(self, argv: list) -> bool: + logging.debug("SILENCE: " + argv[0]) + #force last state as fine + self.status["invoked"] = argv[0] + self.status["returned"] = "status" + self.patt["deli_arg"] + "COMPLETE" + self.status["silenced"] = True + return True + + def __restore(self, argv: list) -> bool: + logging.debug("RESTORE: " + argv[0]) + #last state depends on the SCP result + ret: bool = False + fn_param_key = "filename" + fn_param_idx = [item.lower() for item in argv].index(fn_param_key.lower()) + #compressed file is expected + fn = argv[fn_param_idx + 1] + path = self.cfg["dir"] + if not path.endswith(os.path.sep): + path += os.path.sep + path += fn + path += self.cfg["suff"] + logging.debug("path: " + path) + path_rmt = None + path_lcl = None + if os.path.isfile(path): + avail: bool = False + if self.cfg["suff"] == ".pcapng.gz": + path_lcl = self.cfg["tmpdir"] + os.path.sep + fn + with gzip.open(path, "rb") as fnc: + with open(path_lcl, "wb") as fnct: + shutil.copyfileobj(fnc, fnct) + path_rmt = fn + avail = True + elif self.cfg["suff"] == ".tar.gz": + fnc = tarfile.open(path) + if len(fnc.getnames()) > 0: + fnc.extractall(self.cfg["tmpdir"]) + path_lcl = self.cfg["tmpdir"] + os.path.sep + fnc.getnames()[self.cfg["dec_idx"]] + path_rmt = os.path.basename(fnc.getnames()[self.cfg["dec_idx"]]) + fnc.close() + avail = True + else: + pass + if avail is True: + logging.debug("path_rmt: " + path_rmt) + logging.debug("path_lcl: " + path_lcl) + if os.path.isfile(path_lcl): + #ready to put file by TFTP client + cwd = os.getcwd() + nwd = os.path.dirname(path_lcl) + os.chdir(nwd) + try: + if self.cfg["object_restore"] is None: + hdl = (self.cfg["handle_restore"].split(":")[0], int(self.cfg["handle_restore"].split(":")[1])) + exts = {proto.TFTP_OPTION_WINDOWSIZE: int(1), proto.TFTP_OPTION_BLKSIZE: int(1024)} + self.cfg["object_restore"] = TFTPClient(peer = hdl, opts = exts, mode = "octet", rfc1350 = False) + self.cfg["object_restore"].connect() + l = notify.getLogger('tftp-proto') + #l.setLevel(logging.root.level) + l.setLevel(logging.ERROR) + else: + pass + args = [os.path.basename(path_lcl)] + ret = self.cfg["object_restore"].put(args) + except Exception as e: + logging.exception(e) + finally: + if self.cfg["reuse"] is False: + self.cfg["object_restore"].finish() + self.cfg["object_restore"] = None + os.chdir(cwd) + logging.debug("cwd: " + os.getcwd()) + self.status["invoked"] = argv[0] + self.status["returned"] = "status" + self.patt["deli_arg"] + "COMPLETE" + self.status["silenced"] = False + else: + ret = False + if ret is False: + logging.error("RESTORE: " + fn) + if self.patt["abort"] is False: + ret = True + else: + pass + else: + pass + return ret + + def __invoke(self, argv: list) -> bool: + logging.debug("INVOKE: " + argv[0]) + #last state depends on the CAPI invocation result + invoke_tmo: int = 10 + ret: bool = False + try: + if self.cfg["object_invoke"] is None: + self.cfg["object_invoke"] = Telnet() + self.cfg["object_invoke"].open(host = self.cfg["handle_invoke"].split(":")[0], port = int(self.cfg["handle_invoke"].split(":")[1])) + capi = self.patt["deli_arg"].join(argv) + "\r\n" + self.cfg["object_invoke"].write(bytes(capi, "UTF-8")) + rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_tmo) + rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"]) + if rsp[0] == "status" and rsp[1] == "RUNNING": + #status running shall be hidden + rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_tmo) + rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"]) + if len(rsp) >= 2: + self.status["invoked"] = argv[0] + self.status["returned"] = rcv.decode("UTF-8").rstrip() + self.status["silenced"] = False + ret = True + except Exception as e: + logging.exception(e) + finally: + if self.cfg["reuse"] is False: + self.cfg["object_invoke"].close() + self.cfg["object_invoke"] = None + if ret is False: + logging.error("INVOKE: " + argv[0]) + if self.patt["abort"] is False: + ret = True + else: + pass + return ret + + def __returned_check(self, argv: list) -> bool: + logging.debug("RETURNED CHECK: " + argv[1]) + #force last state as fine + capi_req = self.status["invoked"].split(self.patt["deli_arg"]) + capi_rsp = self.status["returned"].split(self.patt["deli_arg"]) + verdict: str = "omitted" + if self.status["silenced"] is False: + argc = len(argv) + if argc == len(capi_rsp): + if argv[0] == capi_rsp[0]: + if argv[1] == capi_rsp[1]: + if argc >= 4: + if argv[2] == capi_rsp[2]: + if argv[3] == capi_rsp[3]: + verdict = "consistent" + else: + verdict = "inconsistent" + else: + verdict = "malformed" + else: + if argc == 2: + verdict = "consistent" + else: + verdict = "malformed" + else: + verdict = "inconsistent" + else: + verdict = "malformed" + else: + verdict = "malformed" + else: + verdict = "omitted" + if verdict == "consistent": + logging.info("capi: " + capi_req[0] + ";" + "result: " + verdict) + else: + logging.info("invoked: " + self.status["invoked"] + ";") + logging.info("returned: " + self.status["returned"] + ";") + logging.info("argv: " + self.patt["deli_arg"].join(argv) + ";") + logging.info("result: " + verdict) + self.status["verdict"][verdict] += 1 + return True + + def interpret(self, dir: str = "", fn: str = "", suff: str = "", handle: str = "127.0.0.1:9999", handle_restore: str = "127.0.0.1:69", handle_invoke: str = "127.0.0.1:9999") -> bool: + path = dir + if not path.endswith(os.path.sep): + path += os.path.sep + path += fn + logging.debug("PATH: " + path) + self.cfg["dir"] = dir + self.cfg["fn"] = fn + self.cfg["suff"] = suff + self.patt["handle"] = handle + self.cfg["handle_restore"] = handle_restore + self.cfg["handle_invoke"] = handle_invoke + os.makedirs(self.cfg["tmpdir"], mode = 0o777, exist_ok = True) + ret: bool = True + if FakeSniff.is_valid_ip(self.patt["handle"].split(":")[0]) is False: + ret = False + elif FakeSniff.is_valid_ip(self.cfg["handle_restore"].split(":")[0]) is False: + ret = False + elif FakeSniff.is_valid_ip(self.cfg["handle_invoke"].split(":")[0]) is False: + ret = False + else: + pass + if ret is True: + with open(path) as file: + patt_req_search = self.patt["handle"] + ".*" + self.patt["deli_req"] + patt_rsp_search = self.patt["handle"] + ".*" + self.patt["deli_rsp"] + logging.debug("patt_req_search: " + patt_req_search) + logging.debug("patt_rsp_search: " + patt_rsp_search) + for line in file: + ret_patt_req_search = re.search(patt_req_search, line) + ret_patt_rsp_search = re.search(patt_rsp_search, line) + if ret_patt_req_search is not None: + capi_req = line[ret_patt_req_search.end()+1:].rstrip().split(self.patt["deli_arg"]) + capi = capi_req[self.patt["api_idx"]].strip() + if capi in self.patt["capi"]: + #callback specific/hit + ret = self.patt["capi"][capi](capi_req[0::]) + else: + #callback wildcard/fall-through + ret = self.patt["capi"]["*"](capi_req[0::]) + if ret is False: + logging.error("REQ: " + capi_req[self.patt["api_idx"]].strip()) + if self.patt["abort"] is False: + ret = True + else: + break + elif ret_patt_rsp_search is not None: + capi_rsp = line[ret_patt_rsp_search.end()+1:].rstrip().split(self.patt["deli_arg"]) + #callback + ret = self.patt["capi_ret"](capi_rsp) + if ret is False: + logging.error("RSP: " + capi_rsp[self.patt["ret_idx"]].strip()) + if self.patt["abort"] is False: + ret = True + else: + break + else: + pass + if self.cfg["object_restore"] is not None: + self.cfg["object_restore"].finish() + self.cfg["object_restore"] = None + if self.cfg["object_invoke"] is not None: + self.cfg["object_invoke"].close() + self.cfg["object_invoke"] = None + if self.cfg["dispose"] is True: + try: + shutil.rmtree(self.cfg["tmpdir"]) + except Exception as e: + logging.exception(e) + return ret + + @staticmethod + def is_valid_ip(ip) -> bool: + m = re.match(r"^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$", ip) + return bool(m) and all(map(lambda n: 0 <= int(n) <= 255, m.groups())) + + +if __name__ == "__main__": + my_parser = argparse.ArgumentParser(description="CLI argument parsing") + my_parser.add_argument("-v", + "--verbose", + action="store_true", + help="verbosity") + my_parser.add_argument("-d", + "--directory", + metavar="directory", + default="", + type=str, + help="directory of UCC log and capture") + my_parser.add_argument("-f", + "--filename", + metavar="filename", + default="", + type=str, + help="filename of UCC log") + my_parser.add_argument("-s", + "--suffix", + metavar="suffix", + default=".pcapng.gz", + type=str, + help="suffix of capture") + my_parser.add_argument("-i", + "--interpreted", + metavar="interpreted", + default="192.168.250.6:9999", + type=str, + help="interpreted handle") + my_parser.add_argument("-o", + "--oriented", + metavar="oriented", + default="192.168.250.66", + type=str, + help="oriented IP") + + args = my_parser.parse_args() + + if(args.verbose == True): + logging.basicConfig(level=logging.INFO) + else: + logging.basicConfig(level=logging.ERROR) + logging.debug("args: " + repr(args)) + handle_restore = None + handle_invoke = None + if len(args.interpreted.split(":")) >= 2: + handle_restore = args.oriented + ":" + str(69) + handle_invoke = args.oriented + ":" + args.interpreted.split(":")[1] + fs = FakeSniff() + ret = fs.interpret(dir = args.directory, fn = args.filename, suff = args.suffix, handle = "192.168.250.96:9999", handle_restore = handle_restore, handle_invoke = handle_invoke) + print("state: " + repr(ret) + ";" + "statistics: " + repr(fs.status["verdict"])) + sys.exit(0 if ret is True else 255) + +#FakeSniff6 - by Leo Liu