diff --git a/idstools/rule.py b/idstools/rule.py index d0af8da..90c20a1 100644 --- a/idstools/rule.py +++ b/idstools/rule.py @@ -53,7 +53,16 @@ # Rule actions we expect to see. actions = ( - "alert", "config", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop") + "activate", "alert", "config", "drop", "dynamic", "file_id", "log", "pass", + "reject", "rewrite", "sdrop" +) + +# Services +services = ( + "dce_http_proxy", "dce_http_server", "dcerpc", "dnp3", "file", "ftp", + "http", "http2", "icmp", "imap", "ip", "modbus", "mms", "netflow", + "netbios-ssn", "pop3", "rpc", "s7commplus", "sip", "smtp", "ssh", + "ssl", "tcp", "telnet", "udp") class Rule(dict): """Class representing a rule. @@ -199,6 +208,69 @@ def find_opt_end(options): else: return offset + i +def parse_header(header): + """Parse the rule reader + + :param header: A string containing the header of a rule + + :returns: A dict with keys action, proto, source_addr, source_port, + direction, dest_addr, dest_port. + """ + info = dict( + action=None, + proto=None, + source_addr=None, + source_port=None, + direction=None, + dest_addr=None, + dest_port=None + ) + words = header.split(" ", maxsplit=2) + info["action"] = words[0] + if info["action"] not in actions: + return None + if len(words) == 1: + return info + info["proto"] = words[1] + if info["proto"] not in services: + return None + if len(words) == 2: + return info + + states = [ + "source_addr", + "source_port", + "direction", + "dest_addr", + "dest_port", + ] + state = 0 + + rem = words[2] + while state < len(states): + if not rem: + return None + if rem[0] == "[": + end = rem.find("]") + if end < 0: + return None + end += 1 + token = rem[:end].strip() + rem = rem[end:].strip() + else: + end = rem.find(" ") + if end < 0: + token = rem + rem = "" + else: + token = rem[:end].strip() + rem = rem[end:].strip() + + info[states[state]] = token + state += 1 + + return info + def parse(buf, group=None): """ Parse a single rule for a string buffer. @@ -221,75 +293,13 @@ def parse(buf, group=None): enabled = True header = m.group("header").strip() - - # If a decoder rule, the header will be one word. - if len(header.split(" ")) == 1: - action = header - proto = None - source_addr = None - source_port = None - direction = None - dest_addr = None - dest_port = None - else: - states = ["action", - "proto", - "source_addr", - "source_port", - "direction", - "dest_addr", - "dest_port", - ] - state = 0 - - rem = header - while state < len(states): - if not rem: - return None - if rem[0] == "[": - end = rem.find("]") - if end < 0: - return - end += 1 - token = rem[:end].strip() - rem = rem[end:].strip() - else: - end = rem.find(" ") - if end < 0: - token = rem - rem = "" - else: - token = rem[:end].strip() - rem = rem[end:].strip() - - if states[state] == "action": - action = token - elif states[state] == "proto": - proto = token - elif states[state] == "source_addr": - source_addr = token - elif states[state] == "source_port": - source_port = token - elif states[state] == "direction": - direction = token - elif states[state] == "dest_addr": - dest_addr = token - elif states[state] == "dest_port": - dest_port = token - - state += 1 - - if action not in actions: + info = parse_header(header) + if info is None: return None - rule = Rule(enabled=enabled, action=action, group=group) + rule = Rule(enabled=enabled, action=info["action"], group=group) rule["header"] = header - rule["proto"] = proto - rule["source_addr"] = source_addr - rule["source_port"] = source_port - rule["direction"] = direction - rule["dest_addr"] = dest_addr - rule["dest_port"] = dest_port + rule.update(info) options = m.group("options")