Skip to content

Commit

Permalink
Revise for generic testbed usage; add a new script for dedicated usag…
Browse files Browse the repository at this point in the history
…e; traffic agent related CAPI is omitted.
  • Loading branch information
WFA-lliu committed Mar 5, 2024
1 parent 18a40ec commit 437ff53
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 25 deletions.
24 changes: 24 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ optional arguments:
directory for uploading
```

<details>
<summary><i>fakecall.py</i>, a companion for generic testbed</summary>

```sh
usage: fakecall.py [-h] [-v] [-d directory] [-f filename] [-i interpreted]
[-o oriented]

CLI argument parsing

optional arguments:
-h, --help show this help message and exit
-v, --verbose verbosity
-d directory, --directory directory
directory of UCC log and capture
-f filename, --filename filename
filename of UCC log
-i interpreted, --interpreted interpreted
interpreted handle
-o oriented, --oriented oriented
oriented IP
```

</details>

## Description:

A utility (tool) to validate the sniffer-agent by existing logs&captures. Test environments are simplified into sniffer-agent and this utility; elapsed time is shortened to the interaction time between sniffer-agent and this utility. This utility is design to be used in preliminary regression-test; this utility is not an alternate of regression-test.
Expand Down
82 changes: 82 additions & 0 deletions fakecall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#! python3
import os
import sys
import argparse
import logging
import re
from fakesniff import FakeSniff

class FakeCall(FakeSniff):
def __init__(self) -> None:
super().__init__()
#variables for pattern matching
self.patt["abort"] = True
self.patt["capi"]["traffic_agent_reset"] = self.__silence
self.patt["capi"]["traffic_agent_config"] = self.__silence
self.patt["capi"]["traffic_agent_receive_start"] = self.__silence
self.patt["capi"]["traffic_agent_receive_stop"] = self.__silence
self.patt["capi"]["traffic_agent_send"] = self.__silence
self.patt["capi"]["traffic_send_ping"] = self.__silence
self.patt["capi"]["traffic_stop_ping"] = self.__silence
#variables for basic configuration
self.cfg["telnet"] = True
self.cfg["tmo_result"] = int(300)
#variables for status (temporary) and statistics (finally)
logging.debug("patt: " + repr(self.patt))
logging.debug("cfg: " + repr(self.cfg))
logging.debug("status: " + repr(self.status))

def __silence(self, argv: list) -> bool:
#TODO: to be refactored
return self._FakeSniff__silence(argv)

if __name__ == "__main__":
my_parser = argparse.ArgumentParser(description="CLI argument parsing")
my_parser.add_argument("-v",
"--verbose",
action="store_true",
help="verbosity")
my_parser.add_argument("-d",
"--directory",
metavar="directory",
default="",
type=str,
help="directory of UCC log and capture")
my_parser.add_argument("-f",
"--filename",
metavar="filename",
default="",
type=str,
help="filename of UCC log")
my_parser.add_argument("-i",
"--interpreted",
metavar="interpreted",
default="192.168.250.6:9999",
type=str,
help="interpreted handle")
my_parser.add_argument("-o",
"--oriented",
metavar="oriented",
default="",
type=str,
help="oriented IP")

args = my_parser.parse_args()

if(args.verbose == True):
logging.basicConfig(level=logging.INFO)
else:
logging.basicConfig(level=logging.ERROR)
logging.debug("args: " + repr(args))

ret = True
handle_invoke = None
if len(args.interpreted.split(":")) >= 2:
handle_invoke = args.oriented + ":" + args.interpreted.split(":")[1]
fc = FakeCall()
(ret, stat) = fc.interpret(dir = args.directory, fn = args.filename, handle = args.interpreted, handle_invoke = handle_invoke)
print("state: " + repr(ret) + ";" + "statistics: " + repr(stat))

sys.exit(0 if ret is True else 255)

#FakeSniff6 - by Leo Liu
60 changes: 35 additions & 25 deletions fakesniff.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

class FakeSniff():
def __init__(self) -> None:
#variables for pattern matching
self.patt = dict()
self.patt["deli_req"] = "--->"
self.patt["deli_rsp"] = "<--"
self.patt["deli_rsp"] = "<--\s+"
self.patt["deli_arg"] = ","
self.patt["api_idx"] = int(0)
self.patt["ret_idx"] = int(1)
Expand All @@ -29,14 +30,19 @@ def __init__(self) -> None:
self.patt["capi"]["sniffer_control_upload"] = self.__silence
self.patt["capi"]["sniffer_get_info"] = self.__silence
self.patt["capi_ret"] = self.__returned_check
#variables for basic configuration
self.cfg = dict()
self.cfg["tmpdir"] = "tmp"
self.cfg["uldir"] = None
self.cfg["dec_idx"] = int(0)
self.cfg["reuse"] = True
self.cfg["telnet"] = True
self.cfg["tmo_running"] = int(5)
self.cfg["tmo_result"] = int(45)
self.cfg["dispose"] = False
self.cfg["object_restore"] = None
self.cfg["object_invoke"] = None
#variables for status (temporary) and statistics (finally)
self.status = dict()
self.reset()

Expand Down Expand Up @@ -158,32 +164,36 @@ def __invoke_ul(self, argv: list) -> bool:
def __invoke(self, argv: list) -> bool:
logging.debug("INVOKE: " + argv[0])
#last state depends on the CAPI invocation result
invoke_running_tmo: int = 5
invoke_result_tmo: int = 45
invoke_running_tmo: int = self.cfg["tmo_running"]
invoke_result_tmo: int = self.cfg["tmo_result"]
ret: bool = False
try:
if self.cfg["object_invoke"] is None:
self.cfg["object_invoke"] = Telnet()
self.cfg["object_invoke"].open(host = self.cfg["handle_invoke"].split(":")[0], port = int(self.cfg["handle_invoke"].split(":")[1]))
capi = self.patt["deli_arg"].join(argv) + "\r\n"
self.cfg["object_invoke"].write(bytes(capi, "UTF-8"))
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_running_tmo)
rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"])
if rsp[0] == "status" and rsp[1] == "RUNNING":
#status running shall be hidden
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_result_tmo)
if self.cfg["telnet"] is True:
try:
if self.cfg["object_invoke"] is None:
self.cfg["object_invoke"] = Telnet()
self.cfg["object_invoke"].open(host = self.cfg["handle_invoke"].split(":")[0], port = int(self.cfg["handle_invoke"].split(":")[1]))
capi = self.patt["deli_arg"].join(argv) + "\r\n"
self.cfg["object_invoke"].write(bytes(capi, "UTF-8"))
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_running_tmo)
rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"])
if len(rsp) >= 2:
self.status["invoked"] = argv[0]
self.status["returned"] = rcv.decode("UTF-8").rstrip()
self.status["silenced"] = False
if rsp[0] == "status" and rsp[1] == "RUNNING":
#status running shall be hidden
rcv = self.cfg["object_invoke"].read_until(b"\r\n", invoke_result_tmo)
rsp = rcv.decode("UTF-8").rstrip().split(self.patt["deli_arg"])
if len(rsp) >= 2:
self.status["invoked"] = argv[0]
self.status["returned"] = rcv.decode("UTF-8").rstrip()
self.status["silenced"] = False
ret = True
ret = True
except Exception as e:
logging.exception(e)
finally:
if self.cfg["reuse"] is False:
self.cfg["object_invoke"].close()
self.cfg["object_invoke"] = None
except Exception as e:
logging.exception(e)
finally:
if self.cfg["reuse"] is False:
self.cfg["object_invoke"].close()
self.cfg["object_invoke"] = None
else:
pass
if ret is False:
logging.error("INVOKE: " + argv[0])
if self.patt["abort"] is False:
Expand Down Expand Up @@ -282,7 +292,7 @@ def interpret(self, dir: str = "", fn: str = "", suff: str = "", handle: str = "
else:
break
elif ret_patt_rsp_search is not None:
capi_rsp = line[ret_patt_rsp_search.end()+1:].lstrip().rstrip().split(self.patt["deli_arg"])
capi_rsp = line[ret_patt_rsp_search.end()+0:].lstrip().rstrip().split(self.patt["deli_arg"])
#callback
ret = self.patt["capi_ret"](capi_rsp)
if ret is False:
Expand Down

0 comments on commit 437ff53

Please sign in to comment.