diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 1100b72b..34374a88 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -36,7 +36,7 @@ from contentctl.objects.integration_test import IntegrationTest from contentctl.objects.data_source import DataSource from contentctl.objects.base_test_result import TestResultStatus - +from contentctl.objects.drilldown import Drilldown, DRILLDOWN_SEARCH_PLACEHOLDER from contentctl.objects.enums import ProvidingTechnology from contentctl.enrichments.cve_enrichment import CveEnrichmentObj import datetime @@ -90,6 +90,7 @@ class Detection_Abstract(SecurityContentObject): test_groups: list[TestGroup] = [] data_source_objects: list[DataSource] = [] + drilldown_searches: list[Drilldown] = Field(default=[], description="A list of Drilldowns that should be included with this search") def get_conf_stanza_name(self, app:CustomApp)->str: stanza_name = CONTENTCTL_DETECTION_STANZA_NAME_FORMAT_TEMPLATE.format(app_label=app.label, detection_name=self.name) @@ -564,6 +565,46 @@ def model_post_init(self, __context: Any) -> None: # Derive TestGroups and IntegrationTests, adjust for ManualTests, skip as needed self.adjust_tests_and_groups() + # Ensure that if there is at least 1 drilldown, at least + # 1 of the drilldowns contains the string Drilldown.SEARCH_PLACEHOLDER. + # This is presently a requirement when 1 or more drilldowns are added to a detection. + # Note that this is only required for production searches that are not hunting + + if self.type == AnalyticsType.Hunting.value or self.status != DetectionStatus.production.value: + #No additional check need to happen on the potential drilldowns. + pass + else: + found_placeholder = False + if len(self.drilldown_searches) < 2: + raise ValueError(f"This detection is required to have 2 drilldown_searches, but only has [{len(self.drilldown_searches)}]") + for drilldown in self.drilldown_searches: + if DRILLDOWN_SEARCH_PLACEHOLDER in drilldown.search: + found_placeholder = True + if not found_placeholder: + raise ValueError("Detection has one or more drilldown_searches, but none of them " + f"contained '{DRILLDOWN_SEARCH_PLACEHOLDER}. This is a requirement " + "if drilldown_searches are defined.'") + + # Update the search fields with the original search, if required + for drilldown in self.drilldown_searches: + drilldown.perform_search_substitutions(self) + + #For experimental purposes, add the default drilldowns + #self.drilldown_searches.extend(Drilldown.constructDrilldownsFromDetection(self)) + + @property + def drilldowns_in_JSON(self) -> list[dict[str,str]]: + """This function is required for proper JSON + serializiation of drilldowns to occur in savedsearches.conf. + It returns the list[Drilldown] as a list[dict]. + Without this function, the jinja template is unable + to convert list[Drilldown] to JSON + + Returns: + list[dict[str,str]]: List of Drilldowns dumped to dict format + """ + return [drilldown.model_dump() for drilldown in self.drilldown_searches] + @field_validator('lookups', mode="before") @classmethod def getDetectionLookups(cls, v:list[str], info:ValidationInfo) -> list[Lookup]: diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py index c8dce678..b1d489f4 100644 --- a/contentctl/objects/detection_tags.py +++ b/contentctl/objects/detection_tags.py @@ -79,7 +79,7 @@ def severity(self)->RiskSeverity: security_domain: SecurityDomain = Field(...) cve: List[CVE_TYPE] = [] atomic_guid: List[AtomicTest] = [] - drilldown_search: Optional[str] = None + # enrichment mitre_attack_enrichments: List[MitreAttackEnrichment] = Field([], validate_default=True) @@ -114,7 +114,7 @@ def cis20(self) -> list[Cis18Value]: # TODO (#268): Validate manual_test has length > 0 if not None manual_test: Optional[str] = None - + # The following validator is temporarily disabled pending further discussions # @validator('message') # def validate_message(cls,v,values): diff --git a/contentctl/objects/drilldown.py b/contentctl/objects/drilldown.py new file mode 100644 index 00000000..3fe41e7c --- /dev/null +++ b/contentctl/objects/drilldown.py @@ -0,0 +1,70 @@ +from __future__ import annotations +from pydantic import BaseModel, Field, model_serializer +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from contentctl.objects.detection import Detection +from contentctl.objects.enums import AnalyticsType +DRILLDOWN_SEARCH_PLACEHOLDER = "%original_detection_search%" +EARLIEST_OFFSET = "$info_min_time$" +LATEST_OFFSET = "$info_max_time$" +RISK_SEARCH = "index = risk starthoursago = 168 endhoursago = 0 | stats count values(search_name) values(risk_message) values(analyticstories) values(annotations._all) values(annotations.mitre_attack.mitre_tactic) " + +class Drilldown(BaseModel): + name: str = Field(..., description="The name of the drilldown search", min_length=5) + search: str = Field(..., description="The text of a drilldown search. This must be valid SPL.", min_length=1) + earliest_offset:None | str = Field(..., + description="Earliest offset time for the drilldown search. " + f"The most common value for this field is '{EARLIEST_OFFSET}', " + "but it is NOT the default value and must be supplied explicitly.", + min_length= 1) + latest_offset:None | str = Field(..., + description="Latest offset time for the driolldown search. " + f"The most common value for this field is '{LATEST_OFFSET}', " + "but it is NOT the default value and must be supplied explicitly.", + min_length= 1) + + @classmethod + def constructDrilldownsFromDetection(cls, detection: Detection) -> list[Drilldown]: + victim_observables = [o for o in detection.tags.observable if o.role[0] == "Victim"] + if len(victim_observables) == 0 or detection.type == AnalyticsType.Hunting: + # No victims, so no drilldowns + return [] + print(f"Adding default drilldowns for [{detection.name}]") + variableNamesString = ' and '.join([f"${o.name}$" for o in victim_observables]) + nameField = f"View the detection results for {variableNamesString}" + appendedSearch = " | search " + ' '.join([f"{o.name} = ${o.name}$" for o in victim_observables]) + search_field = f"{detection.search}{appendedSearch}" + detection_results = cls(name=nameField, earliest_offset=EARLIEST_OFFSET, latest_offset=LATEST_OFFSET, search=search_field) + + + nameField = f"View risk events for the last 7 days for {variableNamesString}" + fieldNamesListString = ', '.join([o.name for o in victim_observables]) + search_field = f"{RISK_SEARCH}by {fieldNamesListString} {appendedSearch}" + risk_events_last_7_days = cls(name=nameField, earliest_offset=None, latest_offset=None, search=search_field) + + return [detection_results,risk_events_last_7_days] + + + def perform_search_substitutions(self, detection:Detection)->None: + """Replaces the field DRILLDOWN_SEARCH_PLACEHOLDER (%original_detection_search%) + with the search contained in the detection. We do this so that the YML does not + need the search copy/pasted from the search field into the drilldown object. + + Args: + detection (Detection): Detection to be used to update the search field of the drilldown + """ + self.search = self.search.replace(DRILLDOWN_SEARCH_PLACEHOLDER, detection.search) + + + @model_serializer + def serialize_model(self) -> dict[str,str]: + #Call serializer for parent + model:dict[str,str] = {} + + model['name'] = self.name + model['search'] = self.search + if self.earliest_offset is not None: + model['earliest_offset'] = self.earliest_offset + if self.latest_offset is not None: + model['latest_offset'] = self.latest_offset + return model \ No newline at end of file diff --git a/contentctl/output/templates/savedsearches_detections.j2 b/contentctl/output/templates/savedsearches_detections.j2 index f2f345aa..396bb2c6 100644 --- a/contentctl/output/templates/savedsearches_detections.j2 +++ b/contentctl/output/templates/savedsearches_detections.j2 @@ -112,7 +112,8 @@ alert.suppress.fields = {{ detection.tags.throttling.conf_formatted_fields() }} alert.suppress.period = {{ detection.tags.throttling.period }} {% endif %} search = {{ detection.search | escapeNewlines() }} - +action.notable.param.drilldown_searches = {{ detection.drilldowns_in_JSON | tojson | escapeNewlines() }} {% endif %} + {% endfor %} ### END {{ app.label }} DETECTIONS ### diff --git a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml index 1a4af7b1..a101fd7d 100644 --- a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml +++ b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml @@ -29,6 +29,15 @@ references: - https://attack.mitre.org/techniques/T1560/001/ - https://www.microsoft.com/security/blog/2021/01/20/deep-dive-into-the-solorigate-second-stage-activation-from-sunburst-to-teardrop-and-raindrop/ - https://thedfirreport.com/2021/01/31/bazar-no-ryuk/ +drilldown_searches: +- name: View the detection results for $user$ and $dest$ + search: '%original_detection_search% | search user = $user$ dest = $dest$' + earliest_offset: $info_min_time$ + latest_offset: $info_max_time$ +- name: View risk events for the last 7 days for $user$ and $dest$ + search: '| from datamodel Risk.All_Risk | search normalized_risk_object IN ($user$, $dest$) starthoursago=168 endhoursago=1 | stats count min(_time) as firstTime max(_time) as lastTime values(search_name) as "Search Name" values(risk_message) as "Risk Message" values(analyticstories) as "Analytic Stories" values(annotations._all) as "Annotations" values(annotations.mitre_attack.mitre_tactic) as "ATT&CK Tactics" by normalized_risk_object | `security_content_ctime(firstTime)` | `security_content_ctime(lastTime)`' + earliest_offset: $info_min_time$ + latest_offset: $info_max_time$ tags: analytic_story: - Cobalt Strike @@ -80,4 +89,4 @@ tests: attack_data: - data: https://media.githubusercontent.com/media/splunk/attack_data/master/datasets/attack_techniques/T1560.001/archive_utility/windows-sysmon.log source: XmlWinEventLog:Microsoft-Windows-Sysmon/Operational - sourcetype: xmlwineventlog \ No newline at end of file + sourcetype: xmlwineventlog