diff --git a/docs/mitre_attack_data/examples.rst b/docs/mitre_attack_data/examples.rst index 2330ef8d..8c3e4f4e 100644 --- a/docs/mitre_attack_data/examples.rst +++ b/docs/mitre_attack_data/examples.rst @@ -13,9 +13,10 @@ Getting An ATT&CK Object * `get_object_by_stix_id.py `_ * `get_object_by_attack_id.py `_ -* `get_object_by_name.py `_ -* `get_group_by_alias.py `_ +* `get_objects_by_name.py `_ +* `get_groups_by_alias.py `_ * `get_software_by_alias.py `_ +* `get_campaigns_by_alias.py `_ * `get_stix_type.py `_ * `get_attack_id.py `_ * `get_name.py `_ @@ -41,6 +42,7 @@ Getting Multiple ATT&CK Objects * `get_techniques_by_platform.py `_ * `get_objects_by_content.py `_ * `get_objects_created_after.py `_ +* `get_objects_modified_after.py `_ Related Objects ------------------- @@ -108,4 +110,3 @@ Campaign:Group Relationships * `get_groups_attributing_to_campaign.py `_ * `get_all_campaigns_attributed_to_all_groups.py `_ * `get_campaigns_attributed_to_group.py `_ - diff --git a/docs/mitre_attack_data/mitre_attack_data.rst b/docs/mitre_attack_data/mitre_attack_data.rst index fc03809a..ac1ab413 100644 --- a/docs/mitre_attack_data/mitre_attack_data.rst +++ b/docs/mitre_attack_data/mitre_attack_data.rst @@ -57,7 +57,7 @@ a lookup table of STIX ID to related objects and relationships. # [ # { # "object": Malware, # S0061 - # "relationship": Relationship # relationship between G0019 and S0061 + # "relationships": Relationship[] # relationships between G0019 and S0061 # }, # { # ... @@ -79,8 +79,7 @@ by ATT&CK. mitigations = mitre_attack_data.get_mitigations(remove_revoked_deprecated=True) -To remove revoked and deprecated objects from the results of a method without a parameter to -automatically remove revoked and deprecated objects: +To separately remove revoked and deprecated objects from the results of a method: .. code-block:: python diff --git a/docs/notice.rst b/docs/notice.rst index 46ecf9aa..9751230f 100644 --- a/docs/notice.rst +++ b/docs/notice.rst @@ -1,6 +1,6 @@ Notice ============================================== -Copyright 2022 The MITRE Corporation +Copyright 2023 The MITRE Corporation Approved for Public Release; Distribution Unlimited. Case Number 19-0486. diff --git a/examples/get_all_mitigations_mitigating_all_techniques.py b/examples/get_all_mitigations_mitigating_all_techniques.py index 78048e9c..3ee5cf55 100644 --- a/examples/get_all_mitigations_mitigating_all_techniques.py +++ b/examples/get_all_mitigations_mitigating_all_techniques.py @@ -9,7 +9,7 @@ def main(): print(f"Mitigations mitigating techniques ({len(mitigations_mitigating.keys())} techniques):") for id, mitigations in mitigations_mitigating.items(): - print(f"* {id} - mitigated by {len(mitigations)} {'mitigation' if len(mitigations) == 1 else 'mitigation'}") + print(f"* {id} - mitigated by {len(mitigations)} {'mitigation' if len(mitigations) == 1 else 'mitigations'}") if __name__ == "__main__": diff --git a/examples/get_campaigns_by_alias.py b/examples/get_campaigns_by_alias.py new file mode 100644 index 00000000..c37de79c --- /dev/null +++ b/examples/get_campaigns_by_alias.py @@ -0,0 +1,14 @@ +from mitreattack.stix20 import MitreAttackData + + +def main(): + mitre_attack_data = MitreAttackData("enterprise-attack.json") + + campaigns = mitre_attack_data.get_campaigns_by_alias("Frankenstein") + + for campaign in campaigns: + print(f"{campaign.name} ({mitre_attack_data.get_attack_id(campaign.id)})") + + +if __name__ == "__main__": + main() diff --git a/examples/get_group_by_alias.py b/examples/get_group_by_alias.py deleted file mode 100644 index 62ee0f43..00000000 --- a/examples/get_group_by_alias.py +++ /dev/null @@ -1,13 +0,0 @@ -from mitreattack.stix20 import MitreAttackData - - -def main(): - mitre_attack_data = MitreAttackData("enterprise-attack.json") - - G0016 = mitre_attack_data.get_group_by_alias("Cozy Bear") - - print(G0016.serialize(pretty=True)) - - -if __name__ == "__main__": - main() diff --git a/examples/get_campaign_by_alias.py b/examples/get_groups_by_alias.py similarity index 50% rename from examples/get_campaign_by_alias.py rename to examples/get_groups_by_alias.py index fe4473f6..6ffce3a0 100644 --- a/examples/get_campaign_by_alias.py +++ b/examples/get_groups_by_alias.py @@ -4,9 +4,10 @@ def main(): mitre_attack_data = MitreAttackData("enterprise-attack.json") - C0001 = mitre_attack_data.get_campaign_by_alias("Frankenstein") + groups = mitre_attack_data.get_groups_by_alias("Cozy Bear") - print(C0001.serialize(pretty=True)) + for group in groups: + print(f"{group.name} ({mitre_attack_data.get_attack_id(group.id)})") if __name__ == "__main__": diff --git a/examples/get_object_by_name.py b/examples/get_object_by_name.py deleted file mode 100644 index 8f2879d6..00000000 --- a/examples/get_object_by_name.py +++ /dev/null @@ -1,13 +0,0 @@ -from mitreattack.stix20 import MitreAttackData - - -def main(): - mitre_attack_data = MitreAttackData("enterprise-attack.json") - - T1082 = mitre_attack_data.get_object_by_name("System Information Discovery", "attack-pattern") - - print(T1082.serialize(pretty=True)) - - -if __name__ == "__main__": - main() diff --git a/examples/get_objects_by_name.py b/examples/get_objects_by_name.py new file mode 100644 index 00000000..4c93485f --- /dev/null +++ b/examples/get_objects_by_name.py @@ -0,0 +1,14 @@ +from mitreattack.stix20 import MitreAttackData + + +def main(): + mitre_attack_data = MitreAttackData("enterprise-attack.json") + + techniques = mitre_attack_data.get_objects_by_name("System Information Discovery", "attack-pattern") + + for technique in techniques: + print(technique.serialize(pretty=True)) + + +if __name__ == "__main__": + main() diff --git a/examples/get_objects_created_after.py b/examples/get_objects_created_after.py index c61fe322..358cceb4 100644 --- a/examples/get_objects_created_after.py +++ b/examples/get_objects_created_after.py @@ -6,7 +6,7 @@ def main(): objects = mitre_attack_data.get_objects_created_after("2022-10-01T00:00:00.000Z") - print(f"There were {len(objects)} objects created after 1 October 2022") + print(f"There are {len(objects)} objects created after 1 October 2022") if __name__ == "__main__": diff --git a/examples/get_objects_modified_after.py b/examples/get_objects_modified_after.py index c9ff86cf..4ec9f258 100644 --- a/examples/get_objects_modified_after.py +++ b/examples/get_objects_modified_after.py @@ -7,7 +7,7 @@ def main(): date = "2022-10-01" objects = mitre_attack_data.get_objects_modified_after(date) - print(f"There were {len(objects)} objects modified after {date}") + print(f"There are {len(objects)} objects modified after {date}") if __name__ == "__main__": diff --git a/examples/get_procedure_examples_by_tactic.py b/examples/get_procedure_examples_by_tactic.py index f76d97c9..89e9264e 100644 --- a/examples/get_procedure_examples_by_tactic.py +++ b/examples/get_procedure_examples_by_tactic.py @@ -6,7 +6,7 @@ def print_procedure_examples(mitre_attack_data, attack_objects_using_technique): stix_object = attack_object["object"] attack_id = mitre_attack_data.get_attack_id(stix_id=stix_object["id"]) name = stix_object["name"] - procedure_description = attack_object["relationship"].get("description") + procedure_description = attack_object["relationships"][0].get("description") print(f"[{attack_id}] {name}: {procedure_description}") diff --git a/examples/get_software_by_alias.py b/examples/get_software_by_alias.py index c4ec6ed5..81082890 100644 --- a/examples/get_software_by_alias.py +++ b/examples/get_software_by_alias.py @@ -4,9 +4,10 @@ def main(): mitre_attack_data = MitreAttackData("enterprise-attack.json") - S0196 = mitre_attack_data.get_software_by_alias("ShellTea") + software = mitre_attack_data.get_software_by_alias("ShellTea") - print(S0196.serialize(pretty=True)) + for s in software: + print(f"{s.name} ({mitre_attack_data.get_attack_id(s.id)})") if __name__ == "__main__": diff --git a/mitreattack/stix20/MitreAttackData.py b/mitreattack/stix20/MitreAttackData.py index dcc613cc..16af97bd 100644 --- a/mitreattack/stix20/MitreAttackData.py +++ b/mitreattack/stix20/MitreAttackData.py @@ -358,8 +358,8 @@ def get_techniques_by_platform(self, platform: str, remove_revoked_deprecated=Fa list a list of AttackPattern objects under the given platform """ - filter = [Filter("type", "=", "attack-pattern"), Filter("x_mitre_platforms", "contains", platform)] - techniques = self.src.query(filter) + filters = [Filter("type", "=", "attack-pattern"), Filter("x_mitre_platforms", "contains", platform)] + techniques = self.src.query(filters) if remove_revoked_deprecated: techniques = self.remove_revoked_deprecated(techniques) return techniques @@ -524,12 +524,12 @@ def get_object_by_stix_id(self, stix_id: str) -> object: stix2.v20.sdo._DomainObject | CustomStixObject the STIX Domain Object specified by the STIX ID """ - object = self.src.get(stix_id) + sdo = self.src.get(stix_id) - if not object: + if not sdo: raise ValueError(f"{stix_id} not found in {self.stix_filepath}") - return StixObjectFactory(object) + return StixObjectFactory(sdo) def get_object_by_attack_id(self, attack_id: str, stix_type: str) -> object: """Retrieve a single object by its ATT&CK ID. @@ -557,17 +557,17 @@ def get_object_by_attack_id(self, attack_id: str, stix_type: str) -> object: if stix_type not in self.stix_types: raise ValueError(f"stix_type must be one of {self.stix_types}") - object = self.src.query( + sdo = self.src.query( [ Filter("external_references.external_id", "=", attack_id.upper()), Filter("type", "=", stix_type), ] ) - if not object: + if not sdo: return None - return StixObjectFactory(object[0]) + return StixObjectFactory(sdo[0]) def get_objects_by_name(self, name: str, stix_type: str) -> list: """Retrieve objects by name. @@ -592,8 +592,8 @@ def get_objects_by_name(self, name: str, stix_type: str) -> list: if stix_type not in self.stix_types: raise ValueError(f"stix_type must be one of {self.stix_types}") - filter = [Filter("type", "=", stix_type), Filter("name", "=", name)] - objects = self.src.query(filter) + filters = [Filter("type", "=", stix_type), Filter("name", "=", name)] + objects = self.src.query(filters) if not objects: return [] @@ -616,8 +616,8 @@ def get_groups_by_alias(self, alias: str) -> list: list a list of stix2.v20.sdo.IntrusionSet objects corresponding to the alias """ - filter = [Filter("type", "=", "intrusion-set"), Filter("aliases", "contains", alias)] - return self.src.query(filter) + filters = [Filter("type", "=", "intrusion-set"), Filter("aliases", "contains", alias)] + return self.src.query(filters) def get_campaigns_by_alias(self, alias: str) -> list: """Retrieve the campaigns corresponding to a given alias. @@ -634,8 +634,8 @@ def get_campaigns_by_alias(self, alias: str) -> list: list a list of stix2.v20.sdo.Campaign objects corresponding to the alias """ - filter = [Filter("type", "=", "campaign"), Filter("aliases", "contains", alias)] - return self.src.query(filter) + filters = [Filter("type", "=", "campaign"), Filter("aliases", "contains", alias)] + return self.src.query(filters) def get_software_by_alias(self, alias: str) -> list: """Retrieve the software corresponding to a given alias. @@ -661,7 +661,7 @@ def get_software_by_alias(self, alias: str) -> list: # Get Object Information ################################### - def get_attack_id(self, stix_id: str) -> str: + def get_attack_id(self, stix_id: str) -> str | None: """Get the object's ATT&CK ID. Parameters @@ -723,19 +723,19 @@ def get_related(self, source_type: str, relationship_type: str, target_type: str Parameters ---------- source_type : str - source type for the relationships, e.g. 'attack-pattern' + source type for the relationships, e.g. 'intrusion-set' relationship_type : str relationship type for the relationships, e.g. 'uses' target_type : str - target type for the relationships, e.g. 'intrusion-set' + target type for the relationships, e.g. 'attack-pattern' reverse : bool, optional build reverse mapping of target to source, by default False Returns ------- dict - if reverse=False, relationship mapping of source_object_id => [{target_object, relationship}]; - if reverse=True, relationship mapping of target_object_id => [{source_object, relationship}] + if reverse=False, relationship mapping of source_object_id => [{target_object, relationship[]}]; + if reverse=True, relationship mapping of target_object_id => [{source_object, relationship[]}] """ relationships = self.src.query( [ @@ -777,9 +777,12 @@ def get_related(self, source_type: str, relationship_type: str, target_type: str # all objects of relevant type if not reverse: - targets = self.src.query([Filter("type", "=", target_type), Filter("revoked", "=", False)]) + targets = self.src.query([Filter("type", "=", target_type)]) else: - targets = self.src.query([Filter("type", "=", source_type), Filter("revoked", "=", False)]) + targets = self.src.query([Filter("type", "=", source_type)]) + + # remove revoked/deprecated objects + targets = self.remove_revoked_deprecated(targets) # build lookup of stixID to stix object id_to_target = {} @@ -791,10 +794,10 @@ def get_related(self, source_type: str, relationship_type: str, target_type: str for stix_id in id_to_related: value = [] for related in id_to_related[stix_id]: - if not related["id"] in id_to_target: + if related["id"] not in id_to_target: continue # targeting a revoked object value.append( - {"object": StixObjectFactory(id_to_target[related["id"]]), "relationship": related["relationship"]} + {"object": StixObjectFactory(id_to_target[related["id"]]), "relationships": [related["relationship"]]} ) output[stix_id] = value return output @@ -820,6 +823,60 @@ def merge(self, map_a: dict, map_b: dict) -> dict: else: map_a[id] = map_b[id] return map_a + + def add_inherited_campaign_relationships(self, related_campaigns, inherited_campaign_relationships, object_relationships) -> dict: + """Helper function for adding inherited relationships to list of [{"object": object, "relationships": [relationship]}] + + Parameters + ---------- + related_campaigns : dict + campaigns related to the object: [object_stix_id => [ {campaign, [campaign_uses_object]} ]] + inherited_campaign_relationships : dict + relationships to be inherited from campaigns: [campaign_id => [ {related_object, [campaign_uses_related_object]} ]] + object_relationships : dict + direct relationships with the object itself: [object_stix_id => [ {related_object, [relationship]} ]] + """ + for stix_id, campaigns in related_campaigns.items(): + for campaign in campaigns: + if campaign["object"]["id"] not in inherited_campaign_relationships: + # no relationships inherited from campaign + continue + + # inheriting relationships from campaign + for stix_object in inherited_campaign_relationships[campaign["object"]["id"]]: + # append inherited campaign relationship and campaign/group relationship + relationship = {"object": stix_object["object"], "relationships": stix_object["relationships"] + campaign["relationships"]} + + if stix_id not in object_relationships: + # add inherited relationship entry from attributed campaign + object_relationships[stix_id] = [relationship] + continue + + # object exists, add inherited campaign relationships to existing list + object_relationships[stix_id].append(relationship) + + # remove duplicates + object_relationships = self.remove_duplicates(object_relationships) + return object_relationships + + def remove_duplicates(self, relationship_map) -> dict: + """Helper function to remove duplicate objects in a list of [{"object": object, "relationships": [relationship]}]""" + deduplicated_map = {} # {stix_id => [{"object": object, "relationships": []}]} + for stix_id, sdos in relationship_map.items(): + sdo_list = [] + seen_sdo_ids = [] + for sdo in sdos: + if sdo["object"]["id"] not in seen_sdo_ids: + seen_sdo_ids.append(sdo["object"]["id"]) + sdo_list.append(sdo) + continue + + # seen this object before, append relationships + for index, item in enumerate(sdo_list): + if item["object"].id == sdo["object"].id: + sdo_list[index]["relationships"] = item["relationships"] + sdo["relationships"] + deduplicated_map[stix_id] = sdo_list + return deduplicated_map ################################### # Software/Group Relationships @@ -831,50 +888,31 @@ def get_all_software_used_by_all_groups(self) -> dict: Returns ------- dict - a mapping of group_stix_id => [{'object': Software, 'relationship': Relationship}] for each software used by the group and each software used + a mapping of group_stix_id => [{"object": Malware|Tool, "relationships": Relationship[]}] for each software used by the group and each software used by campaigns attributed to the group """ # return data if it has already been fetched if self.all_software_used_by_all_groups: return self.all_software_used_by_all_groups - # get all software used by groups - tools_used_by_group = self.get_related("intrusion-set", "uses", "tool") - malware_used_by_group = self.get_related("intrusion-set", "uses", "malware") - software_used_by_group = self.merge( - tools_used_by_group, malware_used_by_group - ) # group_id -> {software, relationship} - - # get groups attributing to campaigns and all software used by campaigns - tools_used_by_campaign = self.get_related("campaign", "uses", "tool") - malware_used_by_campaign = self.get_related("campaign", "uses", "malware") - software_used_by_campaign = self.merge( - tools_used_by_campaign, malware_used_by_campaign - ) # campaign_id => {software, relationship} - - campaigns_attributed_to_group = { - "campaigns": self.get_related( - "campaign", "attributed-to", "intrusion-set", reverse=True - ), # group_id => {campaign, relationship} - "software": software_used_by_campaign, # campaign_id => {software, relationship} - } + # get software used by groups: [group_id => [ {software, [group_uses_software]} ]] + tools_used_by_groups = self.get_related("intrusion-set", "uses", "tool") + malware_used_by_groups = self.get_related("intrusion-set", "uses", "malware") + software_used_by_groups = self.merge(tools_used_by_groups, malware_used_by_groups) - for group_id in campaigns_attributed_to_group["campaigns"]: - software_used_by_campaigns = [] - # check if attributed campaign is using software - for campaign in campaigns_attributed_to_group["campaigns"][group_id]: - campaign_id = campaign["object"]["id"] - if campaign_id in campaigns_attributed_to_group["software"]: - software_used_by_campaigns.extend(campaigns_attributed_to_group["software"][campaign_id]) - - # update software used by group to include software used by a groups attributed campaign - if group_id in software_used_by_group: - software_used_by_group[group_id].extend(software_used_by_campaigns) - else: - software_used_by_group[group_id] = software_used_by_campaigns + # get software used by campaigns: [campaign_id => [ {software, [campaign_uses_software]} ]] + tools_used_by_campaigns = self.get_related("campaign", "uses", "tool") + malware_used_by_campaigns = self.get_related("campaign", "uses", "malware") + software_used_by_campaigns = self.merge(tools_used_by_campaigns, malware_used_by_campaigns) - self.all_software_used_by_all_groups = software_used_by_group - return software_used_by_group + # get groups attributing to campaigns: [group_id => [ {campaign, [campaign_attributed-to_group]} ]] + groups_attributing = self.get_related("campaign", "attributed-to", "intrusion-set", reverse=True) + + # add inherited relationships to software used by groups + software_used_by_groups = self.add_inherited_campaign_relationships(groups_attributing, software_used_by_campaigns, software_used_by_groups) + + self.all_software_used_by_all_groups = software_used_by_groups + return software_used_by_groups def get_software_used_by_group(self, group_stix_id: str) -> list: """Get all software used by a group. @@ -887,7 +925,7 @@ def get_software_used_by_group(self, group_stix_id: str) -> list: Returns ------- list - a list of {software, relationship} for each software used by the group and each software used + a list of {"object": Malware|Tool, "relationships": Relationship[]} for each software used by the group and each software used by campaigns attributed to the group """ software_used_by_groups = self.get_all_software_used_by_all_groups() @@ -899,47 +937,28 @@ def get_all_groups_using_all_software(self) -> dict: Returns ------- dict - a mapping of software_stix_id => [{'object': Group, 'relationship': Relationship}] for each group using the software and each attributed campaign + a mapping of software_stix_id => [{"object": IntrusionSet, "relationships": Relationship[]}] for each group using the software and each attributed campaign using the software """ # return data if it has already been fetched if self.all_groups_using_all_software: return self.all_groups_using_all_software - - # get all groups using software - groups_using_tool = self.get_related("intrusion-set", "uses", "tool", reverse=True) + + # get groups using software: [software_id => [ {group, [group_uses_software]} ]] + groups_using_tools = self.get_related("intrusion-set", "uses", "tool", reverse=True) groups_using_malware = self.get_related("intrusion-set", "uses", "malware", reverse=True) - groups_using_software = self.merge( - groups_using_tool, groups_using_malware - ) # software_id => {group, relationship} + groups_using_software = self.merge(groups_using_tools, groups_using_malware) - # get campaigns attributed to groups and all campaigns using software + # get campaigns using software: [software_id => [ {campaign, [campaign_uses_software]} ]] campaigns_using_tools = self.get_related("campaign", "uses", "tool", reverse=True) campaigns_using_malware = self.get_related("campaign", "uses", "malware", reverse=True) - campaigns_using_software = self.merge( - campaigns_using_tools, campaigns_using_malware - ) # software_id => {campaign, relationship} - - groups_attributing_to_campaigns = { - "campaigns": campaigns_using_software, # software_id => {campaign, relationship} - "groups": self.get_related( - "campaign", "attributed-to", "intrusion-set" - ), # campaign_id => {group, relationship} - } + campaigns_using_software = self.merge(campaigns_using_tools, campaigns_using_malware) - for software_id in groups_attributing_to_campaigns["campaigns"]: - groups_attributed_to_campaigns = [] - # check if campaign is attributed to group - for campaign in groups_attributing_to_campaigns["campaigns"][software_id]: - campaign_id = campaign["object"]["id"] - if campaign_id in groups_attributing_to_campaigns["groups"]: - groups_attributed_to_campaigns.extend(groups_attributing_to_campaigns["groups"][campaign_id]) - - # update groups using software to include software used by a groups attributed campaign - if software_id in groups_using_software: - groups_using_software[software_id].extend(groups_attributed_to_campaigns) - else: - groups_using_software[software_id] = groups_attributed_to_campaigns + # get groups attributing to campaigns: [campaign_id => [ {group, [campaign_attributed-to_group]} ]] + attributed_campaigns = self.get_related("campaign", "attributed-to", "intrusion-set") + + # add inherited relationships to groups using software + groups_using_software = self.add_inherited_campaign_relationships(campaigns_using_software, attributed_campaigns, groups_using_software) self.all_groups_using_all_software = groups_using_software return groups_using_software @@ -955,7 +974,7 @@ def get_groups_using_software(self, software_stix_id: str) -> list: Returns ------- list - a list of {group, relationship} for each group using the software and each attributed campaign + a list of {"object": IntrusionSet, "relationships": Relationship[]} for each group using the software and each attributed campaign using the software """ groups_using_software = self.get_all_groups_using_all_software() @@ -971,15 +990,15 @@ def get_all_software_used_by_all_campaigns(self) -> dict: Returns ------- dict - a mapping of campaign_stix_id => [{'object': Software, 'relationship': Relationship}] for each software used by the campaign + a mapping of campaign_stix_id => [{"object": Malware|Tool, "relationships": Relationship[]}] for each software used by the campaign """ # return data if it has already been fetched if self.all_software_used_by_all_campaigns: return self.all_software_used_by_all_campaigns - tools_used_by_campaign = self.get_related("campaign", "uses", "tool") - malware_used_by_campaign = self.get_related("campaign", "uses", "malware") - self.all_software_used_by_all_campaigns = self.merge(tools_used_by_campaign, malware_used_by_campaign) + tools_used_by_campaigns = self.get_related("campaign", "uses", "tool") + malware_used_by_campaigns = self.get_related("campaign", "uses", "malware") + self.all_software_used_by_all_campaigns = self.merge(tools_used_by_campaigns, malware_used_by_campaigns) return self.all_software_used_by_all_campaigns @@ -994,7 +1013,7 @@ def get_software_used_by_campaign(self, campaign_stix_id: str) -> list: Returns ------- list - a list of {software, relationship} for each software used by the campaign + a list of {"object": Malware|Tool, "relationships": Relationship[]} for each software used by the campaign """ software_used_by_campaigns = self.get_all_software_used_by_all_campaigns() return software_used_by_campaigns[campaign_stix_id] if campaign_stix_id in software_used_by_campaigns else [] @@ -1005,15 +1024,15 @@ def get_all_campaigns_using_all_software(self) -> dict: Returns ------- dict - a mapping of software_stix_id => [{'object': Campaign, 'relationship': Relationship}] for each campaign using the software + a mapping of software_stix_id => [{"object": Campaign, "relationships": Relationship[]}] for each campaign using the software """ # return data if it has already been fetched if self.all_campaigns_using_all_software: return self.all_campaigns_using_all_software - campaigns_using_tool = self.get_related("campaign", "uses", "tool", reverse=True) + campaigns_using_tools = self.get_related("campaign", "uses", "tool", reverse=True) campaigns_using_malware = self.get_related("campaign", "uses", "malware", reverse=True) - self.all_campaigns_using_all_software = self.merge(campaigns_using_tool, campaigns_using_malware) + self.all_campaigns_using_all_software = self.merge(campaigns_using_tools, campaigns_using_malware) return self.all_campaigns_using_all_software @@ -1028,7 +1047,7 @@ def get_campaigns_using_software(self, software_stix_id: str) -> list: Returns ------- list - a list of {campaign, relationship} for each campaign using the software + a list of {"object": Campaign, "relationships": Relationship[]} for each campaign using the software """ campaigns_using_software = self.get_all_campaigns_using_all_software() return campaigns_using_software[software_stix_id] if software_stix_id in campaigns_using_software else [] @@ -1043,7 +1062,7 @@ def get_all_groups_attributing_to_all_campaigns(self) -> dict: Returns ------- dict - a mapping of campaign_stix_id => [{'object': Group, 'relationship': Relationship}] for each group attributing to the campaign + a mapping of campaign_stix_id => [{"object": IntrusionSet, "relationships: Relationship[]}] for each group attributing to the campaign """ # return data if it has already been fetched if self.all_groups_attributing_to_all_campaigns: @@ -1064,7 +1083,7 @@ def get_groups_attributing_to_campaign(self, campaign_stix_id: str) -> list: Returns ------- list - a list of {group, relationship} for each group attributing to the campaign + a list of {"object": IntrusionSet, "relationships": Relationship[]} for each group attributing to the campaign """ groups_attributing_to_campaigns = self.get_all_groups_attributing_to_all_campaigns() return ( @@ -1079,7 +1098,7 @@ def get_all_campaigns_attributed_to_all_groups(self) -> dict: Returns ------- dict - a mapping of group_stix_id => [{'object': Campaign, 'relationship': Relationship}] for each campaign attributed to the group + a mapping of group_stix_id => [{"object": Campaign, "relationships": Relationship[]}] for each campaign attributed to the group """ # return data if it has already been fetched if self.all_campaigns_attributed_to_all_groups: @@ -1102,7 +1121,7 @@ def get_campaigns_attributed_to_group(self, group_stix_id: str) -> list: Returns ------- list - a list of {campaign, relationship} for each campaign attributed to the group + a list of {"object": Campaign, "relationships": Relationship[]} for each campaign attributed to the group """ campaigns_attributed_to_groups = self.get_all_campaigns_attributed_to_all_groups() return campaigns_attributed_to_groups[group_stix_id] if group_stix_id in campaigns_attributed_to_groups else [] @@ -1117,41 +1136,24 @@ def get_all_techniques_used_by_all_groups(self) -> dict: Returns ------- dict - a mapping of group_stix_id => [{'object': Technique, 'relationship': Relationship}] for each technique used by the group and + a mapping of group_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] for each technique used by the group and each technique used by campaigns attributed to the group """ # return data if it has already been fetched if self.all_techniques_used_by_all_groups: return self.all_techniques_used_by_all_groups - # get all techniques used by groups - techniques_used_by_groups = self.get_related( - "intrusion-set", "uses", "attack-pattern" - ) # group_id => {technique, relationship} - - # get groups attributing to campaigns and all techniques used by campaigns - campaigns_attributed_to_group = { - "campaigns": self.get_related( - "campaign", "attributed-to", "intrusion-set", reverse=True - ), # group_id => {campaign, relationship} - "techniques": self.get_related( - "campaign", "uses", "attack-pattern" - ), # campaign_id => {technique, relationship} - } + # get techniques used by groups: [group_id => [ {technique, [group_uses_technique]} ]] + techniques_used_by_groups = self.get_related("intrusion-set", "uses", "attack-pattern") - for group_id in campaigns_attributed_to_group["campaigns"]: - techniques_used_by_campaigns = [] - # check if attributed campaign is using technique - for campaign in campaigns_attributed_to_group["campaigns"][group_id]: - campaign_id = campaign["object"]["id"] - if campaign_id in campaigns_attributed_to_group["techniques"]: - techniques_used_by_campaigns.extend(campaigns_attributed_to_group["techniques"][campaign_id]) - - # update techniques used by groups to include techniques used by a groups attributed campaign - if group_id in techniques_used_by_groups: - techniques_used_by_groups[group_id].extend(techniques_used_by_campaigns) - else: - techniques_used_by_groups[group_id] = techniques_used_by_campaigns + # get techniques used by campaigns: [campaign_id => [ {technique, [campaign_uses_technique]} ]] + techniques_used_by_campaigns = self.get_related("campaign", "uses", "attack-pattern") + + # get groups attributing to campaigns: [group_id => [ {campaign, [campaign_attributed-to_group]} ]] + groups_attributing = self.get_related("campaign", "attributed-to", "intrusion-set", reverse=True) + + # add inherited relationships to techniques used by groups + techniques_used_by_groups = self.add_inherited_campaign_relationships(groups_attributing, techniques_used_by_campaigns, techniques_used_by_groups) self.all_techniques_used_by_all_groups = techniques_used_by_groups return techniques_used_by_groups @@ -1167,7 +1169,7 @@ def get_techniques_used_by_group(self, group_stix_id: str) -> list: Returns ------- list - a list of {technique, relationship} for each technique used by the group and + a list of {"object": AttackPattern, "relationships": Relationship[]} for each technique used by the group and each technique used by campaigns attributed to the group """ techniques_used_by_groups = self.get_all_techniques_used_by_all_groups() @@ -1179,41 +1181,24 @@ def get_all_groups_using_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_id => {group, relationship} for each group using the technique and each campaign attributed to - groups using the technique + a mapping of technique_stix_id => [{"object": IntrusionSet, "relationships": Relationship[]}] for each group using the + technique and each campaign attributed to groups using the technique """ # return data if it has already been fetched if self.all_groups_using_all_techniques: return self.all_groups_using_all_techniques - # get all groups using techniques - groups_using_techniques = self.get_related( - "intrusion-set", "uses", "attack-pattern", reverse=True - ) # technique_id => {group, relationship} - - # get campaigns attributed to groups and all campaigns using techniques - groups_attributing_to_campaigns = { - "campaigns": self.get_related( - "campaign", "uses", "attack-pattern", reverse=True - ), # technique_id => {campaign, relationship} - "groups": self.get_related( - "campaign", "attributed-to", "intrusion-set" - ), # campaign_id => {group, relationship} - } + # get groups using techniques: [technique_id => [ {group, [group_uses_technique]} ]] + groups_using_techniques = self.get_related("intrusion-set", "uses", "attack-pattern", reverse=True) - for technique_id in groups_attributing_to_campaigns["campaigns"]: - campaigns_attributed_to_group = [] - # check if campaign is attributed to group - for campaign in groups_attributing_to_campaigns["campaigns"][technique_id]: - campaign_id = campaign["object"]["id"] - if campaign_id in groups_attributing_to_campaigns["groups"]: - campaigns_attributed_to_group.extend(groups_attributing_to_campaigns["groups"][campaign_id]) - - # update groups using techniques to include techniques used by a groups attributed campaign - if technique_id in groups_using_techniques: - groups_using_techniques[technique_id].extend(campaigns_attributed_to_group) - else: - groups_using_techniques[technique_id] = campaigns_attributed_to_group + # get campaigns using techniques: [technique_id => [ {campaign, [campaign_uses_technique]} ]] + campaigns_using_techniques = self.get_related("campaign", "uses", "attack-pattern", reverse=True) + + # get groups attributing to campaigns: [campaign_id => [ {group, [campaign_attributed-to_group]} ]] + attributed_campaigns = self.get_related("campaign", "attributed-to", "intrusion-set") + + # add inherited relationships to groups using techniques + groups_using_techniques = self.add_inherited_campaign_relationships(campaigns_using_techniques, attributed_campaigns, groups_using_techniques) self.all_groups_using_all_techniques = groups_using_techniques return groups_using_techniques @@ -1229,7 +1214,7 @@ def get_groups_using_technique(self, technique_stix_id: str) -> list: Returns ------- list - a list of {group, relationship} for each group using the technique and each campaign attributed to + a list of {"object": IntrusionSet, "relationships": Relationship[]} for each group using the technique and each campaign attributed to groups using the technique """ groups_using_techniques = self.get_all_groups_using_all_techniques() @@ -1245,7 +1230,7 @@ def get_all_techniques_used_by_all_campaigns(self) -> dict: Returns ------- dict - a mapping of campaign_stix_id => [{'object': Technique, 'relationship': Relationship}] for each technique used by the campaign + a mapping of campaign_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] for each technique used by the campaign """ # return data if it has already been fetched if self.all_techniques_used_by_all_campaigns: @@ -1266,11 +1251,13 @@ def get_techniques_used_by_campaign(self, campaign_stix_id: str) -> list: Returns ------- list - a list of {technique, relationship} for each technique used by the campaign + a list of {"object": AttackPattern, "relationships": Relationship[]} for each technique used by the campaign """ techniques_used_by_campaigns = self.get_all_techniques_used_by_all_campaigns() return ( - techniques_used_by_campaigns[campaign_stix_id] if campaign_stix_id in techniques_used_by_campaigns else [] + techniques_used_by_campaigns[campaign_stix_id] + if campaign_stix_id in techniques_used_by_campaigns + else [] ) def get_all_campaigns_using_all_techniques(self) -> dict: @@ -1279,7 +1266,7 @@ def get_all_campaigns_using_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_stix_id => [{'object': Campaign, 'relationship': Relationship}] for each campaign using the technique + a mapping of technique_stix_id => [{"object": Campaign, "relationships": Relationship[]}] for each campaign using the technique """ # return data if it has already been fetched if self.all_campaigns_using_all_techniques: @@ -1300,7 +1287,7 @@ def get_campaigns_using_technique(self, technique_stix_id: str) -> list: Returns ------- list - a list of {campaign, relationship} for each campaign using the technique + a list of {"object": Campaign, "relationships": Relationship[]} for each campaign using the technique """ campaigns_using_techniques = self.get_all_campaigns_using_all_techniques() return campaigns_using_techniques[technique_stix_id] if technique_stix_id in campaigns_using_techniques else [] @@ -1315,15 +1302,15 @@ def get_all_techniques_used_by_all_software(self) -> dict: Returns ------- dict - a mapping of software_stix_id => [{'object': Technique, 'relationship': Relationship}] for each technique used by the software + a mapping of software_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] for each technique used by the software """ # return data if it has already been fetched if self.all_techniques_used_by_all_software: return self.all_techniques_used_by_all_software - techniques_by_tool = self.get_related("tool", "uses", "attack-pattern") + techniques_by_tools = self.get_related("tool", "uses", "attack-pattern") techniques_by_malware = self.get_related("malware", "uses", "attack-pattern") - self.all_techniques_used_by_all_software = self.merge(techniques_by_tool, techniques_by_malware) + self.all_techniques_used_by_all_software = self.merge(techniques_by_tools, techniques_by_malware) return self.all_techniques_used_by_all_software @@ -1338,7 +1325,7 @@ def get_techniques_used_by_software(self, software_stix_id: str) -> list: Returns ------- list - a list of {technique, relationship} for each technique used by the software + a list of {"object": AttackPattern, "relationships": Relationship[]} for each technique used by the software """ techniques_used_by_software = self.get_all_techniques_used_by_all_software() return techniques_used_by_software[software_stix_id] if software_stix_id in techniques_used_by_software else [] @@ -1349,7 +1336,7 @@ def get_all_software_using_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_stix_id => [{'object': Software, 'relationship': Relationship}] for each software using the technique + a mapping of technique_stix_id => [{"object": Malware|Tool, "relationships": Relationship[]}] for each software using the technique """ # return data if it has already been fetched if self.all_software_using_all_techniques: @@ -1372,7 +1359,7 @@ def get_software_using_technique(self, technique_stix_id: str) -> list: Returns ------- list - a list of {software, relationship} for each software using the technique + a list of {"object": Malware|Tool, "relationships": Relationship[]} for each software using the technique """ software_using_techniques = self.get_all_software_using_all_techniques() return software_using_techniques[technique_stix_id] if technique_stix_id in software_using_techniques else [] @@ -1387,7 +1374,7 @@ def get_all_techniques_mitigated_by_all_mitigations(self) -> dict: Returns ------- dict - a mapping of mitigation_stix_id => [{'object': Technique, 'relationship': Relationship}] for each technique mitigated by the mitigation + a mapping of mitigation_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] for each technique mitigated by the mitigation """ # return data if it has already been fetched if self.all_techniques_mitigated_by_all_mitigations: @@ -1410,7 +1397,7 @@ def get_techniques_mitigated_by_mitigation(self, mitigation_stix_id: str) -> lis Returns ------- list - a list of {technique, relationship} for each technique mitigated by the mitigation + a list of {"object": AttackPattern, "relationships": Relationship[]} for each technique mitigated by the mitigation """ techniques_mitigated_by_mitigations = self.get_all_techniques_mitigated_by_all_mitigations() return ( @@ -1425,7 +1412,7 @@ def get_all_mitigations_mitigating_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_stix_id => [{'object': Mitigation, 'relationship': Relationship}] for each mitigation mitigating the technique + a mapping of technique_stix_id => [{"object": CourseOfAction, "relationships": Relationship[]}] for each mitigation mitigating the technique """ # return data if it has already been fetched if self.all_mitigations_mitigating_all_techniques: @@ -1448,7 +1435,7 @@ def get_mitigations_mitigating_technique(self, technique_stix_id: str) -> list: Returns ------- list - a list of {mitigation, relationship} for each mitigation mitigating the technique + a list of {"object": CourseOfAction, "relationships": Relationship[]} for each mitigation mitigating the technique """ mitigations_mitigating_techniques = self.get_all_mitigations_mitigating_all_techniques() return ( @@ -1467,7 +1454,7 @@ def get_all_parent_techniques_of_all_subtechniques(self) -> dict: Returns ------- dict - a mapping of subtechnique_stix_id => [{'object': Technique, 'relationship': Relationship}] describing the parent technique of the subtechnique + a mapping of subtechnique_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] describing the parent technique of the subtechnique """ # return data if it has already been fetched if self.all_parent_techniques_of_all_subtechniques: @@ -1490,7 +1477,7 @@ def get_parent_technique_of_subtechnique(self, subtechnique_stix_id: str) -> dic Returns ------- dict - {parent technique, relationship} describing the parent technique of the sub-technique + {"object": AttackPattern, "relationships": Relationship[]} describing the parent technique of the sub-technique """ parent_techniques_of_subtechniques = self.get_all_parent_techniques_of_all_subtechniques() return ( @@ -1505,7 +1492,7 @@ def get_all_subtechniques_of_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_stix_id => [{'object': Subtechnique, 'relationship': Relationship}] for each subtechnique of the technique + a mapping of technique_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] for each subtechnique of the technique """ # return data if it has already been fetched if self.all_subtechniques_of_all_techniques: @@ -1528,7 +1515,7 @@ def get_subtechniques_of_technique(self, technique_stix_id: str) -> list: Returns ------- list - a list of {subtechnique, relationship} for each subtechnique of the technique + a list of {"object": AttackPattern, "relationships": Relationship[]} for each subtechnique of the technique """ subtechniques_of_techniques = self.get_all_subtechniques_of_all_techniques() return ( @@ -1545,7 +1532,7 @@ def get_all_techniques_detected_by_all_datacomponents(self) -> dict: Returns ------- dict - a mapping of datacomponent_stix_id => [{'object': Technique, 'relationship': Relationship}] describing the detections of the data component + a mapping of datacomponent_stix_id => [{"object": AttackPattern, "relationships": Relationship[]}] describing the detections of the data component """ # return data if it has already been fetched if self.all_techniques_detected_by_all_datacomponents: @@ -1568,7 +1555,7 @@ def get_techniques_detected_by_datacomponent(self, datacomponent_stix_id: str) - Returns ------- list - a list of {technique, relationship} describing the detections of the data component + a list of {"object": AttackPattern, "relationships": Relationship[]} describing the detections of the data component """ techniques_detected_by_datacomponents = self.get_all_techniques_detected_by_all_datacomponents() return ( @@ -1583,7 +1570,7 @@ def get_all_datacomponents_detecting_all_techniques(self) -> dict: Returns ------- dict - a mapping of technique_stix_id => [{'object': Datacomponent, 'relationship': Relationship}] describing the data components that can detect the technique + a mapping of technique_stix_id => [{"object": DataComponent, "relationships": Relationship[]}] describing the data components that can detect the technique """ # return data if it has already been fetched if self.all_datacomponents_detecting_all_techniques: @@ -1606,7 +1593,7 @@ def get_datacomponents_detecting_technique(self, technique_stix_id: str) -> list Returns ------- list - a list of {datacomponent, relationship} describing the data components that can detect the technique + a list of {"object": DataComponent, "relationships": Relationship[]} describing the data components that can detect the technique """ datacomponents_detecting_techniques = self.get_all_datacomponents_detecting_all_techniques() return (