Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parsing of new snort3 rule types #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 77 additions & 67 deletions idstools/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,16 @@

# Rule actions we expect to see.
actions = (
"alert", "config", "log", "pass", "activate", "dynamic", "drop", "reject", "sdrop")
"activate", "alert", "config", "drop", "dynamic", "file_id", "log", "pass",
"reject", "rewrite", "sdrop"
)

# Services
services = (
"dce_http_proxy", "dce_http_server", "dcerpc", "dnp3", "file", "ftp",
"http", "http2", "icmp", "imap", "ip", "modbus", "mms", "netflow",
"netbios-ssn", "pop3", "rpc", "s7commplus", "sip", "smtp", "ssh",
"ssl", "tcp", "telnet", "udp")

class Rule(dict):
"""Class representing a rule.
Expand Down Expand Up @@ -199,6 +208,69 @@ def find_opt_end(options):
else:
return offset + i

def parse_header(header):
"""Parse the rule reader

:param header: A string containing the header of a rule

:returns: A dict with keys action, proto, source_addr, source_port,
direction, dest_addr, dest_port.
"""
info = dict(
action=None,
proto=None,
source_addr=None,
source_port=None,
direction=None,
dest_addr=None,
dest_port=None
)
words = header.split(" ", maxsplit=2)
info["action"] = words[0]
if info["action"] not in actions:
return None
if len(words) == 1:
return info
info["proto"] = words[1]
if info["proto"] not in services:
return None
if len(words) == 2:
return info

states = [
"source_addr",
"source_port",
"direction",
"dest_addr",
"dest_port",
]
state = 0

rem = words[2]
while state < len(states):
if not rem:
return None
if rem[0] == "[":
end = rem.find("]")
if end < 0:
return None
end += 1
token = rem[:end].strip()
rem = rem[end:].strip()
else:
end = rem.find(" ")
if end < 0:
token = rem
rem = ""
else:
token = rem[:end].strip()
rem = rem[end:].strip()

info[states[state]] = token
state += 1

return info

def parse(buf, group=None):
""" Parse a single rule for a string buffer.

Expand All @@ -221,75 +293,13 @@ def parse(buf, group=None):
enabled = True

header = m.group("header").strip()

# If a decoder rule, the header will be one word.
if len(header.split(" ")) == 1:
action = header
proto = None
source_addr = None
source_port = None
direction = None
dest_addr = None
dest_port = None
else:
states = ["action",
"proto",
"source_addr",
"source_port",
"direction",
"dest_addr",
"dest_port",
]
state = 0

rem = header
while state < len(states):
if not rem:
return None
if rem[0] == "[":
end = rem.find("]")
if end < 0:
return
end += 1
token = rem[:end].strip()
rem = rem[end:].strip()
else:
end = rem.find(" ")
if end < 0:
token = rem
rem = ""
else:
token = rem[:end].strip()
rem = rem[end:].strip()

if states[state] == "action":
action = token
elif states[state] == "proto":
proto = token
elif states[state] == "source_addr":
source_addr = token
elif states[state] == "source_port":
source_port = token
elif states[state] == "direction":
direction = token
elif states[state] == "dest_addr":
dest_addr = token
elif states[state] == "dest_port":
dest_port = token

state += 1

if action not in actions:
info = parse_header(header)
if info is None:
return None

rule = Rule(enabled=enabled, action=action, group=group)
rule = Rule(enabled=enabled, action=info["action"], group=group)
rule["header"] = header
rule["proto"] = proto
rule["source_addr"] = source_addr
rule["source_port"] = source_port
rule["direction"] = direction
rule["dest_addr"] = dest_addr
rule["dest_port"] = dest_port
rule.update(info)

options = m.group("options")

Expand Down