Skip to content

Commit

Permalink
reworking validations
Browse files Browse the repository at this point in the history
  • Loading branch information
ljstella committed Nov 8, 2024
1 parent 9c138f1 commit 11a1ca9
Showing 1 changed file with 110 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Detection_Abstract(SecurityContentObject):
search: str = Field(...)
how_to_implement: str = Field(..., min_length=4)
known_false_positives: str = Field(..., min_length=4)
rba: rba_object = Field(...)
rba: Optional[rba_object] = Field(...)
explanation: None | str = Field(
default=None,
exclude=True, #Don't serialize this value when dumping the object
Expand Down Expand Up @@ -761,81 +761,152 @@ def ensureThrottlingFieldsExist(self):


@model_validator(mode="after")
def ensureProperObservablesExist(self):
def ensureProperRBAConfig(self):
"""
If a detections is PRODUCTION and either TTP or ANOMALY, then it MUST have an Observable with the VICTIM role.
If a detection has an RBA deployment and is PRODUCTION, then it must have an RBA config, with at least one risk object
Returns:
self: Returns itself if the valdiation passes
self: Returns itself if the validation passes
"""

# NOTE: we ignore the type error around self.status because we are using Pydantic's
# use_enum_values configuration
# https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
if self.status not in [DetectionStatus.production.value]: # type: ignore
# Only perform this validation on production detections
if self.status not in [DetectionStatus.production.value]: # type: ignore
return self

if self.deployment.alert_action.rba is None:
# confirm we don't have an RBA config
if self.rba is None:
return self
else:
raise ValueError(
"Detection does not have a matching RBA deployment config, the RBA portion should be omitted."
)
else:
if len(self.rba.risk_objects) > 0: # type: ignore
return self
else:
raise ValueError(
"Detection expects an RBA config with at least one risk object."
)

if self.type not in [AnalyticsType.TTP.value, AnalyticsType.Anomaly.value]:
# Only perform this validation on TTP and Anomaly detections
return self

# Detection is required to have a victim
roles: list[str] = []
for observable in self.tags.observable:
roles.extend(observable.role)

if roles.count("Victim") == 0:
raise ValueError(
"Error, there must be AT LEAST 1 Observable with the role 'Victim' declared in "
"Detection.tags.observables. However, none were found."
)

# Exactly one victim was found
return self
# TODO - Remove old observable code
# @model_validator(mode="after")
# def ensureProperObservablesExist(self):
# """
# If a detections is PRODUCTION and either TTP or ANOMALY, then it MUST have an Observable with the VICTIM role.

# Returns:
# self: Returns itself if the valdiation passes
# """
# # NOTE: we ignore the type error around self.status because we are using Pydantic's
# # use_enum_values configuration
# # https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
# if self.status not in [DetectionStatus.production.value]: # type: ignore
# # Only perform this validation on production detections
# return self

# if self.type not in [AnalyticsType.TTP.value, AnalyticsType.Anomaly.value]:
# # Only perform this validation on TTP and Anomaly detections
# return self

# # Detection is required to have a victim
# roles: list[str] = []
# for observable in self.tags.observable:
# roles.extend(observable.role)

# if roles.count("Victim") == 0:
# raise ValueError(
# "Error, there must be AT LEAST 1 Observable with the role 'Victim' declared in "
# "Detection.tags.observables. However, none were found."
# )

# # Exactly one victim was found
# return self

@model_validator(mode="after")
def search_observables_exist_validate(self):
observable_fields = [ob.name.lower() for ob in self.tags.observable]
def search_rba_fields_exist_validate(self):
risk_fields = [ob.field.lower() for ob in self.rba.risk_objects]
threat_fields = [ob.field.lower() for ob in self.rba.threat_objects]
rba_fields = risk_fields + threat_fields

# All $field$ fields from the message must appear in the search
field_match_regex = r"\$([^\s.]*)\$"

missing_fields: set[str]
if self.tags.message:
matches = re.findall(field_match_regex, self.tags.message.lower())
if self.rba.message:
matches = re.findall(field_match_regex, self.rba.message.lower())
message_fields = [match.replace("$", "").lower() for match in matches]
missing_fields = set([field for field in observable_fields if field not in self.search.lower()])
missing_fields = set([field for field in rba_fields if field not in self.search.lower()])
else:
message_fields = []
missing_fields = set()

error_messages: list[str] = []
if len(missing_fields) > 0:
error_messages.append(
"The following fields are declared as observables, but do not exist in the "
"The following fields are declared in the rba config, but do not exist in the "
f"search: {missing_fields}"
)

missing_fields = set([field for field in message_fields if field not in self.search.lower()])
if len(missing_fields) > 0:
error_messages.append(
"The following fields are used as fields in the message, but do not exist in "
f"the search: {missing_fields}"
)

# NOTE: we ignore the type error around self.status because we are using Pydantic's
# use_enum_values configuration
# https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
if len(error_messages) > 0 and self.status == DetectionStatus.production.value: # type: ignore
if len(error_messages) > 0 and self.status == DetectionStatus.production.value: #type: ignore
msg = (
"Use of fields in observables/messages that do not appear in search:\n\t- "
"Use of fields in rba/messages that do not appear in search:\n\t- "
"\n\t- ".join(error_messages)
)
raise ValueError(msg)

# Found everything
return self

# TODO: Remove old observable code
# @model_validator(mode="after")
# def search_observables_exist_validate(self):
# observable_fields = [ob.name.lower() for ob in self.tags.observable]

# # All $field$ fields from the message must appear in the search
# field_match_regex = r"\$([^\s.]*)\$"

# missing_fields: set[str]
# if self.tags.message:
# matches = re.findall(field_match_regex, self.tags.message.lower())
# message_fields = [match.replace("$", "").lower() for match in matches]
# missing_fields = set([field for field in observable_fields if field not in self.search.lower()])
# else:
# message_fields = []
# missing_fields = set()

# error_messages: list[str] = []
# if len(missing_fields) > 0:
# error_messages.append(
# "The following fields are declared as observables, but do not exist in the "
# f"search: {missing_fields}"
# )

# missing_fields = set([field for field in message_fields if field not in self.search.lower()])
# if len(missing_fields) > 0:
# error_messages.append(
# "The following fields are used as fields in the message, but do not exist in "
# f"the search: {missing_fields}"
# )

# # NOTE: we ignore the type error around self.status because we are using Pydantic's
# # use_enum_values configuration
# # https://docs.pydantic.dev/latest/api/config/#pydantic.config.ConfigDict.populate_by_name
# if len(error_messages) > 0 and self.status == DetectionStatus.production.value: # type: ignore
# msg = (
# "Use of fields in observables/messages that do not appear in search:\n\t- "
# "\n\t- ".join(error_messages)
# )
# raise ValueError(msg)

# # Found everything
# return self

@field_validator("tests", mode="before")
def ensure_yml_test_is_unittest(cls, v:list[dict]):
"""The typing for the tests field allows it to be one of
Expand Down

0 comments on commit 11a1ca9

Please sign in to comment.