Skip to content

Commit

Permalink
fix: token replacement for other_mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
dvarasani-crest committed Sep 12, 2024
1 parent 0e876f4 commit 409037f
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 74 deletions.
107 changes: 42 additions & 65 deletions pytest_splunk_addon/sample_generation/sample_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,13 @@ def get_token_extractions_count(self, token):
tokens_in_extractions = 0
if (
self.requirement_test_data is not None
and "cim_fields" in self.requirement_test_data.keys()
and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys())
):
for extracted_field in self.requirement_test_data["cim_fields"].values():
if isinstance(extracted_field, str):
tokens_in_extractions += len(re.findall(token, extracted_field))
elif isinstance(extracted_field, list):
for each_filed in extracted_field:
tokens_in_extractions += len(re.findall(token, each_filed))

for extracted_field in self.requirement_test_data.get(
"other_fields", {}
).values():
field_values = [
*self.requirement_test_data.get("cim_fields", {}).values(),
*self.requirement_test_data.get("other_fields", {}).values(),
]
for extracted_field in field_values:
if isinstance(extracted_field, str):
tokens_in_extractions += len(re.findall(token, extracted_field))
elif isinstance(extracted_field, list):
Expand All @@ -324,21 +319,24 @@ def replace_token(self, token, token_values):
sample_tokens = re.finditer(token, self.event, flags=re.MULTILINE)

for _, token_value in enumerate(token_values):
token_value = token_value.value
match_object = next(sample_tokens)
match_str = (
match_object.group(0)
if len(match_object.groups()) == 0
else match_object.group(1)
)
match_str = re.escape(match_str)
self.event = re.sub(
match_str,
lambda x: str(token_value),
self.event,
1,
flags=re.MULTILINE,
)
try:
token_value = token_value.value
match_object = next(sample_tokens)
match_str = (
match_object.group(0)
if len(match_object.groups()) == 0
else match_object.group(1)
)
match_str = re.escape(match_str)
self.event = re.sub(
match_str,
lambda x: str(token_value),
self.event,
1,
flags=re.MULTILINE,
)
except StopIteration:
break
else:
self.event = re.sub(
token, lambda x: str(token_values), self.event, flags=re.MULTILINE
Expand Down Expand Up @@ -368,47 +366,26 @@ def update_requirement_test_field(self, field, token, token_values):
if field != "_time":
if (
self.requirement_test_data is not None
and "cim_fields" in self.requirement_test_data.keys()
and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys())
):
for cim_field, value in self.requirement_test_data[
"cim_fields"
].items():
if token in value:
if isinstance(token_values, list):
if len(token_values) == 1:
self.requirement_test_data["cim_fields"][cim_field] = (
value.replace(token, str(token_values[0].key))
)
else:
self.requirement_test_data["cim_fields"][cim_field] = [
value.replace(token, str(token_value.key))
for token_value in token_values
]
else:
self.requirement_test_data["cim_fields"][cim_field] = (
value.replace(token, str(token_values.key))
)

for cim_field, value in self.requirement_test_data.get(
"other_fields", {}
).items():
if token in value:
if isinstance(token_values, list):
if len(token_values) == 1:
self.requirement_test_data["other_fields"][
cim_field
] = value.replace(token, str(token_values[0].key))
fields_key = ["cim_fields", "other_fields"]
for key in fields_key:
for field_name, value in self.requirement_test_data.get(key, {}).items():
if token in value:
if isinstance(token_values, list):
if len(token_values) == 1:
self.requirement_test_data[key][field_name] = (
value.replace(token, str(token_values[0].key))
)
else:
self.requirement_test_data[key][field_name] = [
value.replace(token, str(token_value.key))
for token_value in token_values
]
else:
self.requirement_test_data["other_fields"][
cim_field
] = [
value.replace(token, str(token_value.key))
for token_value in token_values
]
else:
self.requirement_test_data["other_fields"][cim_field] = (
value.replace(token, str(token_values.key))
)
self.requirement_test_data[key][field_name] = (
value.replace(token, str(token_values.key))
)

def get_key_fields(self):
"""
Expand Down
1 change: 1 addition & 0 deletions tests/e2e/addons/TA_transition_from_req/default/props.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ EVAL-app = "psa"
FIELDALIAS-user = tester AS user
FIELDALIAS-src = ip AS src
EVAL-status = case(action=="success", "PASS", action=="failure", "FAIL", 0==0, "OTHER")
EVAL-access = if(action=="success", "allowed", "denied")
EVAL-vendor_product = "Pytest Splunk Addon"
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ token.3.token = ##result_mapping##
token.3.replacementType = all
token.3.replacement = file[$SPLUNK_HOME/etc/apps/TA_transition_from_req/samples/result_mapping:2]

token.4.token = ##access_mapping##
token.4.replacementType = all
token.4.replacement = file[$SPLUNK_HOME/etc/apps/TA_transition_from_req/samples/result_mapping:3]

#[sample_requirement.xml]
#requirement_test_sample = 1
#sourcetype = juniper
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
success,PASS
failure,FAIL
error,OTHER
success,PASS,allowed
failure,FAIL,denied
error,OTHER,denied
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
</cim>
<other_mappings>
<field name="vendor_product" value="Pytest Splunk Addon"/>
<field name="access" value="##access_mapping##" />
</other_mappings>
</event>
</device>
14 changes: 8 additions & 6 deletions tests/e2e/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@
"*test_splunk_app_req.py::Test_App::test_cim_fields_recommended[Authentication-*::sample_name::sample_modinput.xml::* PASSED*",
"*test_splunk_app_req.py::Test_App::test_splunk_internal_errors PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1* PASSED *",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::access* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::action* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::app* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::dest* PASSED*",
Expand All @@ -799,9 +800,10 @@
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::tester* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::user* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::vendor_product* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-4* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-5* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-6* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-7* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-8* PASSED*",
"*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-9* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::access* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::action* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::app* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::dest* PASSED*",
Expand All @@ -813,9 +815,9 @@
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::tester* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::user* PASSED*",
"*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::vendor_product* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-4* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-5* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-6* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-7* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-8* PASSED*",
"*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-9* PASSED*",
'*test_splunk_app_req.py::Test_App::test_tags[eventtype="test_auth"::tag::authentication* PASSED*',
"*test_splunk_app_req.py::Test_App::test_eventtype[eventtype::test_auth* PASSED*",
]
Expand Down

0 comments on commit 409037f

Please sign in to comment.