From 0e876f438bfbab0bca76e22a509bd08cdca093c1 Mon Sep 17 00:00:00 2001 From: Darshan Varasani Date: Wed, 11 Sep 2024 18:45:52 +0530 Subject: [PATCH 1/3] fix: token replacement for other mappings --- .../sample_generation/sample_event.py | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/pytest_splunk_addon/sample_generation/sample_event.py b/pytest_splunk_addon/sample_generation/sample_event.py index 01cbb8680..0531f7080 100644 --- a/pytest_splunk_addon/sample_generation/sample_event.py +++ b/pytest_splunk_addon/sample_generation/sample_event.py @@ -300,6 +300,15 @@ def get_token_extractions_count(self, token): elif isinstance(extracted_field, list): for each_filed in extracted_field: tokens_in_extractions += len(re.findall(token, each_filed)) + + for extracted_field in self.requirement_test_data.get( + "other_fields", {} + ).values(): + if isinstance(extracted_field, str): + tokens_in_extractions += len(re.findall(token, extracted_field)) + elif isinstance(extracted_field, list): + for each_filed in extracted_field: + tokens_in_extractions += len(re.findall(token, each_filed)) return 1 if tokens_in_extractions > 0 else 0 def replace_token(self, token, token_values): 
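Review bot: The added `other_fields` loop in get_token_extractions_count matches `token` as a regular expression (`re.findall(token, extracted_field)`), while update_requirement_test_field in the next hunk looks for the same token literally with `token in value` and `value.replace(token, ...)`. A token containing regex metacharacters can therefore be counted here and never substituted, or substituted without being counted. If tokens are meant to be patterns, the replacement side should use `re.search`/`re.sub`; if they are literals, the count should use `re.findall(re.escape(token), extracted_field)`. The rewritten loop in PATCH 2/3 (@@ -292,18 +292,13 @@) keeps the same call, and the function body starts above this hunk.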
@@ -367,18 +376,39 @@ def update_requirement_test_field(self, field, token, token_values): if token in value: if isinstance(token_values, list): if len(token_values) == 1: - self.requirement_test_data["cim_fields"][ + self.requirement_test_data["cim_fields"][cim_field] = ( + value.replace(token, str(token_values[0].key)) + ) + else: + self.requirement_test_data["cim_fields"][cim_field] = [ + value.replace(token, str(token_value.key)) + for token_value in token_values + ] + else: + self.requirement_test_data["cim_fields"][cim_field] = ( + value.replace(token, str(token_values.key)) + ) + + for cim_field, value in self.requirement_test_data.get( + "other_fields", {} + ).items(): + if token in value: + if isinstance(token_values, list): + if len(token_values) == 1: + self.requirement_test_data["other_fields"][ cim_field ] = value.replace(token, str(token_values[0].key)) else: - self.requirement_test_data["cim_fields"][cim_field] = [ + self.requirement_test_data["other_fields"][ + cim_field + ] = [ value.replace(token, str(token_value.key)) for token_value in token_values ] else: - self.requirement_test_data["cim_fields"][ - cim_field - ] = value.replace(token, str(token_values.key)) + self.requirement_test_data["other_fields"][cim_field] = ( + value.replace(token, str(token_values.key)) + ) def get_key_fields(self): """ From 409037fa1effaeab0015f0300d2dc2a2e75a92ec Mon Sep 17 00:00:00 2001 From: Darshan Varasani Date: Thu, 12 Sep 2024 13:29:04 +0530 Subject: [PATCH 2/3] fix: token replacement for other_mappings --- .../sample_generation/sample_event.py | 107 +++++++----------- .../TA_transition_from_req/default/props.conf | 1 + .../default/pytest-splunk-addon-data.conf | 4 + .../samples/result_mapping | 6 +- .../samples/sample_modinput.xml | 1 + tests/e2e/constants.py | 14 ++- 6 files changed, 59 insertions(+), 74 deletions(-) 
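Review bot: `if token in value:` in the added `other_fields` loop assumes `value` is a string, yet this same branch stores a list under `self.requirement_test_data["other_fields"][cim_field]` when `token_values` has more than one entry. On a later call for another token, `token in value` becomes list membership: tokens embedded in the list elements are skipped, and an exact element match reaches `value.replace` and raises AttributeError. Guarding with `if isinstance(value, str) and token in value:` and handling the list case element by element would match the str/list handling in get_token_extractions_count. The merged loop in PATCH 2/3 (@@ -368,47 +366,26 @@) carries the same condition, and the function starts above this hunk.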
diff --git a/pytest_splunk_addon/sample_generation/sample_event.py b/pytest_splunk_addon/sample_generation/sample_event.py index 0531f7080..232d412a3 100644 --- a/pytest_splunk_addon/sample_generation/sample_event.py +++ b/pytest_splunk_addon/sample_generation/sample_event.py @@ -292,18 +292,13 @@ def get_token_extractions_count(self, token): tokens_in_extractions = 0 if ( self.requirement_test_data is not None - and "cim_fields" in self.requirement_test_data.keys() + and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys()) ): - for extracted_field in self.requirement_test_data["cim_fields"].values(): - if isinstance(extracted_field, str): - tokens_in_extractions += len(re.findall(token, extracted_field)) - elif isinstance(extracted_field, list): - for each_filed in extracted_field: - tokens_in_extractions += len(re.findall(token, each_filed)) - - for extracted_field in self.requirement_test_data.get( - "other_fields", {} - ).values(): + field_values = [ + *self.requirement_test_data.get("cim_fields", {}).values(), + *self.requirement_test_data.get("other_fields", {}).values(), + ] + for extracted_field in field_values: if isinstance(extracted_field, str): tokens_in_extractions += len(re.findall(token, extracted_field)) elif isinstance(extracted_field, list): 
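Review bot: The widened guard `"cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys()` is redundant now that both lookups go through `.get(..., {})`, because a missing key already yields an empty iteration. `if self.requirement_test_data is not None:` is enough and reads more clearly; the same applies to the guard in update_requirement_test_field (@@ -368,47 +366,26 @@). If the requirement parser can emit one of these keys with a `None` value, `.get("other_fields", {})` would still return `None` and fail on `.values()`, which would deserve a short comment or an `or {}`.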
@@ -324,21 +319,24 @@ def replace_token(self, token, token_values): sample_tokens = re.finditer(token, self.event, flags=re.MULTILINE) for _, token_value in enumerate(token_values): - token_value = token_value.value - match_object = next(sample_tokens) - match_str = ( - match_object.group(0) - if len(match_object.groups()) == 0 - else match_object.group(1) - ) - match_str = re.escape(match_str) - self.event = re.sub( - match_str, - lambda x: str(token_value), - self.event, - 1, - flags=re.MULTILINE, - ) + try: + token_value = token_value.value + match_object = next(sample_tokens) + match_str = ( + match_object.group(0) + if len(match_object.groups()) == 0 + else match_object.group(1) + ) + match_str = re.escape(match_str) + self.event = re.sub( + match_str, + lambda x: str(token_value), + self.event, + 1, + flags=re.MULTILINE, + ) + except StopIteration: + break else: self.event = re.sub( token, lambda x: str(token_values), self.event, flags=re.MULTILINE 
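Review bot: The added `try`/`except StopIteration: break` in replace_token wraps the whole loop body although only `next(sample_tokens)` is expected to raise it, and it ends the loop without any trace when the event has fewer matches than `token_values`. The remaining values are dropped silently, so a sample whose token count disagrees with its replacement values may pass through generation unnoticed and only surface later as a field mismatch. Narrowing it to `match_object = next(sample_tokens, None)` followed by `if match_object is None: break`, with a warning logged before the break, keeps the fix and makes the mismatch visible. The function starts and ends outside this hunk.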
@@ -368,47 +366,26 @@ def update_requirement_test_field(self, field, token, token_values): if field != "_time": if ( self.requirement_test_data is not None - and "cim_fields" in self.requirement_test_data.keys() + and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys()) ): - for cim_field, value in self.requirement_test_data[ - "cim_fields" - ].items(): - if token in value: - if isinstance(token_values, list): - if len(token_values) == 1: - self.requirement_test_data["cim_fields"][cim_field] = ( - value.replace(token, str(token_values[0].key)) - ) - else: - self.requirement_test_data["cim_fields"][cim_field] = [ - value.replace(token, str(token_value.key)) - for token_value in token_values - ] - else: - self.requirement_test_data["cim_fields"][cim_field] = ( - value.replace(token, str(token_values.key)) - ) - - for cim_field, value in self.requirement_test_data.get( - "other_fields", {} - ).items(): - if token in value: - if isinstance(token_values, list): - if len(token_values) == 1: - self.requirement_test_data["other_fields"][ - cim_field - ] = value.replace(token, str(token_values[0].key)) + fields_key = ["cim_fields", "other_fields"] + for key in fields_key: + for field_name, value in self.requirement_test_data.get(key, {}).items(): + if token in value: + if isinstance(token_values, list): + if len(token_values) == 1: + self.requirement_test_data[key][field_name] = ( + value.replace(token, str(token_values[0].key)) + ) + else: + self.requirement_test_data[key][field_name] = [ + value.replace(token, str(token_value.key)) + for token_value in token_values + ] else: - self.requirement_test_data["other_fields"][ - cim_field - ] = [ - value.replace(token, str(token_value.key)) - for token_value in token_values - ] - else: - self.requirement_test_data["other_fields"][cim_field] = ( - value.replace(token, str(token_values.key)) - ) + self.requirement_test_data[key][field_name] = ( + value.replace(token, 
str(token_values.key)) + ) def get_key_fields(self): """ diff --git a/tests/e2e/addons/TA_transition_from_req/default/props.conf b/tests/e2e/addons/TA_transition_from_req/default/props.conf index ee1bf418e..6e427554b 100644 --- a/tests/e2e/addons/TA_transition_from_req/default/props.conf +++ b/tests/e2e/addons/TA_transition_from_req/default/props.conf @@ -11,4 +11,5 @@ EVAL-app = "psa" FIELDALIAS-user = tester AS user FIELDALIAS-src = ip AS src EVAL-status = case(action=="success", "PASS", action=="failure", "FAIL", 0==0, "OTHER") +EVAL-access = if(action=="success", "allowed", "denied") EVAL-vendor_product = "Pytest Splunk Addon" \ No newline at end of file diff --git a/tests/e2e/addons/TA_transition_from_req/default/pytest-splunk-addon-data.conf b/tests/e2e/addons/TA_transition_from_req/default/pytest-splunk-addon-data.conf index 8149ba975..c2cb4a8c0 100644 --- a/tests/e2e/addons/TA_transition_from_req/default/pytest-splunk-addon-data.conf +++ b/tests/e2e/addons/TA_transition_from_req/default/pytest-splunk-addon-data.conf @@ -29,6 +29,10 @@ token.3.token = ##result_mapping## token.3.replacementType = all token.3.replacement = file[$SPLUNK_HOME/etc/apps/TA_transition_from_req/samples/result_mapping:2] +token.4.token = ##access_mapping## +token.4.replacementType = all +token.4.replacement = file[$SPLUNK_HOME/etc/apps/TA_transition_from_req/samples/result_mapping:3] + #[sample_requirement.xml] #requirement_test_sample = 1 #sourcetype = juniper diff --git a/tests/e2e/addons/TA_transition_from_req/samples/result_mapping b/tests/e2e/addons/TA_transition_from_req/samples/result_mapping index d7370d211..1390b04ca 100644 --- a/tests/e2e/addons/TA_transition_from_req/samples/result_mapping +++ b/tests/e2e/addons/TA_transition_from_req/samples/result_mapping @@ -1,3 +1,3 @@ -success,PASS -failure,FAIL -error,OTHER \ No newline at end of file +success,PASS,allowed +failure,FAIL,denied +error,OTHER,denied \ No newline at end of file 
diff --git a/tests/e2e/addons/TA_transition_from_req/samples/sample_modinput.xml b/tests/e2e/addons/TA_transition_from_req/samples/sample_modinput.xml index 1a1b2369a..8689842d7 100644 --- a/tests/e2e/addons/TA_transition_from_req/samples/sample_modinput.xml +++ b/tests/e2e/addons/TA_transition_from_req/samples/sample_modinput.xml @@ -28,6 +28,7 @@ + \ No newline at end of file diff --git a/tests/e2e/constants.py b/tests/e2e/constants.py index 403c1910b..c5c5177ec 100644 --- a/tests/e2e/constants.py +++ b/tests/e2e/constants.py @@ -788,6 +788,7 @@ "*test_splunk_app_req.py::Test_App::test_cim_fields_recommended[Authentication-*::sample_name::sample_modinput.xml::* PASSED*", "*test_splunk_app_req.py::Test_App::test_splunk_internal_errors PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1* PASSED *", + "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::access* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::action* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::app* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::dest* PASSED*", 
@@ -799,9 +800,10 @@ "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::tester* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::user* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields[test:data:1::field::vendor_product* PASSED*", - "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-4* PASSED*", - "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-5* PASSED*", - "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-6* PASSED*", + "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-7* PASSED*", + "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-8* PASSED*", + "*test_splunk_app_req.py::Test_App::test_requirements_fields[sample_name::sample_modinput.xml::host::so1-9* PASSED*", + "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::access* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::action* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::app* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::dest* PASSED*", 
@@ -813,9 +815,9 @@ "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::tester* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::user* PASSED*", "*test_splunk_app_req.py::Test_App::test_props_fields_no_dash_not_empty[test:data:1::field::vendor_product* PASSED*", - "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-4* PASSED*", - "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-5* PASSED*", - "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-6* PASSED*", + "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-7* PASSED*", + "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-8* PASSED*", + "*test_splunk_app_req.py::Test_App::test_datamodels[Authentication::sample_name::sample_modinput.xml::host::so1-9* PASSED*", '*test_splunk_app_req.py::Test_App::test_tags[eventtype="test_auth"::tag::authentication* PASSED*', "*test_splunk_app_req.py::Test_App::test_eventtype[eventtype::test_auth* PASSED*", ] From 6d2adca073d3ff0251621c27a0f6c7b2c08a09d7 Mon Sep 17 00:00:00 2001 From: Darshan Varasani Date: Thu, 12 Sep 2024 13:38:23 +0530 Subject: [PATCH 3/3] chore: fix lint --- .../sample_generation/sample_event.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pytest_splunk_addon/sample_generation/sample_event.py b/pytest_splunk_addon/sample_generation/sample_event.py index 232d412a3..9bea2af71 100644 --- a/pytest_splunk_addon/sample_generation/sample_event.py +++ b/pytest_splunk_addon/sample_generation/sample_event.py 
@@ -290,9 +290,9 @@ def get_token_extractions_count(self, token): token (str): Token name """ tokens_in_extractions = 0 - if ( - self.requirement_test_data is not None - and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys()) + if self.requirement_test_data is not None and ( + "cim_fields" in self.requirement_test_data.keys() + or "other_fields" in self.requirement_test_data.keys() ): field_values = [ *self.requirement_test_data.get("cim_fields", {}).values(), @@ -364,28 +364,30 @@ def register_field_value(self, field, token_values): def update_requirement_test_field(self, field, token, token_values): if field != "_time": - if ( - self.requirement_test_data is not None - and ("cim_fields" in self.requirement_test_data.keys() or "other_fields" in self.requirement_test_data.keys()) + if self.requirement_test_data is not None and ( + "cim_fields" in self.requirement_test_data.keys() + or "other_fields" in self.requirement_test_data.keys() ): fields_key = ["cim_fields", "other_fields"] for key in fields_key: - for field_name, value in self.requirement_test_data.get(key, {}).items(): + for field_name, value in self.requirement_test_data.get( + key, {} + ).items(): if token in value: if isinstance(token_values, list): if len(token_values) == 1: - self.requirement_test_data[key][field_name] = ( - value.replace(token, str(token_values[0].key)) - ) + self.requirement_test_data[key][ + field_name + ] = value.replace(token, str(token_values[0].key)) else: self.requirement_test_data[key][field_name] = [ value.replace(token, str(token_value.key)) for token_value in token_values ] else: - self.requirement_test_data[key][field_name] = ( - value.replace(token, str(token_values.key)) - ) + self.requirement_test_data[key][ + field_name + ] = value.replace(token, str(token_values.key)) def get_key_fields(self): """