diff --git a/scubagoggles/main.py b/scubagoggles/main.py
index b024ae25..bea30aec 100644
--- a/scubagoggles/main.py
+++ b/scubagoggles/main.py
@@ -6,7 +6,7 @@
"""
import argparse
-from scubagoggles.orchestrator import gws_products, start_automation
+from scubagoggles.orchestrator import Orchestrator
def get_gws_args(parser):
"""
@@ -14,7 +14,7 @@ def get_gws_args(parser):
:param parser: argparse object
"""
- gws = gws_products()
+ gws = Orchestrator.gws_products()
gws_baselines = gws["gws_baselines"]
default_file_output_names = {
@@ -140,6 +140,6 @@ def dive():
args = parser.parse_args()
if args.scuba_cmd == 'gws':
- start_automation(args)
+ Orchestrator(args).start_automation()
else:
raise Exception("Invalid subparser. Run scubagoggles -h to see a list of valid subparsers")
diff --git a/scubagoggles/orchestrator.py b/scubagoggles/orchestrator.py
index f55ffec2..167083f0 100644
--- a/scubagoggles/orchestrator.py
+++ b/scubagoggles/orchestrator.py
@@ -2,6 +2,7 @@
orchestrator.py is the main module that starts and handles the output of the
provider, rego, and report modules of the SCuBA tool
"""
+import argparse
import shutil
import os
import json
@@ -17,11 +18,16 @@
from scubagoggles.reporter import reporter, md_parser
from scubagoggles.utils import rel_abs_path
-def gws_products() -> dict:
- """
- Dictionary of the SCuBA GWS baselines short names plus full names
+
+class Orchestrator:
+
+ """The Orchestrator class runs the provider to get the GWS configuration
+ data, then runs OPA on the configuration data, and finally invokes the
+ reporter to produce the conformance reports.
"""
- gws_baselines = [
+
+ # Dictionary of the SCuBA GWS baselines short names plus full names.
+ _gws_baselines = [
"gmail",
"calendar",
"groups",
@@ -33,7 +39,7 @@ def gws_products() -> dict:
"rules",
"classroom"
]
- prod_to_fullname = {
+ _prod_to_fullname = {
"gmail": "Gmail",
"calendar": "Google Calendar",
"groups": "Groups for Business",
@@ -45,356 +51,372 @@ def gws_products() -> dict:
"rules": "Rules",
"classroom": "Google Classroom"
}
- gws = {
- "gws_baselines": gws_baselines,
- "prod_to_fullname": prod_to_fullname
+ _gws = {
+ "gws_baselines": _gws_baselines,
+ "prod_to_fullname": _prod_to_fullname
}
- return gws
-
-def run_gws_providers(args, services : dict):
- """
- Runs the provider scripts and outputs a json to path
-
- :param args: the command line arguments to this script
- :param services: a dictionary of Google API service objects
- """
-
- products = args.baselines
- out_folder = args.outputpath
- provider_dict = {}
-
- provider = Provider(services, args.customerid)
- provider_dict = provider.call_gws_providers(products, args.quiet)
- provider_dict['successful_calls'] = list(provider.successful_calls)
- provider_dict['unsuccessful_calls'] = list(provider.unsuccessful_calls)
-
- settings_json = json.dumps(provider_dict, indent = 4)
- out_path = out_folder + f'/{args.outputproviderfilename}.json'
- with open(out_path, mode="w", encoding='UTF-8') as outfile:
- outfile.write(settings_json)
-
-def rego_eval(args):
- """
- Executes the OPA executable with provider json input against
- specified rego files and outputs a json to path
-
- :param args: the command line arguments to this script
- """
-
- products = args.baselines
- products_bar = tqdm(products, leave=False, disable=args.quiet)
- out_folder = args.outputpath
- results = []
- for product in products_bar:
- product_name = product
- input_file = f'{out_folder}/{args.outputproviderfilename}.json'
- opa_path = args.opapath
- rego_path = args.regopath
-
- products_bar.set_description(f"Running Rego verification for {product}...")
- product_tests = opa_eval(
- product_name=product_name,
- input_file=input_file,
- opa_path=opa_path,
- rego_path=rego_path,
- omit_sudo=args.omitsudo,
- debug=args.debug
- )
- try:
- results.extend(product_tests[0])
- except Exception as exc:
- raise Exception("run_rego error") from exc
-
- settings_json = json.dumps(results,sort_keys=True ,indent = 4)
- out_path = out_folder + f'/{args.outputregofilename}.json'
- with open(out_path, mode="w", encoding='UTF-8') as outfile:
- outfile.write(settings_json)
-
-def pluralize(singular : str, plural : str, count : int) -> str:
- """
- If count is 1, returns the singular version of the word.
- Else returns the plural version.
- :param singular: string value in singular tense
- :param plural: string value in plural tense
- :param count: how many of string value
- """
- if count == 1:
- return singular
- return plural
-def generate_summary(stats : dict) -> str:
- """
- Craft the html-formatted summary from the stats dictionary.
- """
- n_success = stats["Passes"]
- n_warn = stats["Warnings"]
- n_fail = stats["Failures"]
- n_manual = stats["Manual"]
- n_error = stats["Errors"]
-
- pass_summary = (f"
{n_success}"
- f" {pluralize('test', 'tests', n_success)} passed
")
-
- # The warnings, failures, and manuals are only shown if they are
- # greater than zero. Reserve the space for them here. They will
- # be filled next if needed.
- warning_summary = ""
- failure_summary = ""
- manual_summary = ""
- error_summary = ""
-
- if n_warn > 0:
- warning_summary = (f"{n_warn}"
- f" {pluralize('warning', 'warnings', n_warn)}
")
- if n_fail > 0:
- failure_summary = (f"{n_fail}"
- f" {pluralize('test', 'tests', n_fail)} failed
")
- if n_manual > 0:
- manual_summary = (f"{n_manual} manual"
- f" {pluralize('check', 'checks', n_manual)} needed
")
- if n_error > 0:
- error_summary = (f"{n_error}"
- f" {pluralize('error', 'errors', n_error)}
")
-
- return f"{pass_summary}{warning_summary}{failure_summary}{manual_summary}{error_summary}"
-
-def run_reporter(args):
- """
- Creates the indvididual reports and the front page
- :param args: list of arguments to run report on
- """
-
- # Make the report output folders
- out_folder = args.outputpath
- individual_reports_path = out_folder + "/IndividualReports"
- reports_images_path = individual_reports_path + "/images"
- Path(individual_reports_path).mkdir(parents=True, exist_ok=True)
- Path(reports_images_path).mkdir(parents=True, exist_ok=True)
-
- # Copy the CISA logo to the repo folder so that it can be accessed
- # from there
- cisa_logo = str(rel_abs_path(__file__,"./reporter/images/cisa_logo.png"))
- triangle_svg = str(rel_abs_path(__file__,"./reporter/images/triangle-exclamation-solid.svg"))
- shutil.copy2(cisa_logo, reports_images_path)
- shutil.copy2(triangle_svg, reports_images_path)
-
- # we should load the testresults json here
- products = args.baselines
- prod_to_fullname = args.fullnamesdict
- test_results_json = out_folder + f'/{args.outputregofilename}.json'
- with open(test_results_json, mode='r', encoding='UTF-8') as file:
- test_results_data = json.load(file)
-
- # Get the successful/unsuccessful commands
- settings_name = f'{out_folder}/{args.outputproviderfilename}.json'
- with open(settings_name, mode='r', encoding='UTF-8') as file:
- settings_data = json.load(file)
- successful_calls = set(settings_data['successful_calls'])
- unsuccessful_calls = set(settings_data['unsuccessful_calls'])
-
- # baseline_path
- subset_prod_to_fullname = {
- key: prod_to_fullname[key]
- for key in args.baselines
- if key in prod_to_fullname
- }
-
- baseline_policies = md_parser.read_baseline_docs(args.documentpath,subset_prod_to_fullname)
-
- if "rules" in args.baselines:
- # There's no baseline specific to rules, so this case
- # needs to be handled separately
- baseline_policies["rules"] = []
- for group in baseline_policies['commoncontrols']:
- if group['GroupName'] == 'System-defined Rules':
- baseline_policies["rules"].append(group)
- break
+ def __init__(self, args: argparse.Namespace):
+
+ """Orchestrator class initialization
+
+ :param args: command arguments parsed by the argparse module. See
+ the GWS parser definition (get_gws_args()) in main.py for
+ information about the arguments.
+ """
+
+ self._args = args
+
+ @classmethod
+ def gws_products(cls) -> dict:
+ """
+ Dictionary of the SCuBA GWS baselines short names plus full names
+ """
+ return cls._gws
+
+ def _run_gws_providers(self, services: dict):
+ """
+ Runs the provider scripts and outputs a json to path
+
+ :param services: a dictionary of Google API service objects
+ """
+
+ args = self._args
+ products = args.baselines
+ out_folder = args.outputpath
+
+ provider = Provider(services, args.customerid)
+ provider_dict = provider.call_gws_providers(products, args.quiet)
+ provider_dict['successful_calls'] = list(provider.successful_calls)
+ provider_dict['unsuccessful_calls'] = list(provider.unsuccessful_calls)
+
+ settings_json = json.dumps(provider_dict, indent = 4)
+ out_path = out_folder + f'/{args.outputproviderfilename}.json'
+ with open(out_path, mode="w", encoding='UTF-8') as outfile:
+ outfile.write(settings_json)
+
+ def _rego_eval(self):
+ """
+ Executes the OPA executable with provider json input against
+ specified rego files and outputs a json to path
+ """
+
+ args = self._args
+ products = args.baselines
+ products_bar = tqdm(products, leave=False, disable=args.quiet)
+ out_folder = args.outputpath
+ results = []
+ for product in products_bar:
+ product_name = product
+ input_file = f'{out_folder}/{args.outputproviderfilename}.json'
+ opa_path = args.opapath
+ rego_path = args.regopath
+
+ products_bar.set_description(f"Running Rego verification for {product}...")
+ product_tests = opa_eval(
+ product_name=product_name,
+ input_file=input_file,
+ opa_path=opa_path,
+ rego_path=rego_path,
+ omit_sudo=args.omitsudo,
+ debug=args.debug
+ )
+ try:
+ results.extend(product_tests[0])
+ except Exception as exc:
+ raise Exception("run_rego error") from exc
+
+ settings_json = json.dumps(results,sort_keys=True ,indent = 4)
+ out_path = out_folder + f'/{args.outputregofilename}.json'
+ with open(out_path, mode="w", encoding='UTF-8') as outfile:
+ outfile.write(settings_json)
+
+ @staticmethod
+ def _pluralize(singular: str, plural: str, count: int) -> str:
+ """
+ If count is 1, returns the singular version of the word.
+ Else returns the plural version.
+ :param singular: string value in singular tense
+ :param plural: string value in plural tense
+ :param count: how many of string value
+ """
+ if count == 1:
+ return singular
+ return plural
+
+ @classmethod
+ def _generate_summary(cls, stats: dict) -> str:
+ """
+ Craft the html-formatted summary from the stats dictionary.
+ """
+ n_success = stats["Passes"]
+ n_warn = stats["Warnings"]
+ n_fail = stats["Failures"]
+ n_manual = stats["Manual"]
+ n_error = stats["Errors"]
+
+ pass_summary = (f"{n_success}"
+ f" {cls._pluralize('test', 'tests', n_success)} passed
")
+
+ # The warnings, failures, and manuals are only shown if they are
+ # greater than zero. Reserve the space for them here. They will
+ # be filled next if needed.
+ warning_summary = ""
+ failure_summary = ""
+ manual_summary = ""
+ error_summary = ""
+
+ if n_warn > 0:
+ warning_summary = (f"{n_warn}"
+ f" {cls._pluralize('warning', 'warnings', n_warn)}
")
+ if n_fail > 0:
+ failure_summary = (f"{n_fail}"
+ f" {cls._pluralize('test', 'tests', n_fail)} failed
")
+ if n_manual > 0:
+ manual_summary = (f"{n_manual} manual"
+ f" {cls._pluralize('check', 'checks', n_manual)} needed
")
+ if n_error > 0:
+ error_summary = (f"{n_error}"
+ f" {cls._pluralize('error', 'errors', n_error)}
")
+
+ return f"{pass_summary}{warning_summary}{failure_summary}{manual_summary}{error_summary}"
+
+ def _run_reporter(self):
+ """
+ Creates the individual reports and the front page
+ """
+
+ # Make the report output folders
+ args = self._args
+ out_folder = args.outputpath
+ individual_reports_path = out_folder + "/IndividualReports"
+ reports_images_path = individual_reports_path + "/images"
+ Path(individual_reports_path).mkdir(parents=True, exist_ok=True)
+ Path(reports_images_path).mkdir(parents=True, exist_ok=True)
+
+ # Copy the CISA logo to the repo folder so that it can be accessed
+ # from there
+ images_dir = Path(__file__).parent / 'reporter' / 'images'
+ cisa_logo = images_dir / 'cisa_logo.png'
+ triangle_svg = images_dir / 'triangle-exclamation-solid.svg'
+ shutil.copy2(cisa_logo, reports_images_path)
+ shutil.copy2(triangle_svg, reports_images_path)
+
+ # we should load the test results json here
+ products = args.baselines
+ prod_to_fullname = args.fullnamesdict
+ test_results_json = out_folder + f'/{args.outputregofilename}.json'
+ with open(test_results_json, mode='r', encoding='UTF-8') as file:
+ test_results_data = json.load(file)
+
+ # Get the successful/unsuccessful commands
+ settings_name = f'{out_folder}/{args.outputproviderfilename}.json'
+ with open(settings_name, mode='r', encoding='UTF-8') as file:
+ settings_data = json.load(file)
+ successful_calls = set(settings_data['successful_calls'])
+ unsuccessful_calls = set(settings_data['unsuccessful_calls'])
+
+ # baseline_path
+ subset_prod_to_fullname = {
+ key: prod_to_fullname[key]
+ for key in args.baselines
+ if key in prod_to_fullname
+ }
+
+ baseline_policies = md_parser.read_baseline_docs(args.documentpath,
+ subset_prod_to_fullname)
+
+ if "rules" in args.baselines:
+ # There's no baseline specific to rules, so this case
+ # needs to be handled separately
+ baseline_policies["rules"] = []
+ for group in baseline_policies['commoncontrols']:
+ if group['GroupName'] == 'System-defined Rules':
+ baseline_policies["rules"].append(group)
+ break
+ else:
+ raise RuntimeError("Unable to process 'rules' as no policy group named "
+ "'System-defined Rules' found in the Common Controls baseline.")
+
+ # Load Org metadata from provider
+ with open(f'{out_folder}/{args.outputproviderfilename}.json',
+ mode='r',encoding='UTF-8') as file:
+ tenant_info = json.load(file)['tenant_info']
+ tenant_domain = tenant_info['domain']
+
+ # Create the individual report files
+ out_jsonfile = args.outjsonfilename
+ summary = {}
+ results = {}
+ total_output = {}
+ stats_and_data = {}
+
+ products_assessed = [prod_to_fullname[product] for product in products
+ if product in prod_to_fullname]
+ product_abbreviation_mapping = {fullname: shortname for shortname,
+ fullname in prod_to_fullname.items()}
+
+ timestamp_utc = datetime.utcnow()
+ timestamp_zulu = timestamp_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
+
+ report_metadata = {
+ "TenantId": None,
+ "DisplayName": None,
+ "DomainName": tenant_domain,
+ "ProductSuite": "GWS",
+ "ProductsAssessed": products_assessed,
+ "ProductAbbreviationMapping": product_abbreviation_mapping,
+ "Tool": "ScubaGoggles",
+ "ToolVersion": "0.2.0",
+ "TimeStampZulu": timestamp_zulu
+ }
+
+ total_output.update({"MetaData": report_metadata})
+
+ main_report_name = args.outputreportfilename
+ products_bar = tqdm(products, leave=False, disable=args.quiet)
+ for product in products_bar:
+ products_bar.set_description(f"Creating the HTML and JSON Report for {product}...")
+ stats_and_data[product] = reporter.rego_json_to_ind_reports(
+ test_results_data,
+ product,
+ out_folder,
+ tenant_domain,
+ main_report_name,
+ prod_to_fullname,
+ baseline_policies[product],
+ successful_calls,
+ unsuccessful_calls
+ )
+ baseline_product_summary = {product:stats_and_data[product][0]}
+ baseline_product_results_json = {product:stats_and_data[product][1]}
+ summary.update(baseline_product_summary)
+ results.update(baseline_product_results_json)
+ total_output.update({"Summary": summary})
+ total_output.update({"Results": results})
+
+ # Create the ScubaResults files
+ with open(f'{out_folder}/{args.outputproviderfilename}.json', encoding='UTF-8') as file:
+ raw_data = json.load(file)
+ total_output.update({"Raw": raw_data})
+ report = json.dumps(total_output, indent = 4)
+ with open(f"{out_folder}/{out_jsonfile}.json", mode='w', encoding='UTF-8') as results_file:
+ results_file.write(report)
+
+ # Delete the ProviderOutput file as it's now encapsulated in the ScubaResults file
+ os.remove(f"{out_folder}/{args.outputproviderfilename}.json")
+
+ # Make the report front page
+ report_path = out_folder + "/" + f'{args.outputreportfilename}.html'
+ abs_report_path = os.path.abspath(report_path)
+
+ fragments = []
+ table_data = []
+ for product, stats in stats_and_data.items():
+ ## Build the "Baseline Conformance Reports" column
+ product_capitalize = product.capitalize()
+ full_name = prod_to_fullname[product]
+ link_path = "./IndividualReports/" f"{product_capitalize}Report.html"
+ link = f"{full_name}"
+ table_data.append({
+ "Baseline Conformance Reports": link,
+ "Details": self._generate_summary(stats[0])
+ })
+
+ fragments.append(reporter.create_html_table(table_data))
+ with open(f"{report_path}", mode='w', encoding='UTF-8') as file:
+ file.write(reporter.build_front_page_html(fragments, tenant_info))
+
+ # suppress opening the report in the browser
+ if args.quiet:
+ return
+ # Open the report in the client's default web browser
+ # pylint: disable=E1101
+ if os.name == 'nt':
+ os.startfile(abs_report_path)
else:
- raise RuntimeError("Unable to process 'rules' as no policy group named "
- "'System-defined Rules' found in the Common Controls baseline.")
-
- # Load Org metadata from provider
- with open(f'{out_folder}/{args.outputproviderfilename}.json',
- mode='r',encoding='UTF-8') as file:
- tenant_info = json.load(file)['tenant_info']
- tenant_domain = tenant_info['domain']
-
-
- # Create the the individual report files
- out_jsonfile = args.outjsonfilename
- summary = {}
- results = {}
- total_output = {}
- stats_and_data = {}
-
- products_assessed = [prod_to_fullname[product] for product in products
- if product in prod_to_fullname]
- product_abbreviation_mapping = {fullname: shortname for shortname,
- fullname in prod_to_fullname.items()}
-
- timestamp_utc = datetime.utcnow()
- timestamp_zulu = timestamp_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
-
- report_metadata = {
- "TenantId": None,
- "DisplayName": None,
- "DomainName": tenant_domain,
- "ProductSuite": "GWS",
- "ProductsAssessed": products_assessed,
- "ProductAbbreviationMapping": product_abbreviation_mapping,
- "Tool": "ScubaGoggles",
- "ToolVersion": "0.2.0",
- "TimeStampZulu": timestamp_zulu
- }
+ report_path = "file:///" + abs_report_path
+ webbrowser.get().open(report_path, new=2)
- total_output.update({"MetaData": report_metadata})
-
- main_report_name = args.outputreportfilename
- products_bar = tqdm(products, leave=False, disable=args.quiet)
- for product in products_bar:
- products_bar.set_description(f"Creating the HTML and JSON Report for {product}...")
- stats_and_data[product] = reporter.rego_json_to_ind_reports(
- test_results_data,
- product,
- out_folder,
- tenant_domain,
- main_report_name,
- prod_to_fullname,
- baseline_policies[product],
- successful_calls,
- unsuccessful_calls
- )
- baseline_product_summary = {product:stats_and_data[product][0]}
- baseline_product_results_json = {product:stats_and_data[product][1]}
- summary.update(baseline_product_summary)
- results.update(baseline_product_results_json)
- total_output.update({"Summary": summary})
- total_output.update({"Results": results})
-
- # Create the ScubaResults files
- with open(f'{out_folder}/{args.outputproviderfilename}.json', encoding='UTF-8') as file:
- raw_data = json.load(file)
- total_output.update({"Raw": raw_data})
- report = json.dumps(total_output, indent = 4)
- with open(f"{out_folder}/{out_jsonfile}.json", mode='w', encoding='UTF-8') as results_file:
- results_file.write(report)
-
- # Delete the ProviderOutput file as it's now encapsulated in the ScubaResults file
- os.remove(f"{out_folder}/{args.outputproviderfilename}.json")
-
- # Make the report front page
- report_path = out_folder + "/" + f'{args.outputreportfilename}.html'
- abs_report_path = os.path.abspath(report_path)
-
- fragments = []
- table_data = []
- for product, stats in stats_and_data.items():
- ## Build the "Baseline Conformance Reports" column
- product_capitalize = product.capitalize()
- full_name = prod_to_fullname[product]
- link_path = "./IndividualReports/" f"{product_capitalize}Report.html"
- link = f"{full_name}"
- table_data.append({
- "Baseline Conformance Reports": link,
- "Details": generate_summary(stats[0])
- })
-
- fragments.append(reporter.create_html_table(table_data))
- with open(f"{report_path}", mode='w', encoding='UTF-8') as file:
- file.write(reporter.build_front_page_html(fragments, tenant_info))
-
- # suppress opening the report in the browser
- if args.quiet:
- return
- # Open the report in the client's default web browser
- # pylint: disable=E1101
- if os.name == 'nt':
- os.startfile(abs_report_path)
- else:
- report_path = "file:///" + abs_report_path
- webbrowser.get().open(report_path, new=2)
-
-def run_cached(args):
- """
- Has the ability to run scuba on a cached provider json
-
- :param args: argparse object containing arguments to run
- """
-
- args.outputpath = str(rel_abs_path(__file__,args.outputpath))
- Path(args.outputpath).mkdir(parents=True, exist_ok=True)
- args.outputpath = os.path.abspath(args.outputpath)
-
- if not args.skipexport:
- creds = gws_auth(args.credentials)
- services = {}
- services['reports'] = build('admin', 'reports_v1', credentials=creds)
- services['directory'] = build('admin', 'directory_v1', credentials=creds)
- services['groups'] = build('groupssettings', 'v1', credentials=creds)
- run_gws_providers(args, services)
-
- if not os.path.exists(f'{args.outputpath}/{args.outputproviderfilename}.json'):
- # When running run_cached, the provider output might not exist as a stand-alone
- # file depending what version of ScubaGoggles created the output. If the provider
- # ouptut doesn't exist as a standa-lone file, create it from the scuba results
- # file so the other functions can execute as normal.
- with open(f'{args.outputpath}/{args.outjsonfilename}.json', 'r',
- encoding='UTF-8') as scuba_results:
- provider_output = json.load(scuba_results)['Raw']
- with open(f'{args.outputpath}/{args.outputproviderfilename}.json', 'w',
- encoding='UTF-8') as provider_file:
- json.dump(provider_output, provider_file)
- rego_eval(args)
- run_reporter(args)
-
-def start_automation(args):
- """
- Main orchestration function
-
- :param args: argparse object containing arguments to run
- """
+ def _run_cached(self):
+ """
+ Has the ability to run scuba on a cached provider json
+ """
- if "commoncontrols" in args.baselines and "rules" not in args.baselines:
- args.baselines.append("rules")
- if "rules" in args.baselines and "commoncontrols" not in args.baselines:
- args.baselines.append("commoncontrols")
- args.baselines.sort()
-
- # get the absolute paths relative to this directory
- args.outputpath = (Path.cwd() / args.outputpath).resolve()
- args.credentials = (Path.cwd() / args.credentials).resolve()
- args.opapath = Path(args.opapath).resolve()
- args.regopath = Path(args.regopath).resolve()
- args.documentpath = Path(args.documentpath).resolve()
-
- # add any additional variables to args
- gws_params = gws_products()
- additional_args = vars(args)
- additional_args['fullnamesdict'] = gws_params["prod_to_fullname"]
-
- if args.skipexport and not args.runcached:
- exc = 'Used --skipexport without --runcached' \
- 'please rerun scubagoggles with --runcached as well'
- raise Exception(exc)
-
- if not args.runcached:
- # create a timestamped output folder
- now = datetime.now()
- folder_time = now.strftime("%Y_%m_%d_%H_%M_%S")
- timestamped_folder = f'{args.outputfoldername}_{folder_time}'
- args.outputpath = (args.outputpath / timestamped_folder).resolve()
+ args = self._args
+ args.outputpath = str(rel_abs_path(__file__,args.outputpath))
Path(args.outputpath).mkdir(parents=True, exist_ok=True)
args.outputpath = os.path.abspath(args.outputpath)
- # authenticate
- creds = gws_auth(args.credentials, args.subjectemail)
- services = {}
- services['reports'] = build('admin', 'reports_v1', credentials=creds)
- services['directory'] = build('admin', 'directory_v1', credentials=creds)
- services['groups'] = build('groupssettings', 'v1', credentials=creds)
-
- run_gws_providers(args, services)
- rego_eval(args)
- run_reporter(args)
- else:
- run_cached(args)
+ if not args.skipexport:
+ creds = gws_auth(args.credentials)
+ services = {}
+ services['reports'] = build('admin', 'reports_v1', credentials=creds)
+ services['directory'] = build('admin', 'directory_v1', credentials=creds)
+ services['groups'] = build('groupssettings', 'v1', credentials=creds)
+ self._run_gws_providers(services)
+
+ if not os.path.exists(f'{args.outputpath}/{args.outputproviderfilename}.json'):
+ # When running run_cached, the provider output might not exist as a stand-alone
+ # file depending what version of ScubaGoggles created the output. If the provider
+ # ouptut doesn't exist as a standa-lone file, create it from the scuba results
+ # file so the other functions can execute as normal.
+ with open(f'{args.outputpath}/{args.outjsonfilename}.json', 'r',
+ encoding='UTF-8') as scuba_results:
+ provider_output = json.load(scuba_results)['Raw']
+ with open(f'{args.outputpath}/{args.outputproviderfilename}.json', 'w',
+ encoding='UTF-8') as provider_file:
+ json.dump(provider_output, provider_file)
+ self._rego_eval()
+ self._run_reporter()
+
+ def start_automation(self):
+ """
+ Main orchestration function
+ """
+
+ args = self._args
+ if "commoncontrols" in args.baselines and "rules" not in args.baselines:
+ args.baselines.append("rules")
+ if "rules" in args.baselines and "commoncontrols" not in args.baselines:
+ args.baselines.append("commoncontrols")
+ args.baselines.sort()
+
+ # get the absolute paths relative to this directory
+ args.outputpath = (Path.cwd() / args.outputpath).resolve()
+ args.credentials = (Path.cwd() / args.credentials).resolve()
+ args.opapath = Path(args.opapath).resolve()
+ args.regopath = Path(args.regopath).resolve()
+ args.documentpath = Path(args.documentpath).resolve()
+
+ # add any additional variables to args
+ gws_params = self.gws_products()
+ additional_args = vars(args)
+ additional_args['fullnamesdict'] = gws_params["prod_to_fullname"]
+
+ if args.skipexport and not args.runcached:
+ exc = 'Used --skipexport without --runcached' \
+ 'please rerun scubagoggles with --runcached as well'
+ raise Exception(exc)
+
+ if not args.runcached:
+ # create a timestamped output folder
+ now = datetime.now()
+ folder_time = now.strftime("%Y_%m_%d_%H_%M_%S")
+ timestamped_folder = f'{args.outputfoldername}_{folder_time}'
+ args.outputpath = (args.outputpath / timestamped_folder).resolve()
+ Path(args.outputpath).mkdir(parents=True, exist_ok=True)
+ args.outputpath = os.path.abspath(args.outputpath)
+
+ # authenticate
+ creds = gws_auth(args.credentials, args.subjectemail)
+ services = {}
+ services['reports'] = build('admin', 'reports_v1', credentials=creds)
+ services['directory'] = build('admin', 'directory_v1', credentials=creds)
+ services['groups'] = build('groupssettings', 'v1', credentials=creds)
+
+ self._run_gws_providers(services)
+ self._rego_eval()
+ self._run_reporter()
+ else:
+ self._run_cached()