From acdd622192c732a1f6797cfb7cee67396687d460 Mon Sep 17 00:00:00 2001 From: andrewkrug Date: Wed, 30 Jan 2019 07:46:39 -0800 Subject: [PATCH] fix pep8 and re wrap using black formatter --- dashboard/__init__.py | 35 ++--- dashboard/api/idp.py | 99 +++++++----- dashboard/app.py | 212 ++++++++++++-------------- dashboard/auth.py | 107 +++++++------ dashboard/config.py | 67 +++++---- dashboard/csp.py | 65 ++++---- dashboard/models/__init__.py | 6 +- dashboard/models/alert.py | 284 ++++++++++++++++++----------------- dashboard/models/tile.py | 68 +++------ dashboard/models/user.py | 190 ++++++++++++----------- dashboard/op/yaml_loader.py | 26 ++-- dashboard/person.py | 25 +-- dashboard/vanity.py | 14 +- 13 files changed, 611 insertions(+), 587 deletions(-) diff --git a/dashboard/__init__.py b/dashboard/__init__.py index d0ee3832..df641fec 100644 --- a/dashboard/__init__.py +++ b/dashboard/__init__.py @@ -13,20 +13,11 @@ """Mozilla Single Signon Dashboard.""" __author__ = """Andrew Krug""" -__email__ = 'akrug@mozilla.com' -__version__ = '0.0.1' +__email__ = "akrug@mozilla.com" +__version__ = "0.0.1" -__all__ = [ - 'app', - 'auth', - 'config', - 'models', - 'person', - 's3', - 'utils', - 'vanity' -] +__all__ = ["app", "auth", "config", "models", "person", "s3", "utils", "vanity"] class CredstashEnv(object): @@ -37,9 +28,9 @@ def get(self, key, namespace=None): try: if len(namespace) > 0: secret = getSecret( - name='{}.{}'.format(namespace[0], key), - context={'app': 'sso-dashboard'}, - region="us-east-1" + name="{}.{}".format(namespace[0], key), + context={"app": "sso-dashboard"}, + region="us-east-1", ) else: secret = None @@ -55,11 +46,13 @@ def get(self, key, namespace=None): def get_config(): return ConfigManager( [ - ConfigIniEnv([ - os.environ.get('DASHBOARD_CONFIG_INI'), - '~/.sso-dashboard.ini', - '/etc/sso-dashboard.ini' - ]), - CredstashEnv() + ConfigIniEnv( + [ + os.environ.get("DASHBOARD_CONFIG_INI"), + "~/.sso-dashboard.ini", + "/etc/sso-dashboard.ini", + ] + ), + CredstashEnv(), ] ) diff --git a/dashboard/api/idp.py b/dashboard/api/idp.py index dae120c9..bb586c44 100644 --- a/dashboard/api/idp.py +++ b/dashboard/api/idp.py @@ -13,15 +13,17 @@ class AuthorizeAPI(object): def __init__(self, app, oidc_config): self.app = app - self.algorithms = 'RS256' + self.algorithms = "RS256" self.auth0_domain = oidc_config.OIDC_DOMAIN # auth.mozilla.auth0.com self.audience = self._get_audience(self.app.config) def _get_audience(self, app_config): - if app_config['SERVER_NAME'] == 'localhost:5000': - return 'https://sso.allizom.org' + if app_config["SERVER_NAME"] == "localhost:5000": + return "https://sso.allizom.org" else: - return 'https://' + self.app.config.get('SERVER_NAME', 'sso.mozilla.com') # sso.mozilla.com + return "https://" + self.app.config.get( + "SERVER_NAME", "sso.mozilla.com" + ) # sso.mozilla.com # Format error response and append status code def get_token_auth_header(self): @@ -29,25 +31,36 @@ def get_token_auth_header(self): """ auth = request.headers.get("Authorization", None) if not auth: - raise AuthError({"code": "authorization_header_missing", - "description": - "Authorization header is expected"}, 401) + raise AuthError( + { + "code": "authorization_header_missing", + "description": "Authorization header is expected", + }, + 401, + ) parts = auth.split() if parts[0].lower() != "bearer": - raise AuthError({"code": "invalid_header", - "description": - "Authorization header must start with" - " Bearer"}, 401) + raise AuthError( + { + "code": "invalid_header", + "description": "Authorization header must start with" " Bearer", + }, + 401, + ) elif len(parts) == 1: - raise AuthError({"code": "invalid_header", - "description": "Token not found"}, 401) + raise AuthError( + {"code": "invalid_header", "description": "Token not found"}, 401 + ) elif len(parts) > 2: - raise AuthError({"code": "invalid_header", - "description": - "Authorization header must be" - " Bearer token"}, 401) + raise AuthError( + { + "code": "invalid_header", + "description": "Authorization header must be" " Bearer token", + }, + 401, + ) token = parts[1] return token @@ -60,6 +73,7 @@ def get_jwks(self): def requires_api_auth(self, f): """Determines if the Access Token is valid """ + @wraps(f) def decorated(*args, **kwargs): token = self.get_token_auth_header() @@ -73,7 +87,7 @@ def decorated(*args, **kwargs): "kid": key["kid"], "use": key["use"], "n": key["n"], - "e": key["e"] + "e": key["e"], } if rsa_key: try: @@ -83,29 +97,44 @@ def decorated(*args, **kwargs): rsa_key, algorithms=self.algorithms, audience=self.audience, - issuer="https://" + self.auth0_domain + "/" + issuer="https://" + self.auth0_domain + "/", ) except jwt.ExpiredSignatureError as e: logger.error(e) - raise AuthError({"code": "token_expired", - "description": "token is expired"}, 401) + raise AuthError( + {"code": "token_expired", "description": "token is expired"}, + 401, + ) except jwt.JWTClaimsError as e: logger.error(e) - raise AuthError({"code": "invalid_claims", - "description": - "incorrect claims," - "please check the audience and issuer"}, 401) + raise AuthError( + { + "code": "invalid_claims", + "description": "incorrect claims," + "please check the audience and issuer", + }, + 401, + ) except Exception as e: logger.error(e) - raise AuthError({"code": "invalid_header", - "description": - "Unable to parse authentication" - " token."}, 401) + raise AuthError( + { + "code": "invalid_header", + "description": "Unable to parse authentication" " token.", + }, + 401, + ) _request_ctx_stack.top.current_user = payload return f(*args, **kwargs) - raise AuthError({"code": "invalid_header", - "description": "Unable to find appropriate key"}, 401) + raise AuthError( + { + "code": "invalid_header", + "description": "Unable to find appropriate key", + }, + 401, + ) + return decorated def requires_scope(self, required_scope): @@ -116,8 +145,8 @@ def requires_scope(self, required_scope): token = self.get_token_auth_header() unverified_claims = jwt.get_unverified_claims(token) if unverified_claims.get("scope"): - token_scopes = unverified_claims["scope"].split() - for token_scope in token_scopes: - if token_scope == required_scope: - return True + token_scopes = unverified_claims["scope"].split() + for token_scope in token_scopes: + if token_scope == required_scope: + return True return False diff --git a/dashboard/app.py b/dashboard/app.py index edddaf6e..b1520d9d 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -39,51 +39,53 @@ logging.basicConfig(level=logging.INFO) -with open('dashboard/logging.yml', 'r') as log_config: +with open("dashboard/logging.yml", "r") as log_config: config_yml = log_config.read() config_dict = yaml.load(config_yml) logging.config.dictConfig(config_dict) -logger = logging.getLogger('sso-dashboard') +logger = logging.getLogger("sso-dashboard") app = Flask(__name__) everett_config = get_config() # Enable monitoring endpoint -if everett_config('enable_prometheus_monitoring', namespace='sso-dashboard', default='False') == 'True': +if ( + everett_config( + "enable_prometheus_monitoring", namespace="sso-dashboard", default="False" + ) + == "True" +): os.environ["prometheus_multiproc_dir"] = "/tmp" registry = CollectorRegistry() - multiprocess.MultiProcessCollector(registry, path='/tmp') + multiprocess.MultiProcessCollector(registry, path="/tmp") metrics = PrometheusMetrics(app) metrics.start_http_server( int( - everett_config('prometheus_monitoring_port', namespace='sso-dashboard', default='9000') + everett_config( + "prometheus_monitoring_port", namespace="sso-dashboard", default="9000" + ) ) ) -talisman = Talisman( - app, content_security_policy=DASHBOARD_CSP, - force_https=False -) +talisman = Talisman(app, content_security_policy=DASHBOARD_CSP, force_https=False) app.config.from_object(config.Config(app).settings) app_list = S3Transfer(config.Config(app).settings) app_list.sync_config() assets = Environment(app) -js = Bundle('js/base.js', filters='jsmin', output='js/gen/packed.js') -assets.register('js_all', js) +js = Bundle("js/base.js", filters="jsmin", output="js/gen/packed.js") +assets.register("js_all", js) -sass = Bundle('css/base.scss', filters='scss') -css = Bundle(sass, filters='cssmin', output='css/gen/all.css') -assets.register('css_all', css) +sass = Bundle("css/base.scss", filters="scss") +css = Bundle(sass, filters="cssmin", output="css/gen/all.css") +assets.register("css_all", css) # Hack to support serving .svg -mimetypes.add_type('image/svg+xml', '.svg') +mimetypes.add_type("image/svg+xml", ".svg") oidc_config = config.OIDCConfig() -authentication = auth.OpenIDConnect( - oidc_config -) +authentication = auth.OpenIDConnect(oidc_config) oidc = authentication.auth(app) person_api = person.API() @@ -92,60 +94,57 @@ api = idp.AuthorizeAPI(app, oidc_config) -@app.route('/favicon.ico') +@app.route("/favicon.ico") def favicon(): - return send_from_directory(os.path.join(app.root_path, 'static/img'), 'favicon.ico') + return send_from_directory(os.path.join(app.root_path, "static/img"), "favicon.ico") -@app.route('/') +@app.route("/") def home(): - return redirect('/dashboard', code=302) + return redirect("/dashboard", code=302) -@app.route('/csp_report', methods=['POST']) +@app.route("/csp_report", methods=["POST"]) def csp_report(): - return '200' + return "200" # XXX This needs to load the schema from a better location # See also https://github.com/mozilla/iam-project-backlog/issues/161 -@app.route('/claim') +@app.route("/claim") def claim(): """Show the user schema - this path is refered to by our OIDC Claim namespace, i.e.: https://sso.mozilla.com/claim/*""" return redirect( - 'https://github.com/mozilla-iam/cis/blob/master/cis/schema.json', - code=302 + "https://github.com/mozilla-iam/cis/blob/master/cis/schema.json", code=302 ) @app.errorhandler(404) def page_not_found(error): if request.url is not None: - logger.error( - "A 404 has been generated for {route}".format(route=request.url) - ) - return render_template('404.html'), 404 + logger.error("A 404 has been generated for {route}".format(route=request.url)) + return render_template("404.html"), 404 -@app.route('/forbidden') +@app.route("/forbidden") def forbidden(): """Route to render error page.""" - if 'error' not in request.args: - return render_template('forbidden.html') + if "error" not in request.args: + return render_template("forbidden.html") else: - jws = request.args.get('error').encode() + jws = request.args.get("error").encode() - token_verifier = auth.tokenVerification(jws=jws, public_key=app.config['FORBIDDEN_PAGE_PUBLIC_KEY']) + token_verifier = auth.tokenVerification( + jws=jws, public_key=app.config["FORBIDDEN_PAGE_PUBLIC_KEY"] + ) token_verifier.verify - return render_template( - 'forbidden.html', token_verifier=token_verifier - ) + return render_template("forbidden.html", token_verifier=token_verifier) -@app.route('/logout') +@app.route("/logout") @oidc.oidc_logout def logout(): """ @@ -158,7 +157,7 @@ def logout(): return redirect(logout_url, code=302) -@app.route('/autologin-settings') +@app.route("/autologin-settings") def showautologinsettings(): """ Redirect to NLX Auto-login Settings page @@ -169,185 +168,166 @@ def showautologinsettings(): return redirect(autologin_settings_url, code=302) -@app.route('/signout.html') +@app.route("/signout.html") def signout(): logger.info("Signout messaging displayed.") - return render_template('signout.html') + return render_template("signout.html") -@app.route('/dashboard') +@app.route("/dashboard") @oidc.oidc_auth def dashboard(): """Primary dashboard the users will interact with.""" - logger.info("User: {} authenticated proceeding to dashboard.".format(session.get('id_token')['sub'])) + logger.info( + "User: {} authenticated proceeding to dashboard.".format( + session.get("id_token")["sub"] + ) + ) - if "Mozilla-LDAP" in session.get('userinfo')['sub']: + if "Mozilla-LDAP" in session.get("userinfo")["sub"]: logger.info("Mozilla IAM user detected. Attempt enriching with ID-Vault data.") try: - session['idvault_userinfo'] = person_api.get_userinfo(session.get('id_token')['sub']) + session["idvault_userinfo"] = person_api.get_userinfo( + session.get("id_token")["sub"] + ) except Exception as e: - logger.error("Could not enrich profile due to: {}. Perhaps it doesn't exist?".format(e)) + logger.error( + "Could not enrich profile due to: {}. Perhaps it doesn't exist?".format( + e + ) + ) # Hotfix to set user id for firefox alert # XXXTBD Refactor rules later to support full id_conformant session - session['userinfo']['user_id'] = session.get('id_token')['sub'] + session["userinfo"]["user_id"] = session.get("id_token")["sub"] # Transfer any updates in to the app_tiles. S3Transfer(config.Config(app).settings).sync_config() # Send the user session and browser headers to the alert rules engine. - Rules(userinfo=session['userinfo'], request=request).run() + Rules(userinfo=session["userinfo"], request=request).run() user = User(session, config.Config(app).settings) apps = user.apps(Application(app_list.apps_yml).apps) return render_template( - 'dashboard.html', - config=app.config, - user=user, - apps=apps, - alerts=None + "dashboard.html", config=app.config, user=user, apps=apps, alerts=None ) -@app.route('/styleguide/dashboard') +@app.route("/styleguide/dashboard") def styleguide_dashboard(): user = FakeUser(config.Config(app).settings) apps = user.apps(Application(app_list.apps_yml).apps) return render_template( - 'dashboard.html', - config=app.config, - user=user, - apps=apps, - alerts=None + "dashboard.html", config=app.config, user=user, apps=apps, alerts=None ) -@app.route('/styleguide/notifications') +@app.route("/styleguide/notifications") @oidc.oidc_auth def styleguide_notifications(): user = FakeUser(config.Config(app).settings) - return render_template( - 'notifications.html', - config=app.config, - user=user, - ) + return render_template("notifications.html", config=app.config, user=user) -@app.route('/notifications') +@app.route("/notifications") @oidc.oidc_auth def notifications(): user = User(session, config.Config(app).settings) - return render_template( - 'notifications.html', - config=app.config, - user=user, - ) + return render_template("notifications.html", config=app.config, user=user) @oidc.oidc_auth -@app.route('/alert/', methods=['POST']) +@app.route("/alert/", methods=["POST"]) def alert_operation(alert_id): - if request.method == 'POST': + if request.method == "POST": user = User(session, config.Config(app).settings) if request.data is not None: data = json.loads(request.data.decode()) - helpfulness = data.get('helpfulness') - alert_action = data.get('alert_action') + helpfulness = data.get("helpfulness") + alert_action = data.get("alert_action") result = user.take_alert_action(alert_id, alert_action, helpfulness) - if result['ResponseMetadata']['HTTPStatusCode'] == 200: - return '200' + if result["ResponseMetadata"]["HTTPStatusCode"] == 200: + return "200" else: - return '500' + return "500" @oidc.oidc_auth -@app.route('/alert/fake', methods=['GET']) +@app.route("/alert/fake", methods=["GET"]) def alert_faking(): - if request.method == 'GET': - if app.config.get('SERVER_NAME') != 'sso.mozilla.com': + if request.method == "GET": + if app.config.get("SERVER_NAME") != "sso.mozilla.com": """Only allow alert faking in non production environment.""" user = User(session, config.Config(app).settings) - fake_alerts = FakeAlert(user_id=user.userinfo.get('sub')) + fake_alerts = FakeAlert(user_id=user.userinfo.get("sub")) fake_alerts.create_fake_alerts() - return redirect('/dashboard', code=302) + return redirect("/dashboard", code=302) -@app.route('/api/v1/alert', methods=['GET']) +@app.route("/api/v1/alert", methods=["GET"]) @api.requires_api_auth def alert_api(): - if request.method == 'GET' and api.requires_scope("read:alert"): - user_id = request.args.get('user_id') + if request.method == "GET" and api.requires_scope("read:alert"): + user_id = request.args.get("user_id") alerts = Alert().find(user_id) result = Alert().to_summary(alerts) return jsonify(result) raise exceptions.AuthError( - { - "code": "Unauthorized", - "description": "Scope not matched. Access Denied." - }, 403 + {"code": "Unauthorized", "description": "Scope not matched. Access Denied."}, + 403, ) -@app.route('/info') +@app.route("/info") @oidc.oidc_auth def info(): """Return the JSONified user session for debugging.""" return jsonify( - id_token=session.get('id_token'), - userinfo=session.get('userinfo'), - person_api_v1=session.get('idvault_userinfo') + id_token=session.get("id_token"), + userinfo=session.get("userinfo"), + person_api_v1=session.get("idvault_userinfo"), ) -@app.route('/about') +@app.route("/about") def about(): - return render_template( - 'about.html' - ) + return render_template("about.html") -@app.route('/contribute.json') +@app.route("/contribute.json") def contribute_lower(): data = { "name": "sso-dashboard by Mozilla", "description": "A single signon dashboard for auth0.", "repository": { "url": "https://github.com/mozilla-iam/sso-dashboard", - "license": "MPL2" + "license": "MPL2", }, "participate": { "home": "https://github.com/mozilla-iam/sso-dashboard", "irc": "irc://irc.mozilla.org/#infosec", - "irc-contacts": [ - "Andrew" - ] + "irc-contacts": ["Andrew"], }, "bugs": { "list": "https://github.com/mozilla-iam/sso-dashboard/issues", "report": "https://github.com/mozilla-iam/sso-dashboard/issues/new", - "mentored": "https://github.com/mozilla-iam/sso-dashboard/issues?q=is%3Aissue+is%3Aclosed" # noqa + "mentored": "https://github.com/mozilla-iam/sso-dashboard/issues?q=is%3Aissue+is%3Aclosed", # noqa }, "urls": { "prod": "https://sso.mozilla.com/", - "stage": "https://sso.allizom.org/" + "stage": "https://sso.allizom.org/", }, - "keywords": [ - "python", - "html5", - "jquery", - "mui-css", - "sso", - "auth0" - ] + "keywords": ["python", "html5", "jquery", "mui-css", "sso", "auth0"], } return jsonify(data) -if __name__ == '__main__': +if __name__ == "__main__": app.run() diff --git a/dashboard/auth.py b/dashboard/auth.py index 9b49bda9..898546a0 100644 --- a/dashboard/auth.py +++ b/dashboard/auth.py @@ -19,7 +19,7 @@ def __init__(self, configuration): def client_info(self): return dict( client_id=self.oidc_config.client_id, - client_secret=self.oidc_config.client_secret + client_secret=self.oidc_config.client_secret, ) def provider_info(self): @@ -27,15 +27,14 @@ def provider_info(self): issuer="https://{DOMAIN}/".format(DOMAIN=self.oidc_config.OIDC_DOMAIN), authorization_endpoint=self.oidc_config.auth_endpoint(), token_endpoint=self.oidc_config.token_endpoint(), - userinfo_endpoint=self.oidc_config.userinfo_endpoint() - + userinfo_endpoint=self.oidc_config.userinfo_endpoint(), ) def auth(self, app): o = OIDCAuthentication( app, provider_configuration_info=self.provider_info(), - client_registration_info=self.client_info() + client_registration_info=self.client_info(), ) return o @@ -49,10 +48,7 @@ def __init__(self, configuration): self.oidc_config = None def client_info(self): - return dict( - client_id=None, - client_secret=None - ) + return dict(client_id=None, client_secret=None) def provider_info(self): return dict( @@ -79,28 +75,29 @@ def data(self): @property def error_code(self): - return self.jws_data.get('code', None) + return self.jws_data.get("code", None) @property def preferred_connection_name(self): - return self.jws_data.get('preferred_connection_name', 'Unknown') + return self.jws_data.get("preferred_connection_name", "Unknown") @property def redirect_uri(self): - return self.jws_data.get('redirect_uri', 'https://sso.mozilla.com') + return self.jws_data.get("redirect_uri", "https://sso.mozilla.com") def _get_connection_name(self, connection): CONNECTION_NAMES = { - 'google-oauth2': 'Google', - 'github': 'GitHub', - 'firefoxaccounts': 'Firefox Accounts', - 'Mozilla-LDAP-Dev': 'LDAP', - 'Mozilla-LDAP': 'LDAP', - 'email': 'passwordless email' + "google-oauth2": "Google", + "github": "GitHub", + "firefoxaccounts": "Firefox Accounts", + "Mozilla-LDAP-Dev": "LDAP", + "Mozilla-LDAP": "LDAP", + "email": "passwordless email", } return ( CONNECTION_NAMES[connection] - if connection in CONNECTION_NAMES else connection + if connection in CONNECTION_NAMES + else connection ) def _signed(self, jwk): @@ -114,62 +111,82 @@ def _verified(self): jwk = JWK.load(self.public_key) self.jws_obj = JWS.from_compact(self.jws) if self._signed(jwk) is False: - logger.warning('The public key signature was not valid for jws {jws}'.format(jws=self.jws)) + logger.warning( + "The public key signature was not valid for jws {jws}".format( + jws=self.jws + ) + ) self.jws_data = json.loads(self.jws.payload) - self.jws_data['code'] = 'invalid' + self.jws_data["code"] = "invalid" return False else: self.jws_data = json.loads(self.jws_obj.payload.decode()) - logger.info('Loaded JWS data.') - self.jws_data['connection_name'] = self._get_connection_name(self.jws_data['connection']) + logger.info("Loaded JWS data.") + self.jws_data["connection_name"] = self._get_connection_name( + self.jws_data["connection"] + ) return True except UnicodeDecodeError: return False def error_message(self): error_code = self.error_code - if error_code == 'githubrequiremfa': - error_text = \ - "You must setup a security device (\"MFA\", \"2FA\") for your GitHub account in order to access \ + if error_code == "githubrequiremfa": + error_text = 'You must setup a security device ("MFA", "2FA") for your GitHub account in order to access \ this service. Please follow the \ - \ + \ GitHub documentation\ - to setup your device, then try logging in again." - elif error_code == 'fxarequiremfa': - error_text = \ - "Please \ + to setup your device, then try logging in again.' + elif error_code == "fxarequiremfa": + error_text = 'Please \ secure your Firefox Account with two-step authentication, \ then try logging in again.\n

\n\ If you have just setup your security device and you see this message, please log out of \ - Firefox Accounts (click the \"Sign out\" button), then \ - log back in." - elif error_code == 'notingroup': + Firefox Accounts (click the "Sign out" button), then \ + log back in.' + elif error_code == "notingroup": error_text = "Sorry, you do not have permission to access {client}. \ - Please contact eus@mozilla.com if you should have access.".format(client=self.data.get('client')) - elif error_code == 'accesshasexpired': + Please contact eus@mozilla.com if you should have access.".format( + client=self.data.get("client") + ) + elif error_code == "accesshasexpired": error_text = "Sorry, your access to {client} has expired because you have not been actively using it. \ - Please request access again.".format(client=self.data.get('client')) - elif error_code == 'primarynotverified': + Please request access again.".format( + client=self.data.get("client") + ) + elif error_code == "primarynotverified": "You primary email address is not yet verified. Please verify your \ email address with {connection_name} in order to use this service.".format( - connection_name=self._get_connection_name(self.jws_data.get('connection', '')) + connection_name=self._get_connection_name( + self.jws_data.get("connection", "") + ) ) - elif error_code == 'incorrectaccount': + elif error_code == "incorrectaccount": error_text = "Sorry, you may not login using {connection_name}. \ Instead, please use \ {preferred_connection_name}.".format( - connection_name=self._get_connection_name(self.jws_data.get('connection', '')), - preferred_connection_name=self._get_connection_name(self.preferred_connection_name) + connection_name=self._get_connection_name( + self.jws_data.get("connection", "") + ), + preferred_connection_name=self._get_connection_name( + self.preferred_connection_name + ), ) - elif error_code == 'aai_failed': + elif error_code == "aai_failed": error_text = "{client} requires you to setup additional security measures for your account, \ such as enabling multi-factor authentication (MFA) or using a safer authentication method (such as a \ Firefox Account login). You will not be able to login until this is \ - done.".format(client=self.data.get('client')) - elif error_code == 'staffmustuseldap': + done.".format( + client=self.data.get("client") + ) + elif error_code == "staffmustuseldap": error_text = "Staff LDAP account holders are required to use their LDAP account to login. Please go back \ and type your LDAP email address to login with your Staff account, instead of using \ - {connection_name}.".format(connection_name=self._get_connection_name(self.jws_data.get('connection', ''))) + {connection_name}.".format( + connection_name=self._get_connection_name( + self.jws_data.get("connection", "") + ) + ) else: error_text = "Oye, something went wrong." return error_text diff --git a/dashboard/config.py b/dashboard/config.py index 88e54e38..a619fe37 100644 --- a/dashboard/config.py +++ b/dashboard/config.py @@ -9,7 +9,7 @@ class Config(object): def __init__(self, app): self.app = app - self.environment = CONFIG('environment', default='development') + self.environment = CONFIG("environment", default="development") self.settings = self._init_env() def _init_env(self): @@ -18,42 +18,59 @@ def _init_env(self): class DefaultConfig(object): """Defaults for the configuration objects.""" - DEBUG = bool(CONFIG('debug', namespace='sso-dashboard', default='True')) - TESTING = bool(CONFIG('testing', namespace='sso-dashboard', default='False')) - CSRF_ENABLED = bool(CONFIG('csrf_enabled', default='True')) - PERMANENT_SESSION = bool(CONFIG('permanent_session', namespace='sso-dashboard', default='True')) - PERMANENT_SESSION_LIFETIME = int(CONFIG('permanent_session_lifetime', namespace='sso-dashboard', default='86400')) - SESSION_COOKIE_HTTPONLY = bool(CONFIG('session_cookie_httponly', namespace='sso-dashboard', default='True')) - LOGGER_NAME = CONFIG('logger_name', namespace='sso-dashboard', default='sso-dashboard') + DEBUG = bool(CONFIG("debug", namespace="sso-dashboard", default="True")) + TESTING = bool(CONFIG("testing", namespace="sso-dashboard", default="False")) + CSRF_ENABLED = bool(CONFIG("csrf_enabled", default="True")) + PERMANENT_SESSION = bool( + CONFIG("permanent_session", namespace="sso-dashboard", default="True") + ) + PERMANENT_SESSION_LIFETIME = int( + CONFIG("permanent_session_lifetime", namespace="sso-dashboard", default="86400") + ) + + SESSION_COOKIE_HTTPONLY = bool( + CONFIG("session_cookie_httponly", namespace="sso-dashboard", default="True") + ) + LOGGER_NAME = CONFIG( + "logger_name", namespace="sso-dashboard", default="sso-dashboard" + ) - SECRET_KEY = CONFIG('secret_key', namespace='sso-dashboard') - SERVER_NAME = CONFIG('server_name', namespace='sso-dashboard', default='localhost:5000') + SECRET_KEY = CONFIG("secret_key", namespace="sso-dashboard") + SERVER_NAME = CONFIG( + "server_name", namespace="sso-dashboard", default="localhost:5000" + ) - S3_BUCKET = CONFIG('s3_bucket', namespace='sso-dashboard') + S3_BUCKET = CONFIG("s3_bucket", namespace="sso-dashboard") - CDN = CONFIG('cdn', namespace='sso-dashboard', default='https://cdn.{SERVER_NAME}'.format(SERVER_NAME=SERVER_NAME)) + CDN = CONFIG( + "cdn", + namespace="sso-dashboard", + default="https://cdn.{SERVER_NAME}".format(SERVER_NAME=SERVER_NAME), + ) FORBIDDEN_PAGE_PUBLIC_KEY = base64.b64decode( - CONFIG('forbidden_page_public_key', namespace='sso-dashboard') + CONFIG("forbidden_page_public_key", namespace="sso-dashboard") ) - PREFERRED_URL_SCHEME = CONFIG('preferred_url_scheme', namespace='sso-dashboard', default='https') + PREFERRED_URL_SCHEME = CONFIG( + "preferred_url_scheme", namespace="sso-dashboard", default="https" + ) class OIDCConfig(object): """Convienience Object for returning required vars to flask.""" + def __init__(self): """General object initializer.""" CONFIG = get_config() - self.OIDC_DOMAIN = CONFIG('oidc_domain', namespace='sso-dashboard') - self.OIDC_CLIENT_ID = CONFIG('oidc_client_id', namespace='sso-dashboard') + self.OIDC_DOMAIN = CONFIG("oidc_domain", namespace="sso-dashboard") + self.OIDC_CLIENT_ID = CONFIG("oidc_client_id", namespace="sso-dashboard") self.OIDC_CLIENT_SECRET = CONFIG( - 'oidc_client_secret', namespace='sso-dashboard' + "oidc_client_secret", namespace="sso-dashboard" ) self.LOGIN_URL = "https://{DOMAIN}/login?client={CLIENT_ID}".format( - DOMAIN=self.OIDC_DOMAIN, - CLIENT_ID=self.OIDC_CLIENT_ID + DOMAIN=self.OIDC_DOMAIN, CLIENT_ID=self.OIDC_CLIENT_ID ) @property @@ -65,16 +82,10 @@ def client_secret(self): return self.OIDC_CLIENT_SECRET def auth_endpoint(self): - return "https://{DOMAIN}/authorize".format( - DOMAIN=self.OIDC_DOMAIN - ) + return "https://{DOMAIN}/authorize".format(DOMAIN=self.OIDC_DOMAIN) def token_endpoint(self): - return "https://{DOMAIN}/oauth/token".format( - DOMAIN=self.OIDC_DOMAIN - ) + return "https://{DOMAIN}/oauth/token".format(DOMAIN=self.OIDC_DOMAIN) def userinfo_endpoint(self): - return "https://{DOMAIN}/userinfo".format( - DOMAIN=self.OIDC_DOMAIN - ) + return "https://{DOMAIN}/userinfo".format(DOMAIN=self.OIDC_DOMAIN) diff --git a/dashboard/csp.py b/dashboard/csp.py index 2705f7c5..42a49c99 100644 --- a/dashboard/csp.py +++ b/dashboard/csp.py @@ -1,40 +1,37 @@ - DASHBOARD_CSP = { - 'default-src': [ - '\'self\'', + "default-src": ["'self'"], + "script-src": [ + "'self'", + "ajax.googleapis.com", + "fonts.googleapis.com", + "https://*.googletagmanager.com", + "https://tagmanager.google.com", + "https://*.google-analytics.com", + "https://cdn.sso.mozilla.com", + "https://cdn.sso.allizom.org", ], - 'script-src': [ - '\'self\'', - 'ajax.googleapis.com', - 'fonts.googleapis.com', - 'https://*.googletagmanager.com', - 'https://tagmanager.google.com', - 'https://*.google-analytics.com', - 'https://cdn.sso.mozilla.com', - 'https://cdn.sso.allizom.org' + "style-src": [ + "'self'", + "ajax.googleapis.com", + "fonts.googleapis.com", + "https://cdn.sso.mozilla.com", + "https://cdn.sso.allizom.org", ], - 'style-src': [ - '\'self\'', - 'ajax.googleapis.com', - 'fonts.googleapis.com', - 'https://cdn.sso.mozilla.com', - 'https://cdn.sso.allizom.org' + "img-src": [ + "'self'", + "https://*.mozillians.org", + "https://cdn.sso.mozilla.com", + "https://cdn.sso.allizom.org", + "https://*.google-analytics.com", + "https://*.gravatar.com", + "https://i0.wp.com/", + "https://i1.wp.com", ], - 'img-src': [ - '\'self\'', - 'https://*.mozillians.org', - 'https://cdn.sso.mozilla.com', - 'https://cdn.sso.allizom.org', - 'https://*.google-analytics.com', - 'https://*.gravatar.com', - 'https://i0.wp.com/', - 'https://i1.wp.com' + "font-src": [ + "'self'", + "fonts.googleapis.com", + "fonts.gstatic.com", + "https://cdn.sso.mozilla.com", + "https://cdn.sso.allizom.org", ], - 'font-src': [ - '\'self\'', - 'fonts.googleapis.com', - 'fonts.gstatic.com', - 'https://cdn.sso.mozilla.com', - 'https://cdn.sso.allizom.org' - ] } diff --git a/dashboard/models/__init__.py b/dashboard/models/__init__.py index 732e24b3..629ed940 100644 --- a/dashboard/models/__init__.py +++ b/dashboard/models/__init__.py @@ -1,5 +1 @@ -__all__ = [ - 'alert', - 'tile', - 'user' -] +__all__ = ["alert", "tile", "user"] diff --git a/dashboard/models/alert.py b/dashboard/models/alert.py index 6dc7c4a4..f5a66a08 100644 --- a/dashboard/models/alert.py +++ b/dashboard/models/alert.py @@ -17,6 +17,7 @@ class Feedback(object): """Send user data back to MozDef or other subscribers via an SNS Topic""" + def __init__(self, alert_dict, alert_action): self.alert_dict = alert_dict self.alert_action = alert_action @@ -26,29 +27,28 @@ def __init__(self, alert_dict, alert_action): def connect_sns(self): if self.sns is None: - self.sns = boto3.client('sns', region_name='us-west-2') + self.sns = boto3.client("sns", region_name="us-west-2") def connect_ssm(self): if self.ssm is None: - self.ssm = boto3.client('ssm', region_name='us-west-2') + self.ssm = boto3.client("ssm", region_name="us-west-2") def get_sns_arn(self): self.connect_ssm() response = self.ssm.get_parameter( - Name='sso-dashboard-alerts-sns', - WithDecryption=False + Name="sso-dashboard-alerts-sns", WithDecryption=False ) - return response.get('Parameter').get('Value') + return response.get("Parameter").get("Value") def _construct_alert(self): message = { - 'category': 'user_feedback', - 'details': { - 'action': self.alert_action, # (escalate|acknowledge|false-positive) - 'alert_information': self.alert_dict - } + "category": "user_feedback", + "details": { + "action": self.alert_action, # (escalate|acknowledge|false-positive) + "alert_information": self.alert_dict, + }, } return message @@ -60,22 +60,23 @@ def send(self): response = self.sns.publish( TopicArn=self.sns_topic_arn, Message=json.dumps(message), - Subject='sso-dashboard-user-feedback' + Subject="sso-dashboard-user-feedback", ) return response class Alert(object): """Primary object containing alert functions.""" + def __init__(self): - self.alert_table_name = 'sso-dashboard-alert' + self.alert_table_name = "sso-dashboard-alert" self.dynamodb = None def has_actions(self, alert_dict): """Let's the view know if it should render actions for the alert.""" # Whitelist the firefox out of date alert. It should not get buttons. - if self.alert_dict.get('alert_code') is not '63f675d8896f4fb2b3caa204c8c2761e': + if self.alert_dict.get("alert_code") is not "63f675d8896f4fb2b3caa204c8c2761e": return True else: return False @@ -84,14 +85,14 @@ def has_escalation(self, alert_dict): """Let's the view know if it should render actions for the alert.""" # Whitelist the firefox out of date alert. It should not get buttons. - if self.alert_dict.get('alert_code') is not '63f675d8896f4fb2b3caa204c8c2761e': + if self.alert_dict.get("alert_code") is not "63f675d8896f4fb2b3caa204c8c2761e": return True else: return False def connect_dynamodb(self): if self.dynamodb is None: - dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + dynamodb = boto3.resource("dynamodb", region_name="us-west-2") table = dynamodb.Table(self.alert_table_name) self.dynamodb = table @@ -106,14 +107,21 @@ def find_or_create_by(self, alert_dict, user_id): current_alerts = self.find(user_id) # If the alert is duplicate false do not create another instance of it. - for alert in current_alerts.get('visible_alerts'): + for alert in current_alerts.get("visible_alerts"): try: - if alert.get('alert_code') == alert_dict.get('alert_code') and alert_dict.get('duplicate') is False: + if ( + alert.get("alert_code") == alert_dict.get("alert_code") + and alert_dict.get("duplicate") is False + ): return None else: continue except AttributeError as e: - logger.error('Bad data in alerts table for user: {}, exception was {}'.format(user_id, e)) + logger.error( + "Bad data in alerts table for user: {}, exception was {}".format( + user_id, e + ) + ) # Else create another alert. return self.create(alert_dict) @@ -126,10 +134,8 @@ def create(self, alert_dict): """ self.connect_dynamodb() - alert_dict['alert_id'] = self._create_alert_id() - response = self.dynamodb.put_item( - Item=alert_dict - ) + alert_dict["alert_id"] = self._create_alert_id() + response = self.dynamodb.put_item(Item=alert_dict) return response @@ -142,10 +148,7 @@ def destroy(self, alert_id, user_id): self.connect_dynamodb() response = self.dynamodb.delete_item( - Key={ - 'alert_id': alert_id, - 'user_id': user_id - } + Key={"alert_id": alert_id, "user_id": user_id} ) return response @@ -159,10 +162,8 @@ def update(self, alert_id, alert_dict): """ self.connect_dynamodb() - alert_dict['alert_id'] = alert_id - response = self.dynamodb.put_item( - Item=alert_dict - ) + alert_dict["alert_id"] = alert_id + response = self.dynamodb.put_item(Item=alert_dict) return response @@ -177,24 +178,26 @@ def find(self, user_id): self.connect_dynamodb() response = self.dynamodb.query( - IndexName='user_id-index', - Select='ALL_ATTRIBUTES', - KeyConditionExpression=Key('user_id').eq(user_id) + IndexName="user_id-index", + Select="ALL_ATTRIBUTES", + KeyConditionExpression=Key("user_id").eq(user_id), ) - alerts = response.get('Items', []) + alerts = response.get("Items", []) if response: - while 'LastEvaluatedKey' in response: + while "LastEvaluatedKey" in response: response = self.dynamodb.query( - IndexName='user_id-index', - Select='ALL_ATTRIBUTES', - KeyConditionExpression=Key('user_id').eq(user_id), - ExclusiveStartKey=response['LastEvaluatedKey'] + IndexName="user_id-index", + Select="ALL_ATTRIBUTES", + KeyConditionExpression=Key("user_id").eq(user_id), + ExclusiveStartKey=response["LastEvaluatedKey"], ) - alerts.extend(response['Items']) + alerts.extend(response["Items"]) except Exception as e: - logger.error('Could not load alerts for user: {} due to: {}.'.format(user_id, e)) + logger.error( + "Could not load alerts for user: {} due to: {}.".format(user_id, e) + ) alerts = [] inactive_alerts = [] @@ -204,29 +207,29 @@ def find(self, user_id): for alert in alerts: if self._alert_is_expired(alert): - self.destroy(alert.get('alert_id'), user_id) - elif alert.get('state', '') == 'acknowledge': + self.destroy(alert.get("alert_id"), user_id) + elif alert.get("state", "") == "acknowledge": inactive_alerts.append(alert) - elif alert.get('helpfulness', '') != '': + elif alert.get("helpfulness", "") != "": ranked_alerts.append(alert) visible_alerts.append(alert) - elif alert.get('state', '') == 'escalate': + elif alert.get("state", "") == "escalate": escalations.append(alert) visible_alerts.append(alert) else: visible_alerts.append(alert) return { - 'visible_alerts': visible_alerts, - 'ranked_alerts': ranked_alerts, - 'escalations': escalations, - 'inactive_alerts': inactive_alerts + "visible_alerts": visible_alerts, + "ranked_alerts": ranked_alerts, + "escalations": escalations, + "inactive_alerts": inactive_alerts, } def _alert_is_expired(self, alert): now = datetime.datetime.today() threshold = now - datetime.timedelta(days=7) - alert_time = datetime.datetime.strptime(alert.get('date'), '%Y-%m-%d') + alert_time = datetime.datetime.strptime(alert.get("date"), "%Y-%m-%d") if alert_time < threshold: return True @@ -243,19 +246,23 @@ def to_summary(self, alert_dict): medium_count = 0 low_count = 0 - for alert in alert_dict.get('visible_alerts', []): - if alert.get('risk') == 'high': + for alert in alert_dict.get("visible_alerts", []): + if alert.get("risk") == "high": high_count = high_count + 1 - if alert.get('risk') == 'maximum': + if alert.get("risk") == "maximum": maximum_count = maximum_count + 1 - if alert.get('risk') == 'medium': + if alert.get("risk") == "medium": medium_count = medium_count + 1 - if alert.get('risk') == 'low': + if alert.get("risk") == "low": low_count = low_count + 1 return { - 'alerts': - {'maximum': maximum_count, 'high': high_count, 'medium': medium_count, 'low': low_count} + "alerts": { + "maximum": maximum_count, + "high": high_count, + "medium": medium_count, + "low": low_count, + } } def find_by_id(self, alert_id): @@ -267,11 +274,11 @@ def find_by_id(self, alert_id): self.connect_dynamodb() response = self.dynamodb.query( - KeyConditionExpression=Key('alert_id').eq(alert_id) + KeyConditionExpression=Key("alert_id").eq(alert_id) ) - if response.get('Items'): - return response.get('Items')[0] + if response.get("Items"): + return response.get("Items")[0] return {} @@ -300,56 +307,60 @@ def run(self): def alert_firefox_out_of_date(self): if self._firefox_out_of_date(): alert_dict = { - 'alert_code': '63f675d8896f4fb2b3caa204c8c2761e', - 'user_id': self.userinfo.get('user_id'), - 'risk': 'medium', - 'summary': 'Your version of Firefox is older than the current stable release.', - 'description': 'Running the latest version of your browser is key to keeping your ' - 'computer secure and your private data private. Older browsers may ' - 'have known security vulnerabilities that attackers can exploit to ' - 'steal your data or load malware, which can put you and Mozilla at risk. ', - 'date': str(datetime.date.today()), - 'url': 'https://www.mozilla.org/firefox/', - 'url_title': 'Download', - 'duplicate': False + "alert_code": "63f675d8896f4fb2b3caa204c8c2761e", + "user_id": self.userinfo.get("user_id"), + "risk": "medium", + "summary": "Your version of Firefox is older than the current stable release.", + "description": "Running the latest version of your browser is key to keeping your " + "computer secure and your private data private. Older browsers may " + "have known security vulnerabilities that attackers can exploit to " + "steal your data or load malware, which can put you and Mozilla at risk. ", + "date": str(datetime.date.today()), + "url": "https://www.mozilla.org/firefox/", + "url_title": "Download", + "duplicate": False, } - self.alert.find_or_create_by(alert_dict=alert_dict, user_id=self.userinfo.get('user_id')) + self.alert.find_or_create_by( + alert_dict=alert_dict, user_id=self.userinfo.get("user_id") + ) else: # Clear any active alerts for firefox out of date. - alerts = self.alert.find(self.userinfo.get('user_id')) - for alert in alerts.get('visible_alerts'): - if alert.get('alert_code') == '63f675d8896f4fb2b3caa204c8c2761e': + alerts = self.alert.find(self.userinfo.get("user_id")) + for alert in alerts.get("visible_alerts"): + if alert.get("alert_code") == "63f675d8896f4fb2b3caa204c8c2761e": self.alert.destroy( - alert_id=alert.get('alert_id'), user_id=alert.get('user_id') + alert_id=alert.get("alert_id"), user_id=alert.get("user_id") ) def _firefox_info(self): - release_json = requests.get('https://product-details.mozilla.org/1.0/firefox_versions.json') + release_json = requests.get( + "https://product-details.mozilla.org/1.0/firefox_versions.json" + ) if release_json.status_code == 200: return release_json.json() else: return None def _user_firefox_version(self): - agent = self.request.headers.get('User-Agent') - if agent.find('Firefox') != -1: - version = agent.split('Firefox/')[1] + agent = self.request.headers.get("User-Agent") + if agent.find("Firefox") != -1: + version = agent.split("Firefox/")[1] else: version = None return version def _version_to_dictionary(self, version_number): - version_number_list = version_number.split('.') + version_number_list = version_number.split(".") version_dict = { - 'major_version': version_number_list[0], - 'minor_version': version_number_list[1] + "major_version": version_number_list[0], + "minor_version": version_number_list[1], } if len(version_number_list) == 3: - version_dict['dot_version'] = version_number_list[2] + version_dict["dot_version"] = version_number_list[2] else: - version_dict['dot_version'] = None + version_dict["dot_version"] = None return version_dict @@ -357,16 +368,18 @@ def _firefox_out_of_date(self): ff_info = self._firefox_info() if self._user_firefox_version() is not None and ff_info is not None: u_version = self._version_to_dictionary(self._user_firefox_version()) - f_version = self._version_to_dictionary(ff_info.get('FIREFOX_ESR')) + f_version = self._version_to_dictionary(ff_info.get("FIREFOX_ESR")) - if u_version.get('major_version') < f_version.get('major_version'): + if u_version.get("major_version") < f_version.get("major_version"): return True - elif u_version.get('major_version') == f_version.get('major_version'): - if u_version.get('minor_version') < f_version.get('minor_version'): + elif u_version.get("major_version") == f_version.get("major_version"): + if u_version.get("minor_version") < f_version.get("minor_version"): return True - elif u_version.get('minor_version') == f_version.get('minor_version') \ - and u_version.get('dot_version') is not None: - if u_version.get('dot_version') < f_version.get('dot_version'): + elif ( + u_version.get("minor_version") == f_version.get("minor_version") + and u_version.get("dot_version") is not None + ): + if u_version.get("dot_version") < f_version.get("dot_version"): return True else: return False @@ -376,6 +389,7 @@ def _firefox_out_of_date(self): class FakeAlert(object): """Class only fires in development mode. Adds alerts to a given user for testing only.""" + def __init__(self, user_id): self.user_id = user_id self.alert = Alert() @@ -386,18 +400,18 @@ def create_fake_alerts(self): def _create_fake_browser_alert(self): alert_dict = { - 'alert_code': 'a63f675d8896f4fb2b3caa204c8c2761e', - 'user_id': self.user_id, - 'risk': 'medium', - 'summary': 'Your version of Firefox is older than the current stable release.', - 'description': 'Running the latest version of your browser is key to keeping your ' - 'computer secure and your private data private. Older browsers may ' - 'have known security vulnerabilities that attackers can exploit to ' - 'steal your data or load malware, which can put you and Mozilla at risk. ', - 'date': str(fake.date(pattern="%Y-%m-%d", end_datetime=None)), - 'url': 'https://www.mozilla.org/firefox/', - 'url_title': 'Download', - 'duplicate': False + "alert_code": "a63f675d8896f4fb2b3caa204c8c2761e", + "user_id": self.user_id, + "risk": "medium", + "summary": "Your version of Firefox is older than the current stable release.", + "description": "Running the latest version of your browser is key to keeping your " + "computer secure and your private data private. Older browsers may " + "have known security vulnerabilities that attackers can exploit to " + "steal your data or load malware, which can put you and Mozilla at risk. ", + "date": str(fake.date(pattern="%Y-%m-%d", end_datetime=None)), + "url": "https://www.mozilla.org/firefox/", + "url_title": "Download", + "duplicate": False, } self.alert.find_or_create_by(alert_dict=alert_dict, user_id=self.user_id) @@ -410,46 +424,38 @@ def _create_fake_geolocation_alert(self): fake_ip = fake.ipv4() original_alert_dict = { - 'category': 'geomodel', - 'details': { - 'category': 'NEWCOUNTRY', - 'prev_locality_details': { - 'city': prev_fake_state, - 'country': prev_fake_country - }, - 'locality_details': { - 'city': fake_state, - 'country': fake_country + "category": "geomodel", + "details": { + "category": "NEWCOUNTRY", + "prev_locality_details": { + "city": prev_fake_state, + "country": prev_fake_country, }, - 'principal': fake_email, - 'source_ip': fake_ip + "locality_details": {"city": fake_state, "country": fake_country}, + "principal": fake_email, + "source_ip": fake_ip, }, - 'severity': 'NOTICE', - 'summary': '{} NEWCOUNTRY {}, {} access from {}'.format( - fake_email, - fake_state, - fake_country, - fake_ip + "severity": "NOTICE", + "summary": "{} NEWCOUNTRY {}, {} access from {}".format( + fake_email, fake_state, fake_country, fake_ip ), - 'tags': ['geomodel'], - 'url': 'https://www.mozilla.org/alert', - 'utctimestamp': '{}+00:00'.format(fake.iso8601()) + "tags": ["geomodel"], + "url": "https://www.mozilla.org/alert", + "utctimestamp": "{}+00:00".format(fake.iso8601()), } alert_dict = { - 'alert_code': '416c65727447656f6d6f64656c', - 'user_id': self.user_id, - 'risk': 'high', - 'summary': 'Did you recently login from {}, {} ({})?'.format( - fake_state, - fake_country, - fake_ip + "alert_code": "416c65727447656f6d6f64656c", + "user_id": self.user_id, + "risk": "high", + "summary": "Did you recently login from {}, {} ({})?".format( + fake_state, fake_country, fake_ip ), - 'alert_str_json': json.dumps(original_alert_dict), - 'description': 'This alert is created based on geo ip information about the last login of a user.', - 'date': str(fake.date(pattern="%Y-%m-%d", end_datetime=None)), - 'url': 'https://www.mozilla.org', - 'url_title': 'Get Help', - 'duplicate': False + "alert_str_json": json.dumps(original_alert_dict), + "description": "This alert is created based on geo ip information about the last login of a user.", + "date": str(fake.date(pattern="%Y-%m-%d", end_datetime=None)), + "url": "https://www.mozilla.org", + "url_title": "Get Help", + "duplicate": False, } self.alert.find_or_create_by(alert_dict=alert_dict, user_id=self.user_id) diff --git a/dashboard/models/tile.py b/dashboard/models/tile.py index 57ad8ad1..c86640e9 100644 --- a/dashboard/models/tile.py +++ b/dashboard/models/tile.py @@ -12,6 +12,7 @@ class S3Transfer(object): """News up and does the job if configuration specifies S3 for data transfer.""" + def __init__(self, app_config): self.app_config = app_config self.client = None @@ -20,73 +21,59 @@ def __init__(self, app_config): def connect_s3(self): if not self.client: - self.client = boto3.client('s3') + self.client = boto3.client("s3") def is_updated(self): """Compare etag of what is in bucket to what is on disk.""" self.connect_s3() try: self.client.head_object( - Bucket=self.s3_bucket, - Key='apps.yml', - IfMatch=self._etag() + Bucket=self.s3_bucket, Key="apps.yml", IfMatch=self._etag() ) return False except Exception as e: - logger.error('Etags do not match as a result of {error}'.format(error=e)) + logger.error("Etags do not match as a result of {error}".format(error=e)) return True def last_update(self): this_dir = os.path.dirname(__file__) - filename = os.path.join(this_dir, '../data/apps.yml') + filename = os.path.join(this_dir, "../data/apps.yml") return abs(os.path.getmtime(filename)) def _update_etag(self, etag): this_dir = os.path.dirname(__file__) - filename = os.path.join(this_dir, '../data/{name}').format( - name='apps.yml-etag' - ) - c = open(filename, 'w+') + filename = os.path.join(this_dir, "../data/{name}").format(name="apps.yml-etag") + c = open(filename, "w+") c.write(etag) c.close() def _etag(self): this_dir = os.path.dirname(__file__) - filename = os.path.join(this_dir, '../data/{name}').format( - name='apps.yml-etag' - ) + filename = os.path.join(this_dir, "../data/{name}").format(name="apps.yml-etag") try: - return open(filename, 'r').read() + return open(filename, "r").read() except Exception as e: - logger.error('Error fetching etag: {e}'.format(e=e)) + logger.error("Error fetching etag: {e}".format(e=e)) return "12345678" def _get_config(self): self.connect_s3() - apps_yml = self.client.get_object( - Bucket=self.s3_bucket, - Key='apps.yml' - ) + apps_yml = self.client.get_object(Bucket=self.s3_bucket, Key="apps.yml") - response = self.client.head_object( - Bucket=self.s3_bucket, - Key='apps.yml' - ) + response = self.client.head_object(Bucket=self.s3_bucket, Key="apps.yml") - self.apps_yml = apps_yml.get('Body').read().decode('utf-8') + self.apps_yml = apps_yml.get("Body").read().decode("utf-8") this_dir = os.path.dirname(__file__) - filename = os.path.join(this_dir, '../data/{name}').format( - name='apps.yml' - ) - c = open(filename, 'w+') + filename = os.path.join(this_dir, "../data/{name}").format(name="apps.yml") + c = open(filename, "w+") c.write(self.apps_yml) c.close() - self._update_etag(response.get('ETag')) + self._update_etag(response.get("ETag")) def _touch(self): - fname = 'dashboard/app.py' - fhandle = open(fname, 'a') + fname = "dashboard/app.py" + fhandle = open(fname, "a") try: os.utime(fname, None) finally: @@ -96,7 +83,7 @@ def sync_config(self): self.connect_s3() try: if self.is_updated(): - logger.info('Config file is updated fetching new config.') + logger.info("Config file is updated fetching new config.") self._get_config() # Touch app.py to force a gunicorn reload self._touch() @@ -107,31 +94,26 @@ def sync_config(self): # Do nothing except Exception as e: print(e) - logger.error( - 'Problem fetching config file {error}'.format( - error=e - ) - ) + logger.error("Problem fetching config file {error}".format(error=e)) class DynamoTransfer(object): """News up and does the job if configuration specifies that dynamo should be used for app information.""" + def __init__(self, app_config): - self.configuration_table_name = 'sso-dashboard-apps' + self.configuration_table_name = "sso-dashboard-apps" self.dynamodb = None def connect_dynamodb(self): if self.dynamodb is None: - dynamodb = boto3.resource('dynamodb', region_name='us-west-2') + dynamodb = boto3.resource("dynamodb", region_name="us-west-2") table = dynamodb.Table(self.configuration_table_name) self.dynamodb = table def sync_config(self): self.connect_dynamodb() - results = self.dynamodb.scan( - FilterExpression=Attr('name').exists() - ) - return results.get('Items', None) + results = self.dynamodb.scan(FilterExpression=Attr("name").exists()) + return results.get("Items", None) class Tile(object): diff --git a/dashboard/models/user.py b/dashboard/models/user.py index 38a40a76..e86ef959 100644 --- a/dashboard/models/user.py +++ b/dashboard/models/user.py @@ -12,38 +12,40 @@ class User(object): def __init__(self, session, app_config): """Constructor takes user session.""" - self.id_token = session.get('id_token', None) + self.id_token = session.get("id_token", None) self.app_config = app_config - self.userinfo = session.get('userinfo') - self.idvault_info = session.get('idvault_userinfo') + self.userinfo = session.get("userinfo") + self.idvault_info = session.get("idvault_userinfo") def email(self): try: - email = self.userinfo.get('email') + email = self.userinfo.get("email") except Exception as e: logger.error( - 'The email attribute does no exists falling back to OIDC Conformant: {}.'.format(e) + "The email attribute does no exists falling back to OIDC Conformant: {}.".format( + e + ) ) - email = self.userinfo.get('https://sso.mozilla.com/claim/emails')[0]['emails'] + email = self.userinfo.get("https://sso.mozilla.com/claim/emails")[0][ + "emails" + ] return email def apps(self, app_list): """Return a list of the apps a user is allowed to see in dashboard.""" - authorized_apps = { - 'apps': [] - } + authorized_apps = {"apps": []} - for app in app_list['apps']: + for app in app_list["apps"]: if self._is_valid_yaml(app): if self._is_authorized(app): - authorized_apps['apps'].append(app) + authorized_apps["apps"].append(app) - return authorized_apps.get('apps', []) + return authorized_apps.get("apps", []) @property def avatar(self): if self.idvault_info: - picture_url = self.idvault_info.get('picture') + picture_url = self.idvault_info.get("picture") else: picture_url = None @@ -51,19 +53,24 @@ def avatar(self): def group_membership(self): """Return list of group membership if user is asserted from ldap.""" - if self.userinfo.get('https://sso.mozilla.com/claim/groups', []) != []: - group_count = len(self.userinfo.get('https://sso.mozilla.com/claim/groups', [])) + if self.userinfo.get("https://sso.mozilla.com/claim/groups", []) != []: + group_count = len( + self.userinfo.get("https://sso.mozilla.com/claim/groups", []) + ) else: - if self.userinfo.get('groups'): - group_count = len(self.userinfo.get('groups', [])) + if self.userinfo.get("groups"): + group_count = len(self.userinfo.get("groups", [])) else: group_count = 0 - if 'https://sso.mozilla.com/claim/groups' in self.userinfo.keys() and group_count > 0: - return self.userinfo['https://sso.mozilla.com/claim/groups'] + if ( + "https://sso.mozilla.com/claim/groups" in self.userinfo.keys() + and group_count > 0 + ): + return self.userinfo["https://sso.mozilla.com/claim/groups"] - if 'groups' in self.userinfo.keys() and group_count > 0: - return self.userinfo['groups'] + if "groups" in self.userinfo.keys() and group_count > 0: + return self.userinfo["groups"] else: # This could mean a user is authing with non-ldap return [] @@ -72,7 +79,7 @@ def group_membership(self): def first_name(self): """Return user first_name.""" try: - return self.idvault_info.get('firstName', "") + return self.idvault_info.get("firstName", "") except KeyError: return "" except AttributeError: @@ -82,7 +89,7 @@ def first_name(self): def last_name(self): """Return user last_name.""" try: - return self.idvault_info.get('lastName', "") + return self.idvault_info.get("lastName", "") except KeyError: return "" except AttributeError: @@ -90,52 +97,57 @@ def last_name(self): def user_identifiers(self): """Construct a list of potential user identifiers to match on.""" - return [self.email(), self.userinfo['sub']] + return [self.email(), self.userinfo["sub"]] @property def alerts(self): - alerts = alert.Alert().find(user_id=self.userinfo['sub']) + alerts = alert.Alert().find(user_id=self.userinfo["sub"]) return alerts def take_alert_action(self, alert_id, alert_action, helpfulness=None): a = alert.Alert() alert_dict = a.find_by_id(alert_id) - alert_dict['last_update'] = int(time.time()) + alert_dict["last_update"] = int(time.time()) - if alert_action == 'acknowledge': - logger.info('An alert was acked for {uid}.'.format(uid=self.userinfo['sub'])) - alert_dict['state'] = alert_action + if alert_action == "acknowledge": + logger.info( + "An alert was acked for {uid}.".format(uid=self.userinfo["sub"]) + ) + alert_dict["state"] = alert_action res = a.update(alert_id=alert_id, alert_dict=alert_dict) - elif alert_action == 'escalate': - logger.info('An alert was escalated for {uid}.'.format(uid=self.userinfo['sub'])) - alert_dict['state'] = alert_action + elif alert_action == "escalate": + logger.info( + "An alert was escalated for {uid}.".format(uid=self.userinfo["sub"]) + ) + alert_dict["state"] = alert_action res = a.update(alert_id=alert_id, alert_dict=alert_dict) - elif alert_action == 'indicate-helpfulness': - logger.info('Alert helpfulness was set for {uid}.'.format(uid=self.userinfo['sub'])) - alert_dict['helpfulness'] = helpfulness + elif alert_action == "indicate-helpfulness": + logger.info( + "Alert helpfulness was set for {uid}.".format(uid=self.userinfo["sub"]) + ) + alert_dict["helpfulness"] = helpfulness res = a.update(alert_id=alert_id, alert_dict=alert_dict) else: - res = {'ResponseMetadata': {'HTTPStatusCode': 200}} + res = {"ResponseMetadata": {"HTTPStatusCode": 200}} - m = alert.Feedback( - alert_dict=alert_dict, - alert_action=alert_action - ) + m = alert.Feedback(alert_dict=alert_dict, alert_action=alert_action) m.send() return res def _is_authorized(self, app): - if app['application']['display'] == 'False': + if app["application"]["display"] == "False": return False - elif not app['application']['display']: + elif not app["application"]["display"]: return False - elif 'everyone' in app['application']['authorized_groups']: + elif "everyone" in app["application"]["authorized_groups"]: return True - elif set(app['application']['authorized_groups']) & set(self.group_membership()): + elif set(app["application"]["authorized_groups"]) & set( + self.group_membership() + ): return True - elif set(app['application']['authorized_users']) & set(self.user_identifiers()): + elif set(app["application"]["authorized_users"]) & set(self.user_identifiers()): return True else: return False @@ -143,9 +155,9 @@ def _is_authorized(self, app): def _is_valid_yaml(self, app): """If an app doesn't have the required fields skip it.""" try: - app['application']['display'] - app['application']['authorized_groups'] - app['application']['authorized_users'] + app["application"]["display"] + app["application"]["authorized_groups"] + app["application"]["authorized_users"] return True except Exception: return False @@ -160,22 +172,20 @@ def email(self): return fake.email() def apps(self, app_list): - authorized_apps = { - 'apps': [] - } + authorized_apps = {"apps": []} - for app in app_list['apps']: + for app in app_list["apps"]: if self._is_valid_yaml(app): if self._is_authorized(app): - authorized_apps['apps'].append(app) - return authorized_apps.get('apps', []) + authorized_apps["apps"].append(app) + return authorized_apps.get("apps", []) @property def avatar(self): - return self.profile['avatar'] + return self.profile["avatar"] def group_membership(self): - return [] + return [] @property def first_name(self): @@ -188,39 +198,45 @@ def last_name(self): @property def alerts(self): return { - 'visible_alerts': [ + "visible_alerts": [ { - 'alert_code': '416c65727447656f6d6f64656c', - 'alert_id': '4053bd6a9e9a6bb03095f479c0fab2', - 'date': '2017-10-27', - 'description': 'This alert is created based on geo ip information about the last login of a user.', - 'duplicate': True, - 'risk': 'medium', - 'summary': 'Did you recently login from {}, {}?'.format(fake.city(), fake.country()), - 'url': 'https://mana.mozilla.org/wiki/display/SECURITY/Alert%3A+Change+in+Country', - 'url_title': 'Get Help', - 'user_id': 'ad|Mozilla-LDAP|fakeuser', - 'details': { - 'Timestamp': fake.date_time_this_year().strftime('%A, %B %d %Y %H:%M UTC'), - 'New Location': '{}, {}'.format(fake.city(), fake.country()), - 'New IP': '{} ({})'.format(fake.ipv4(), fake.company()), - 'Previous Location': '{}, {}'.format(fake.city(), fake.country()) - } + "alert_code": "416c65727447656f6d6f64656c", + "alert_id": "4053bd6a9e9a6bb03095f479c0fab2", + "date": "2017-10-27", + "description": "This alert is created based on geo ip information about the last login of a user.", + "duplicate": True, + "risk": "medium", + "summary": "Did you recently login from {}, {}?".format( + fake.city(), fake.country() + ), + "url": "https://mana.mozilla.org/wiki/display/SECURITY/Alert%3A+Change+in+Country", + "url_title": "Get Help", + "user_id": "ad|Mozilla-LDAP|fakeuser", + "details": { + "Timestamp": fake.date_time_this_year().strftime( + "%A, %B %d %Y %H:%M UTC" + ), + "New Location": "{}, {}".format(fake.city(), fake.country()), + "New IP": "{} ({})".format(fake.ipv4(), fake.company()), + "Previous Location": "{}, {}".format( + fake.city(), fake.country() + ), + }, }, { - 'alert_code': '63f675d8896f4fb2b3caa204c8c2761e', - 'user_id': 'ad|Mozilla-LDAP|fakeuser', - 'risk': 'medium', - 'summary': 'Your version of Firefox is older than the current stable release.', - 'description': 'Running the latest version of your browser is key to keeping your ' - 'computer secure and your private data private. Older browsers may ' - 'have known security vulnerabilities that attackers can exploit to ' - 'steal your data or load malware, which can put you and Mozilla at risk. ', - 'date': '2017-10-27', - 'url': 'https://www.mozilla.org/firefox/', - 'url_title': 'Download', - 'duplicate': False - } + "alert_code": "63f675d8896f4fb2b3caa204c8c2761e", + "user_id": "ad|Mozilla-LDAP|fakeuser", + "risk": "medium", + "summary": "Your version of Firefox is older than the current stable release.", + "description": "Running the latest version of your browser is key to keeping your " + "computer secure and your private data private. Older browsers may " + "have known security vulnerabilities that attackers can exploit to " + "steal your data or load malware, which can put you and Mozilla at risk. ", + "date": "2017-10-27", + "url": "https://www.mozilla.org/firefox/", + "url_title": "Download", + "duplicate": False, + }, ] } @@ -228,7 +244,7 @@ def _is_valid_yaml(self, app): return True def _is_authorized(self, app): - if 'everyone' in app['application']['authorized_groups']: + if "everyone" in app["application"]["authorized_groups"]: return True else: return False diff --git a/dashboard/op/yaml_loader.py b/dashboard/op/yaml_loader.py index 11ef0ccf..c3987a6d 100644 --- a/dashboard/op/yaml_loader.py +++ b/dashboard/op/yaml_loader.py @@ -28,14 +28,12 @@ def _load_data(self): return stream def _render_data(self): - for app in self.apps['apps']: - app['application']['alt_text'] = app['application']['name'] - app['application']['name'] = self._truncate( - app['application']['name'] - ) + for app in self.apps["apps"]: + app["application"]["alt_text"] = app["application"]["name"] + app["application"]["name"] = self._truncate(app["application"]["name"]) def _alphabetize(self): - self.apps['apps'].sort(key=lambda a: a['application']['name'].lower()) + self.apps["apps"].sort(key=lambda a: a["application"]["name"].lower()) def _find(self, name, path): for root, dirs, files in os.walk(path): @@ -44,27 +42,21 @@ def _find(self, name, path): def _has_vanity(self, app): try: - app['application']['vanity_url'] + app["application"]["vanity_url"] return True except Exception: return False def _truncate(self, app_name): """If name is longer than allowed 18 chars truncate the name.""" - app_name = ( - app_name[:16] + '..' - ) if len(app_name) > 18 else app_name + app_name = (app_name[:16] + "..") if len(app_name) > 18 else app_name return app_name def vanity_urls(self): redirects = [] - for app in self.apps['apps']: + for app in self.apps["apps"]: if self._has_vanity(app): - for redirect in app['application']['vanity_url']: - redirects.append( - { - redirect: app['application']['url'] - } - ) + for redirect in app["application"]["vanity_url"]: + redirects.append({redirect: app["application"]["url"]}) return redirects diff --git a/dashboard/person.py b/dashboard/person.py index 754c8dba..b666d2a0 100644 --- a/dashboard/person.py +++ b/dashboard/person.py @@ -7,6 +7,7 @@ class API(object): """Retrieve data from person api as needed. Will eventually replace Mozillians API""" + def __init__(self): """ :param session: the flask session to update with userinfo @@ -18,35 +19,35 @@ def get_bearer(self): conn = http.client.HTTPSConnection(self.config.OIDC_DOMAIN) payload = json.dumps( { - 'client_id': self.config.OIDC_CLIENT_ID, - 'client_secret': self.config.OIDC_CLIENT_SECRET, - 'audience': 'https://{}'.format(self._get_url()), - 'grant_type': 'client_credentials' + "client_id": self.config.OIDC_CLIENT_ID, + "client_secret": self.config.OIDC_CLIENT_SECRET, + "audience": "https://{}".format(self._get_url()), + "grant_type": "client_credentials", } ) - headers = {'content-type': "application/json"} + headers = {"content-type": "application/json"} conn.request("POST", "/oauth/token", payload, headers) res = conn.getresponse() data = res.read() - return json.loads(data.decode('utf-8')) + return json.loads(data.decode("utf-8")) def get_userinfo(self, auth_zero_id): user_id = urllib.parse.quote(auth_zero_id) conn = http.client.HTTPSConnection("{}".format(self.person_api_url)) - token = "Bearer {}".format(self.get_bearer().get('access_token')) + token = "Bearer {}".format(self.get_bearer().get("access_token")) - headers = {'authorization': token} + headers = {"authorization": token} conn.request("GET", "/v1/profile/{}".format(user_id), headers=headers) res = conn.getresponse() data = res.read() - return json.loads(json.loads(data.decode('utf-8')).get('body')) + return json.loads(json.loads(data.decode("utf-8")).get("body")) def _get_url(self): - if self.config.OIDC_DOMAIN == 'auth.mozilla.auth0.com': - return 'person-api.sso.mozilla.com' + if self.config.OIDC_DOMAIN == "auth.mozilla.auth0.com": + return "person-api.sso.mozilla.com" else: - return 'person-api.sso.allizom.org' + return "person-api.sso.allizom.org" diff --git a/dashboard/vanity.py b/dashboard/vanity.py index a413dbbb..65cdc175 100644 --- a/dashboard/vanity.py +++ b/dashboard/vanity.py @@ -15,20 +15,24 @@ def setup(self): for vanity_url in url.keys(): try: self.app.add_url_rule(vanity_url, vanity_url, self.redirect_url) - self.app.add_url_rule(vanity_url + "/", vanity_url + "/", self.redirect_url) + self.app.add_url_rule( + vanity_url + "/", vanity_url + "/", self.redirect_url + ) except Exception as e: print(e) def redirect_url(self): - vanity_url = '/' + request.url.split('/')[3] + vanity_url = "/" + request.url.split("/")[3] for match in self.url_list: for key in match.keys(): if key == vanity_url: resp = make_response(redirect(match[vanity_url], code=301)) - resp.headers['Cache-Control'] = ('no-store, no-cache, must-revalidate, ' - 'post-check=0, pre-check=0, max-age=0') - resp.headers['Expires'] = '-1' + resp.headers["Cache-Control"] = ( + "no-store, no-cache, must-revalidate, " + "post-check=0, pre-check=0, max-age=0" + ) + resp.headers["Expires"] = "-1" return resp else: pass