")
+
+ # The warnings, failures, and manuals are only shown if they are
+ # greater than zero. Reserve the space for them here. They will
+ # be filled next if needed.
+ warning_summary = ""
+ failure_summary = ""
+ manual_summary = ""
+ error_summary = ""
+
+ if n_warn > 0:
+ warning_summary = (f"
")
-
- # The warnings, failures, and manuals are only shown if they are
- # greater than zero. Reserve the space for them here. They will
- # be filled next if needed.
- warning_summary = ""
- failure_summary = ""
- manual_summary = ""
-
- if n_warn > 0:
- warning_summary = (f"
")
-
table_data.append({
- "Baseline Conformance Reports": link,
- "Details": f"{pass_summary}{warning_summary}{failure_summary}{manual_summary}"
+ "Baseline Conformance Reports": link,
+ "Details": generate_summary(stats)
})
fragments.append(reporter.create_html_table(table_data))
diff --git a/scubagoggles/provider.py b/scubagoggles/provider.py
index fc161c74..60154140 100644
--- a/scubagoggles/provider.py
+++ b/scubagoggles/provider.py
@@ -6,6 +6,7 @@
from tqdm import tqdm
from scubagoggles.utils import create_subset_inverted_dict, create_key_to_list, merge_dicts
+from scubagoggles.types import ApiReference
from scubagoggles.robust_dns import RobustDNSClient
EVENTS = {
@@ -66,7 +67,8 @@
'all': [None]
}
-selectors = ["google", "selector1", "selector2"]
+
+SELECTORS = ["google", "selector1", "selector2"]
# For DKIM.
# Unfortunately, hard-coded. Ideally, we'd be able to use an API to get
# the selectors used programmatically, but it doesn't seem like there is
@@ -84,437 +86,443 @@
# beginning of the domain name up to the first period
#
-DNSClient = RobustDNSClient()
-
-def get_spf_records(domains: list) -> list:
- '''
- Gets the SPF records for each domain in domains.
-
- :param domains: The list of domain names (strings).
- '''
- results = []
- n_low_confidence = 0
- for domain in domains:
- result = DNSClient.query(domain)
- if not result['HighConfidence']:
- n_low_confidence += 1
- results.append({
- "domain": domain,
- "rdata": result["Answers"],
- "log": result["LogEntries"]
- })
- if n_low_confidence > 0:
- warnings.warn(f"get_spf_records: for {n_low_confidence} domain(s), \
-the traditional DNS queries returned an empty answer \
-section and the DoH queries failed. Will assume SPF not configured, but \
-can't guarantee that failure isn't due to something like split horizon DNS. \
-See ProviderSettingsExport.json under 'spf_records' for more details.", RuntimeWarning)
- return results
-
-def get_dkim_records(domains : list) -> list:
- '''
- Gets the DKIM records for each domain in domains.
-
- :param domains: The list of domain names (strings).
+class Provider:
'''
- results = []
- n_low_confidence = 0
- for domain in domains:
- qnames = [f"{selector}._domainkey.{domain}" for selector in selectors]
- log_entries = []
- for qname in qnames:
- result = DNSClient.query(qname)
- log_entries.extend(result['LogEntries'])
- if len(result['Answers']) == 0:
- # The DKIM record does not exist with this selector, we need to try again with
- # a different one
- continue
- # Otherwise, the DKIM record exists with this selector, no need to try the rest
- break
-
- if not result['HighConfidence']:
- n_low_confidence += 1
- results.append({
- "domain": domain,
- "rdata": result["Answers"],
- "log": log_entries
- })
-
- if n_low_confidence > 0:
- warnings.warn(f"get_dkim_records: for {n_low_confidence} domain(s), \
-the traditional DNS queries returned an empty answer \
-section and the DoH queries failed. Will assume DKIM not configured, but \
-can't guarantee that failure isn't due to something like split horizon DNS. \
-See ProviderSettingsExport.json under 'dkim_records' for more details.", RuntimeWarning)
- return results
-
-def get_dmarc_records(domains : list) -> list:
+ Class for making the GWS API calls and tracking the results.
'''
- Gets the DMARC records for each domain in domains.
- :param domains: The list of domain names (strings).
- '''
- results = []
- n_low_confidence = 0
- for domain in domains:
- log_entries = []
- qname = f"_dmarc.{domain}"
- result = DNSClient.query(qname)
- log_entries.extend(result['LogEntries'])
- if len(result["Answers"]) == 0:
- # The domain does not exist. If the record is not available at the full domain
- # level, we need to check at the organizational domain level.
- labels = domain.split(".")
- org_domain = f"{labels[-2]}.{labels[-1]}"
- result = DNSClient.query(f"_dmarc.{org_domain}")
+ def __init__(self, services : dict, customer_id : str):
+ '''
+ Initialize the Provider.
+
+ :param services: a dict of service objects.
+ :param customer_id: the ID of the customer to run against.
+ '''
+ self.services = services
+ self.customer_id = customer_id
+ self.successful_calls = set()
+ self.unsuccessful_calls = set()
+ self.dns_client = RobustDNSClient()
+ self.domains = None
+
+ def list_domains(self) -> list:
+ '''
+ Return the customer's domains. Ensures that the domains API is called only once and that
+ the domains used throughout the provider are consistent.
+ '''
+ if self.domains is None:
+ try:
+ self.domains = self.services['directory'].domains().list(customer=self.customer_id)\
+ .execute()['domains']
+ self.successful_calls.add(ApiReference.LIST_DOMAINS.value)
+ except Exception as exc:
+ self.domains = []
+ warnings.warn(f"An exception was thrown by list_domains: {exc}", RuntimeWarning)
+ self.unsuccessful_calls.add(ApiReference.LIST_DOMAINS.value)
+ return self.domains
+
+ def get_spf_records(self, domains: list) -> list:
+ '''
+ Gets the SPF records for each domain in domains.
+
+ :param domains: The list of domain names (strings).
+ '''
+ results = []
+ n_low_confidence = 0
+ for domain in domains:
+ result = self.dns_client.query(domain)
+ if not result['HighConfidence']:
+ n_low_confidence += 1
+ results.append({
+ "domain": domain,
+ "rdata": result["Answers"],
+ "log": result["LogEntries"]
+ })
+ if n_low_confidence > 0:
+ warnings.warn(f"get_spf_records: for {n_low_confidence} domain(s), \
+ the traditional DNS queries returned an empty answer \
+ section and the DoH queries failed. Will assume SPF not configured, but \
+ can't guarantee that failure isn't due to something like split horizon DNS. \
+ See ProviderSettingsExport.json under 'spf_records' for more details.", RuntimeWarning)
+ return results
+
+ def get_dkim_records(self, domains : list) -> list:
+ '''
+ Gets the DKIM records for each domain in domains.
+
+ :param domains: The list of domain names (strings).
+ '''
+ results = []
+ n_low_confidence = 0
+ for domain in domains:
+ qnames = [f"{selector}._domainkey.{domain}" for selector in SELECTORS]
+ log_entries = []
+ for qname in qnames:
+ result = self.dns_client.query(qname)
+ log_entries.extend(result['LogEntries'])
+ if len(result['Answers']) == 0:
+ # The DKIM record does not exist with this selector, we need to try again with
+ # a different one
+ continue
+ # Otherwise, the DKIM record exists with this selector, no need to try the rest
+ break
+
+ if not result['HighConfidence']:
+ n_low_confidence += 1
+ results.append({
+ "domain": domain,
+ "rdata": result["Answers"],
+ "log": log_entries
+ })
+
+ if n_low_confidence > 0:
+ warnings.warn(f"get_dkim_records: for {n_low_confidence} domain(s), \
+ the traditional DNS queries returned an empty answer \
+ section and the DoH queries failed. Will assume DKIM not configured, but \
+ can't guarantee that failure isn't due to something like split horizon DNS. \
+ See ProviderSettingsExport.json under 'dkim_records' for more details.", RuntimeWarning)
+ return results
+
+ def get_dmarc_records(self, domains : list) -> list:
+ '''
+ Gets the DMARC records for each domain in domains.
+
+ :param domains: The list of domain names (strings).
+ '''
+ results = []
+ n_low_confidence = 0
+ for domain in domains:
+ log_entries = []
+ qname = f"_dmarc.{domain}"
+ result = self.dns_client.query(qname)
log_entries.extend(result['LogEntries'])
- if not result['HighConfidence']:
- n_low_confidence += 1
- results.append({
- "domain": domain,
- "rdata": result["Answers"],
- "log": log_entries
- })
- if n_low_confidence > 0:
- warnings.warn(f"get_dmarc_records: for {n_low_confidence} domain(s), \
-the traditional DNS queries returned an empty answer \
-section and the DoH queries failed. Will assume DMARC not configured, but \
-can't guarantee that failure isn't due to something like split horizon DNS. \
-See ProviderSettingsExport.json under 'dmarc_records' for more details.", RuntimeWarning)
- return results
-
-def get_dnsinfo(service, customer_id):
- '''
- Gets DNS Information for Gmail baseline
-
- :param service: a directory_v1 service instance
- :param customer_id: the ID of the customer to run against
- '''
- output = {"domains": [], "spf_records": [], "dkim_records": [], "dmarc_records": []}
-
- # Determine the tenant's domains via the API
- response = service.domains().list(customer=customer_id).execute()
- domains = {d['domainName'] for d in response['domains']}
-
- if len(domains) == 0:
- warnings.warn("No domains found.", RuntimeWarning)
+ if len(result["Answers"]) == 0:
+ # The domain does not exist. If the record is not available at the full domain
+ # level, we need to check at the organizational domain level.
+ labels = domain.split(".")
+ org_domain = f"{labels[-2]}.{labels[-1]}"
+ result = self.dns_client.query(f"_dmarc.{org_domain}")
+ log_entries.extend(result['LogEntries'])
+ if not result['HighConfidence']:
+ n_low_confidence += 1
+ results.append({
+ "domain": domain,
+ "rdata": result["Answers"],
+ "log": log_entries
+ })
+ if n_low_confidence > 0:
+ warnings.warn(f"get_dmarc_records: for {n_low_confidence} domain(s), \
+ the traditional DNS queries returned an empty answer \
+ section and the DoH queries failed. Will assume DMARC not configured, but \
+ can't guarantee that failure isn't due to something like split horizon DNS. \
+ See ProviderSettingsExport.json under 'dmarc_records' for more details.", RuntimeWarning)
+ return results
+
+ def get_dnsinfo(self):
+ '''
+ Gets DNS Information for Gmail baseline
+ '''
+ output = {"domains": [], "spf_records": [], "dkim_records": [], "dmarc_records": []}
+ domains = {d['domainName'] for d in self.list_domains()}
+ if len(domains) == 0:
+ warnings.warn("No domains found.", RuntimeWarning)
+ return output
+
+ output["domains"].extend(domains)
+
+ try:
+ output["spf_records"] = self.get_spf_records(domains)
+ self.successful_calls.add("get_spf_records")
+ except Exception as exc:
+ output["spf_records"] = []
+ warnings.warn(f"An exception was thrown by get_spf_records: {exc}", RuntimeWarning)
+ self.unsuccessful_calls.add("get_spf_records")
+ try:
+ output["dkim_records"] = self.get_dkim_records(domains)
+ self.successful_calls.add("get_dkim_records")
+ except Exception as exc:
+ output["dkim_records"] = []
+ warnings.warn(f"An exception was thrown by get_dkim_records: {exc}", RuntimeWarning)
+ self.unsuccessful_calls.add("get_dkim_records")
+ try:
+ output["dmarc_records"] = self.get_dmarc_records(domains)
+ self.successful_calls.add("get_dmarc_records")
+ except Exception as exc:
+ output["dmarc_records"] = []
+ warnings.warn(f"An exception was thrown by get_dmarc_records: {exc}", RuntimeWarning)
+ self.unsuccessful_calls.add("get_dmarc_records")
return output
- output["domains"].extend(domains)
-
- try:
- output["spf_records"] = get_spf_records(domains)
- except Exception as exc:
- output["spf_records"] = []
- warnings.warn(f"An exception was thrown by get_spf_records: {exc}", RuntimeWarning)
- try:
- output["dkim_records"] = get_dkim_records(domains)
- except Exception as exc:
- output["dkim_records"] = []
- warnings.warn(f"An exception was thrown by get_dkim_records: {exc}", RuntimeWarning)
- try:
- output["dmarc_records"] = get_dmarc_records(domains)
- except Exception as exc:
- output["dmarc_records"] = []
- warnings.warn(f"An exception was thrown by get_dmarc_records: {exc}", RuntimeWarning)
- return output
-
-def get_super_admins(service, customer_id) -> dict:
- '''
- Gets the org unit/primary email of all super admins, using the directory API
-
- :param service: a directory_v1 service instance
- :param customer_id: the ID of the customer to run against
- '''
- try:
- response = service.users().list(customer=customer_id, query="isAdmin=True").execute()
- admins = []
- for user in response['users']:
- org_unit = user['orgUnitPath']
- # strip out the leading '/'
- org_unit = org_unit[1:] if org_unit.startswith('/') else org_unit
- email = user['primaryEmail']
- admins.append({'primaryEmail': email, 'orgUnitPath': org_unit})
- return {'super_admins': admins}
- except Exception as exc:
- warnings.warn(
- f"Exception thrown while getting super admins; outputs will be incorrect: {exc}",
- RuntimeWarning
- )
- return {'super_admins': []}
-
-def get_ous(service, customer_id) -> dict:
- '''
- Gets the organizational units using the directory API
-
- :param service: a directory_v1 service instance
- :param customer_id: the ID of the customer to run against
- '''
-
- try:
- response = service.orgunits().list(customerId=customer_id).execute()
- if 'organizationUnits' not in response:
+ def get_super_admins(self) -> dict:
+ '''
+ Gets the org unit/primary email of all super admins, using the directory API
+ '''
+ try:
+ response = self.services['directory'].users()\
+ .list(customer=self.customer_id, query="isAdmin=True").execute()
+ admins = []
+ for user in response['users']:
+ org_unit = user['orgUnitPath']
+ # strip out the leading '/'
+ org_unit = org_unit[1:] if org_unit.startswith('/') else org_unit
+ email = user['primaryEmail']
+ admins.append({'primaryEmail': email, 'orgUnitPath': org_unit})
+ self.successful_calls.add(ApiReference.LIST_USERS.value)
+ return {'super_admins': admins}
+ except Exception as exc:
+ warnings.warn(
+ f"Exception thrown while getting super admins; outputs will be incorrect: {exc}",
+ RuntimeWarning
+ )
+ self.unsuccessful_calls.add(ApiReference.LIST_USERS.value)
+ return {'super_admins': []}
+
+ def get_ous(self) -> dict:
+ '''
+ Gets the organizational units using the directory API
+ '''
+
+ try:
+ response = self.services['directory'].orgunits().list(customerId=self.customer_id)\
+ .execute()
+ self.successful_calls.add(ApiReference.LIST_OUS.value)
+ if 'organizationUnits' not in response:
+ return {}
+ return response
+ except Exception as exc:
+ warnings.warn(
+ f"Exception thrown while getting top level OU: {exc}",
+ RuntimeWarning
+ )
+ self.unsuccessful_calls.add(ApiReference.LIST_OUS.value)
return {}
- return response
- except Exception as exc:
- warnings.warn(
- f"Exception thrown while getting top level OU: {exc}",
- RuntimeWarning
- )
- return {}
-
-def get_toplevel_ou(service, customer_id) -> str:
- '''
- Gets the tenant name using the directory API
- :param service: a directory_v1 service instance
- :param customer_id: the ID of the customer to run against
- '''
+ def get_toplevel_ou(self) -> str:
+ '''
+ Gets the tenant name using the directory API
+ '''
+
+ try:
+ response = self.services['directory'].orgunits()\
+ .list(customerId=self.customer_id, orgUnitPath='/', type='children').execute()
+ # Because we set orgUnitPath to / and type to children, the API call will only
+ # return the second-level OUs, meaning the parentOrgUnitId of any of the OUs returned
+ # will point us to OU of the entire organization
+ if 'organizationUnits' not in response:
+ # No custom OUs have been created. In this case, we can't
+ # determine the name of the top-level OU. See:
+ # https://stackoverflow.com/questions/26936357/google-directory-api-org-name-of-root-org-unit-path
+ # https://stackoverflow.com/questions/60464432/cannot-get-root-orgunit-in-google-directory-api?noredirect=1&lq=1
+ # Fortunately, when there are no custom OUs present, we won't
+ # need to check if a setting change was made at the top-level
+ # OU in the Rego; because no custom OUs have been created, any
+ # changes have to apply to the top-level OU.
+ return ""
+ parent_ou = response['organizationUnits'][0]['parentOrgUnitId']
+ response = self.services['directory'].orgunits()\
+ .get(customerId=self.customer_id, orgUnitPath=parent_ou).execute()
+ ou_name = response['name']
+ self.successful_calls.add(ApiReference.LIST_OUS.value)
+ return ou_name
+ except Exception as exc:
+ warnings.warn(
+ f"Exception thrown while getting top level OU: {exc}",
+ RuntimeWarning
+ )
+ self.unsuccessful_calls.add(ApiReference.LIST_OUS.value)
+ return "Error Retrieving"
- try:
- response = service.orgunits().list(customerId=customer_id,
- orgUnitPath='/',
- type='children').execute()
- # Because we set orgUnitPath to / and type to children, the API call will only
- # return the second-level OUs, meaning the parentOrgUnitId of any of the OUs returned
- # will point us to OU of the entire organization
- if 'organizationUnits' not in response:
- # No custom OUs have been created. In this case, we can't
- # determine the name of the top-level OU. See:
- # https://stackoverflow.com/questions/26936357/google-directory-api-org-name-of-root-org-unit-path
- # https://stackoverflow.com/questions/60464432/cannot-get-root-orgunit-in-google-directory-api?noredirect=1&lq=1
- # Fortunately, when there are no custom OUs present, we won't
- # need to check if a setting change was made at the top-level
- # OU in the Rego; because no custom OUs have been created, any
- # changes have to apply to the top-level OU.
- return ""
- parent_ou = response['organizationUnits'][0]['parentOrgUnitId']
- response = service.orgunits().get(customerId=customer_id, orgUnitPath=parent_ou).execute()
- ou_name = response['name']
- return ou_name
- except Exception as exc:
- warnings.warn(
- f"Exception thrown while getting top level OU: {exc}",
- RuntimeWarning
- )
- return ""
-
-
-def get_tenant_info(service, customer_id) -> dict:
- '''
- Gets the high-level tenant info using the directory API
- :param service: a directory_v1 service instance
- :param customer_id: the ID of the customer to run against
- '''
- try:
- response = service.domains().list(customer=customer_id).execute()
- primary_domain = ""
- for domain in response['domains']:
+ def get_tenant_info(self) -> dict:
+ '''
+ Gets the high-level tenant info using the directory API
+ '''
+ primary_domain = "Error Retrieving"
+ for domain in self.list_domains():
if domain['isPrimary']:
primary_domain = domain['domainName']
return {
'domain': primary_domain,
- 'topLevelOU': get_toplevel_ou(service, customer_id)
- }
- except Exception as exc:
- warnings.warn(
- f"An exception was thrown trying to get the tenant info: {exc}",
- RuntimeWarning
- )
- return {
- 'domain': 'Error Retrieving',
- 'topLevelOU': 'Error Retrieving'
+ 'topLevelOU': self.get_toplevel_ou()
}
+ def get_gws_logs(self, products: list, event: str) -> dict:
+ '''
+ Gets the GWS admin audit logs with the specified event name.
+ This function will also do some parsing and filtering to ensure that an appropriate
+ log event is matched to the appropriate product.
+ This is to prevent the same log event from being duplicated
+ across products in the resulting provider JSON.
+
+ :param products: a narrowed list of the products being invoked
+
+ :param event: the name of the specific event we are querying for.
+ '''
+
+ # Filter responses by org_unit id
+ response = (self.services['reports'].activities().list(userKey='all',
+ applicationName='admin',
+ eventName=event).execute()).get('items', [])
+
+
+ # Used for filtering duplicate events
+ prod_to_app_name_values = {
+ 'calendar': ['Calendar'],
+ 'chat': ['Google Chat', 'Google Workspace Marketplace'],
+ 'commoncontrols': [
+ 'Security',
+ 'Google Workspace Marketplace',
+ 'Blogger',
+ 'Google Cloud Platform Sharing Options',
+ ],
+ 'drive': ['Drive and Docs'],
+ 'gmail': ['Gmail'],
+ 'groups': ['Groups for Business'],
+ 'meet': ['Google Meet'],
+ 'sites': ['Sites'],
+ 'classroom': ['Classroom']
+ }
+ # create a subset of just the products we need from the dict above
+ subset_prod_to_app_name = {
+ prod: prod_to_app_name_values[prod]
+ for prod in products if prod in prod_to_app_name_values
+ }
-def get_gws_logs(products: list, service, event: str) -> dict:
- '''
- Gets the GWS admin audit logs with the specified event name.
- This function will also some parsing and filtering to ensure that an appropriate
- log event is matched to the appropriate product.
- This is to prevent the same log event from being duplicated
- across products in the resulting provider JSON.
-
- :param products: a narrowed list of the products being invoked
- :param service: service is a Google reports API object, created from successfully
- authenticating in auth.py
- :param event: the name of the specific event we are querying for.
- '''
-
- # Filter responses by org_unit id
- response = (service.activities().list(userKey='all',
- applicationName='admin',
- eventName=event).execute()).get('items', [])
-
-
- # Used for filtering duplicate events
- prod_to_app_name_values = {
- 'calendar': ['Calendar'],
- 'chat': ['Google Chat', 'Google Workspace Marketplace'],
- 'commoncontrols': [
- 'Security',
- 'Google Workspace Marketplace',
- 'Blogger',
- 'Google Cloud Platform Sharing Options',
- ],
- 'drive': ['Drive and Docs'],
- 'gmail': ['Gmail'],
- 'groups': ['Groups for Business'],
- 'meet': ['Google Meet'],
- 'sites': ['Sites'],
- 'classroom': ['Classroom']
- }
- # create a subset of just the products we need from the dict above
- subset_prod_to_app_name = {
- prod: prod_to_app_name_values[prod]
- for prod in products if prod in prod_to_app_name_values
- }
-
- products_to_logs = create_key_to_list(products)
- # Certain events are not being currently being filtered because
- # filtering for those events here would be duplicative of the Rego code
- try:
- # the value we want is nested several layers deep
- # checks under the APPLICATION_NAME key for the correct app_name value
- dup_events = (
- 'CHANGE_APPLICATION_SETTING',
- 'CREATE_APPLICATION_SETTING',
- 'DELETE_APPLICATION_SETTING'
+ products_to_logs = create_key_to_list(products)
+ # Certain events are not currently being filtered because
+ # filtering for those events here would be duplicative of the Rego code
+ try:
+ # the value we want is nested several layers deep
+ # checks under the APPLICATION_NAME key for the correct app_name value
+ dup_events = (
+ 'CHANGE_APPLICATION_SETTING',
+ 'CREATE_APPLICATION_SETTING',
+ 'DELETE_APPLICATION_SETTING'
+ )
+ if event in dup_events:
+ app_name = 'APPLICATION_NAME'
+ for report in response:
+ for events in report['events']:
+ parameters = events.get('parameters', [])
+ for parameter in parameters:
+ if parameter.get('name') == app_name:
+ param_val = parameter.get('value')
+ for prod, app_values in subset_prod_to_app_name.items():
+ if param_val in app_values:
+ products_to_logs[prod].append(report)
+ else: # no filtering append entire response to relevant product
+ for prod in products:
+ products_to_logs[prod].extend(response)
+ except Exception as exc:
+ warnings.warn(
+ f"An exception was thrown while getting the logs; outputs will be incorrect: {exc}",
+ RuntimeWarning
+ )
+ return products_to_logs
+
+ def get_group_settings(self) -> dict:
+ '''
+ Gets all of the group info using the directory API and group settings API
+ '''
+
+ group_service = self.services['groups']
+ directory_service = self.services['directory']
+ domains = {d['domainName'] for d in self.list_domains() if d['verified']}
+
+ try:
+ # get the group settings for each groups
+ group_settings = []
+ for domain in domains:
+ response = directory_service.groups().list(domain=domain).execute()
+ for group in response.get('groups'):
+ email = group.get('email')
+ group_settings.append(group_service.groups().get(groupUniqueId=email).execute())
+ self.successful_calls.add(ApiReference.LIST_GROUPS.value)
+ self.successful_calls.add(ApiReference.GET_GROUP.value)
+ return {'group_settings': group_settings}
+ except Exception as exc:
+ warnings.warn(
+ f"Exception thrown while getting group settings; outputs will be incorrect: {exc}",
+ RuntimeWarning
+ )
+ self.unsuccessful_calls.add(ApiReference.LIST_GROUPS.value)
+ self.unsuccessful_calls.add(ApiReference.GET_GROUP.value)
+ return {'group_settings': []}
+
+ def call_gws_providers(self, products: list, quiet) -> dict:
+ '''
+ Calls the relevant GWS APIs to get the data we need for the baselines.
+ Data such as the admin audit log, super admin users etc.
+
+ :param products: list of product names to check
+ :param quiet: suppress tqdm output
+ '''
+ # create an inverse dictionary containing a mapping of event => list of products
+ events_to_products = create_subset_inverted_dict(EVENTS, products)
+ events_to_products_bar = tqdm(events_to_products.items(), leave=False, disable=quiet)
+
+ # main aggregator dict
+ product_to_logs = create_key_to_list(products)
+ product_to_items = {}
+ ou_ids = set()
+ ou_ids.add("") # certain settings have no OU
+ try:
+ # Add top level organization unit name
+ ou_ids.add(self.get_toplevel_ou())
+ # get all organizational unit data
+ product_to_items['organizational_units'] = self.get_ous()
+ for orgunit in product_to_items['organizational_units']['organizationUnits']:
+ ou_ids.add(orgunit['name'])
+ # add just organizational unit names to a field
+ product_to_items['organizational_unit_names'] = list(ou_ids)
+ except Exception as exc:
+ warnings.warn(
+ f"Exception thrown while getting tenant data: {exc}",
+ RuntimeWarning
)
- if event in dup_events:
- app_name = 'APPLICATION_NAME'
- for report in response:
- for events in report['events']:
- parameters = events.get('parameters', [])
- for parameter in parameters:
- if parameter.get('name') == app_name:
- param_val = parameter.get('value')
- for prod, app_values in subset_prod_to_app_name.items():
- if param_val in app_values:
- products_to_logs[prod].append(report)
- else: # no filtering append entire response to relevant product
- for prod in products:
- products_to_logs[prod].extend(response)
- except Exception as exc:
- warnings.warn(
- f"An exception was thrown while getting the logs; outputs will be incorrect: {exc}",
- RuntimeWarning
- )
- return products_to_logs
-
-def get_group_settings(services, customer_id) -> dict:
- '''
- Gets all of the group info using the directory API and group settings API
-
- :param services: a service instance
- :param customer_id: the ID of the customer to run against
- '''
-
- try:
- # set up the services
- group_service = services['groups']
- domain_service = services['directory']
- # gather all of the domains within a suite to get groups
- response = domain_service.domains().list(customer=customer_id).execute()
- domains = {d['domainName'] for d in response['domains'] if d['verified']}
- # get the group settings for each groups
- group_settings = []
- for domain in domains:
- response = domain_service.groups().list(domain=domain).execute()
- for group in response.get('groups'):
- email = group.get('email')
- group_settings.append(group_service.groups().get(groupUniqueId=email).execute())
- return {'group_settings': group_settings}
- except Exception as exc:
- warnings.warn(
- f"Exception thrown while getting group settings; outputs will be incorrect: {exc}",
- RuntimeWarning
- )
- return {'group_settings': []}
-
-def call_gws_providers(products: list, services, quiet, customer_id) -> dict:
- '''
- Calls the relevant GWS APIs to get the data we need for the baselines.
- Data such as the admin audit log, super admin users etc.
-
- :param products: list of product names to check
- :param services: a dict of service objects.
- :param quiet: suppress tqdm output
- :param customer_id: the ID of the customer to run against
- service is a Google reports API object, created from successfully authenticating in auth.py
- '''
- # create a inverse dictionary containing a mapping of event => list of products
- events_to_products = create_subset_inverted_dict(EVENTS, products)
- events_to_products_bar = tqdm(events_to_products.items(), leave=False, disable=quiet)
-
- # main aggregator dict
- product_to_logs = create_key_to_list(products)
- product_to_items = {}
- ou_ids = set()
- ou_ids.add("") # certain settings have no OU
- try:
- # Add top level organization unit name
- ou_ids.add(get_toplevel_ou(services['directory'], customer_id))
- # get all organizational unit data
- product_to_items['organizational_units'] = get_ous(services['directory'], customer_id)
- for orgunit in product_to_items['organizational_units']['organizationUnits']:
- ou_ids.add(orgunit['name'])
- # add just organizational unit names to a field]
- product_to_items['organizational_unit_names'] = list(ou_ids)
- except Exception as exc:
- warnings.warn(
- f"Exception thrown while getting tenant data: {exc}",
- RuntimeWarning
- )
-
- # call the api once per event type
- try:
- for event, product_list in events_to_products_bar:
- products = ', '.join(product_list)
- bar_descr = f"Running Provider: Exporting {event} events for {products}..."
- events_to_products_bar.set_description(bar_descr)
-
- # gets the GWS admin audit logs and merges them into product_to_logs
- # aggregator dict
- product_to_logs = merge_dicts(
- product_to_logs,
- get_gws_logs(
- products=product_list,
- service=services['reports'],
- event=event
+ # call the api once per event type
+ try:
+ for event, product_list in events_to_products_bar:
+ products = ', '.join(product_list)
+ bar_descr = f"Running Provider: Exporting {event} events for {products}..."
+ events_to_products_bar.set_description(bar_descr)
+
+ # gets the GWS admin audit logs and merges them into product_to_logs
+ # aggregator dict
+ product_to_logs = merge_dicts(
+ product_to_logs,
+ self.get_gws_logs(products=product_list, event=event)
)
+ self.successful_calls.add(ApiReference.LIST_ACTIVITIES.value)
+ except Exception as exc:
+ warnings.warn("Provider Exception thrown while getting the logs; "\
+ f"outputs will be incorrect: {exc}", RuntimeWarning)
+ self.unsuccessful_calls.add(ApiReference.LIST_ACTIVITIES.value)
+
+ # repacks the main aggregator into the original form
+ # that the api returns the data in; under an 'items' key.
+ # Then we put this key under a {product}_log key for the Rego code
+ try:
+ for product, logs in product_to_logs.items():
+ key_name = f"{product}_logs"
+ product_to_items[key_name] = {'items': logs}
+
+ # get tenant metadata for report front page header
+ product_to_items['tenant_info'] = self.get_tenant_info()
+
+ if 'gmail' in product_to_logs: # add dns info if gmail is being run
+ product_to_items.update(self.get_dnsinfo())
+
+ if 'commoncontrols' in product_to_logs: # add list of super admins if CC is being run
+ product_to_items.update(self.get_super_admins())
+
+ if 'groups' in product_to_logs:
+ product_to_items.update(self.get_group_settings())
+
+ except Exception as exc:
+ warnings.warn(
+ f"Uncaught Exception thrown while getting other data: {exc}",
+ RuntimeWarning
)
- except Exception as exc:
- warnings.warn(
- f"Provider Exception thrown while getting the logs; outputs will be incorrect: {exc}",
- RuntimeWarning
- )
-
- # repacks the main aggregator into the original form
- # that the api returns the data in; under an 'items' key.
- # Then we put this key under a {product}_log key for the Rego code
- try:
- for product, logs in product_to_logs.items():
- key_name = f"{product}_logs"
- product_to_items[key_name] = {'items': logs}
-
- # get tenant metadata for report front page header
- product_to_items['tenant_info'] = get_tenant_info(services['directory'], customer_id)
-
- if 'gmail' in product_to_logs: # add dns info if gmail is being run
- product_to_items.update(get_dnsinfo(services['directory'], customer_id))
-
- if 'commoncontrols' in product_to_logs: # add list of super admins if CC is being run
- product_to_items.update(get_super_admins(services['directory'], customer_id))
-
- if 'groups' in product_to_logs:
- product_to_items.update(get_group_settings(services, customer_id))
-
- except Exception as exc:
- warnings.warn(
- f"Uncaught Exception thrown while getting other data: {exc}",
- RuntimeWarning
- )
- return product_to_items
+ return product_to_items
diff --git a/scubagoggles/reporter/reporter.py b/scubagoggles/reporter/reporter.py
index 2b329339..bf35a87d 100644
--- a/scubagoggles/reporter/reporter.py
+++ b/scubagoggles/reporter/reporter.py
@@ -5,9 +5,13 @@
"""
import os
import time
+import warnings
from datetime import datetime
import pandas as pd
from scubagoggles.utils import rel_abs_path
+from scubagoggles.types import API_LINKS
+
+SCUBA_GITHUB_URL = "https://github.com/cisagov/scubagoggles"
def get_test_result(requirement_met : bool, criticality : str, no_such_events : bool) -> str:
'''
@@ -140,8 +144,55 @@ def build_report_html(fragments : list, product : str,
html = html.replace('{{TABLES}}', collected)
return html
+def get_failed_prereqs(test : dict, successful_calls : set, unsuccessful_calls : set) -> set:
+ '''
+ Given the output of a specific Rego test and the set of successful and unsuccessful
+ calls, determine the set of prerequisites that were not met.
+ :param test: a dictionary representing the output of a Rego test
+ :param successful_calls: a set with the successful provider calls
+ :param unsuccessful_calls: a set with the unsuccessful provider calls
+ '''
+ if 'Prerequisites' not in test:
+ # If Prerequisites is not defined, assume the test just depends on the
+ # reports API.
+ prereqs = set(["reports/v1/activities/list"])
+ else:
+ prereqs = set(test['Prerequisites'])
+
+ # A call is failed if it is either missing from the successful_calls set
+ # or present in the unsuccessful_calls
+ failed_prereqs = set().union(
+ prereqs.difference(successful_calls),
+ prereqs.intersection(unsuccessful_calls)
+ )
+
+ return failed_prereqs
+
+def get_failed_details(failed_prereqs : set) -> str:
+ '''
+ Create the string used for the Details column of the report when one
+ or more of the API calls/functions failed.
+
+ :param failed_prereqs: A set of strings with the API calls/function prerequisites
+ that were not met for a given test.
+ '''
+
+ failed_apis = [API_LINKS[api] for api in failed_prereqs if api in API_LINKS]
+ failed_functions = [call for call in failed_prereqs if call not in API_LINKS]
+ failed_details = ""
+ if len(failed_apis) > 0:
+ links = ', '.join(failed_apis)
+ failed_details += f"This test depends on the following API call(s) " \
+ f"which did not execute successfully: {links}. "
+ if len(failed_functions) > 0:
+ failed_details += f"This test depends on the following function(s) " \
+ f"which did not execute successfully: {', '.join(failed_functions)}. "
+ failed_details += "See terminal output for more details."
+ return failed_details
+
def rego_json_to_html(test_results_data : str, product : list, out_path : str,
-tenant_domain : str, main_report_name : str, prod_to_fullname: dict, product_policies) -> None:
+tenant_domain : str, main_report_name : str, prod_to_fullname: dict, product_policies,
+successful_calls : set, unsuccessful_calls : set) -> None:
'''
Transforms the Rego JSON output into HTML
@@ -152,6 +203,8 @@ def rego_json_to_html(test_results_data : str, product : list, out_path : str,
:param main_report_name: report_name: Name of the main report HTML file.
:param prod_to_fullname: dict containing mapping of the product full names
:param product_policies: dict containing policies read from the baseline markdown
+ :param successful_calls: set with the set of successful calls
+ :param unsuccessful_calls: set with the set of unsuccessful calls
'''
product_capitalized = product.capitalize()
@@ -164,56 +217,86 @@ def rego_json_to_html(test_results_data : str, product : list, out_path : str,
"Warning": 0,
"Fail": 0,
"N/A": 0,
- "No events found": 0
+ "No events found": 0,
+ "Error": 0
}
for baseline_group in product_policies:
table_data = []
for control in baseline_group['Controls']:
tests = [test for test in test_results_data if test['PolicyId'] == control['Id']]
- for test in tests:
- result = get_test_result(test['RequirementMet'], test['Criticality'],
- test['NoSuchEvent'])
- report_stats[result] = report_stats[result] + 1
- details = test['ReportDetails']
-
- if result == "No events found":
- warning_icon = ""
- details = warning_icon + " " + test['ReportDetails']
-
- # As rules doesn't have it's own baseline, Rules and Common Controls
- # need to be handled specially
- if product_capitalized == "Rules":
- if 'Not-Implemented' in test['Criticality']:
- # The easiest way to identify the GWS.COMMONCONTROLS.14.1v1
- # results that belong to the Common Controls report is they're
- # marked as Not-Implemented. This if excludes them from the
- # rules report.
- continue
- table_data.append({
- 'Control ID': control['Id'],
- 'Rule Name': test['Requirement'],
- 'Result': result,
- 'Criticality': test['Criticality'],
- 'Rule Description': test['ReportDetails']})
- elif product_capitalized == "Commoncontrols" \
- and baseline_group['GroupName'] == 'System-defined Rules' \
- and 'Not-Implemented' not in test['Criticality']:
- # The easiest way to identify the System-defined Rules
- # results that belong to the Common Controls report is they're
- # marked as Not-Implemented. This if excludes the full results
- # from the Common Controls report.
- continue
- else:
- table_data.append({
+ if len(tests) == 0:
+ # Handle the case where Rego doesn't output anything for a given control
+ report_stats['Error'] += 1
+ issues_link = f'<a href="{SCUBA_GITHUB_URL}/issues" target="_blank">GitHub</a>'
+ table_data.append({
'Control ID': control['Id'],
'Requirement': control['Value'],
- 'Result': result,
- 'Criticality': test['Criticality'],
- 'Details': details})
+ 'Result': "Error - Test results missing",
+ 'Criticality': "-",
+ 'Details': f'Report issue on {issues_link}'
+ })
+ warnings.warn(f"No test results found for Control Id {control['Id']}",
+ RuntimeWarning)
+ else:
+ for test in tests:
+ failed_prereqs = get_failed_prereqs(test, successful_calls, unsuccessful_calls)
+ if len(failed_prereqs) > 0:
+ result = "Error"
+ report_stats["Error"] += 1
+ failed_details = get_failed_details(failed_prereqs)
+ table_data.append({
+ 'Control ID': control['Id'],
+ 'Requirement': control['Value'],
+ 'Result': "Error",
+ 'Criticality': test['Criticality'],
+ 'Details': failed_details
+ })
+ else:
+ result = get_test_result(test['RequirementMet'], test['Criticality'],
+ test['NoSuchEvent'])
+
+ report_stats[result] = report_stats[result] + 1
+ details = test['ReportDetails']
+
+ if result == "No events found":
+ warning_icon = "<object data='./images/triangle-exclamation-solid.svg' width='15' height='15'></object>"
+ details = warning_icon + " " + test['ReportDetails']
+
+ # As rules doesn't have its own baseline, Rules and Common Controls
+ # need to be handled specially
+ if product_capitalized == "Rules":
+ if 'Not-Implemented' in test['Criticality']:
+ # The easiest way to identify the GWS.COMMONCONTROLS.13.1v1
+ # results that belong to the Common Controls report is they're
+ # marked as Not-Implemented. This if excludes them from the
+ # rules report.
+ continue
+ table_data.append({
+ 'Control ID': control['Id'],
+ 'Rule Name': test['Requirement'],
+ 'Result': result,
+ 'Criticality': test['Criticality'],
+ 'Rule Description': test['ReportDetails']})
+ elif product_capitalized == "Commoncontrols" \
+ and baseline_group['GroupName'] == 'System-defined Rules' \
+ and 'Not-Implemented' not in test['Criticality']:
+ # The easiest way to identify the System-defined Rules
+ # results that belong to the Common Controls report is they're
+ # marked as Not-Implemented. This if excludes the full results
+ # from the Common Controls report.
+ continue
+ else:
+ table_data.append({
+ 'Control ID': control['Id'],
+ 'Requirement': control['Value'],
+ 'Result': result,
+ 'Criticality': test['Criticality'],
+ 'Details': details
+ })
fragments.append(f"