Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add management command to dump auth config data to a file #2082

Merged
merged 10 commits into from
Mar 1, 2024
140 changes: 140 additions & 0 deletions galaxy_ng/app/management/commands/dump-auth-config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import json
import os
import sys
from django.core.management.base import BaseCommand
from django.conf import settings


class Command(BaseCommand):
KEYCLOAK_KEYS = [
"SOCIAL_AUTH_KEYCLOAK_ACCESS_TOKEN_URL",
"SOCIAL_AUTH_KEYCLOAK_AUTHORIZATION_URL",
"SOCIAL_AUTH_KEYCLOAK_KEY",
"SOCIAL_AUTH_KEYCLOAK_PUBLIC_KEY",
"SOCIAL_AUTH_KEYCLOAK_SECRET",
]

LDAP_KEYS = [
"AUTH_LDAP_SERVER_URI",
"AUTH_LDAP_BIND_DN",
"AUTH_LDAP_BIND_PASSWORD",
"AUTH_LDAP_USER_SEARCH_BASE_DN",
"AUTH_LDAP_USER_SEARCH_SCOPE",
"AUTH_LDAP_USER_SEARCH_FILTER",
"AUTH_LDAP_GROUP_SEARCH_BASE_DN",
"AUTH_LDAP_GROUP_SEARCH_SCOPE",
"AUTH_LDAP_GROUP_SEARCH_FILTER",
]

help = "Dump auth config data from database to a JSON file"

def add_arguments(self, parser):
parser.add_argument(
"output_file",
nargs="?",
type=str,
default=None,
help="Output JSON file path",
)

def is_enabled(self, keys):
values = []
for key in keys:
values.append(settings.get(key, default=None))
return all(values)

def post_config_ldap(self):
post_config = {}
# Other required platform params
post_config["USER_ATTR_MAP"] = settings.get("AUTH_LDAP_USER_ATTR_MAP")
post_config["USER_DN_TEMPLATE"] = settings.get("AUTH_LDAP_USER_DN_TEMPLATE")
post_config["GROUP_TYPE_PARAMS"] = settings.get("AUTH_LDAP_GROUP_TYPE_PARAMS")
post_config["CONNECTION_OPTIONS"] = settings.get("AUTH_LDAP_CONNECTION_OPTIONS")
post_config["START_TLS"] = settings.get("AUTH_LDAP_START_TLS")

# Configure USER_SEARCH and GROUP_SEARCH
AUTH_LDAP_USER_SEARCH_BASE_DN = settings.get("AUTH_LDAP_USER_SEARCH_BASE_DN", default=None)
AUTH_LDAP_USER_SEARCH_SCOPE = settings.get("AUTH_LDAP_USER_SEARCH_SCOPE", default=None)
AUTH_LDAP_USER_SEARCH_FILTER = settings.get("AUTH_LDAP_USER_SEARCH_FILTER", default=None)
AUTH_LDAP_GROUP_SEARCH_BASE_DN = settings.get(
"AUTH_LDAP_GROUP_SEARCH_BASE_DN",
default=None
)
AUTH_LDAP_GROUP_SEARCH_SCOPE = settings.get("AUTH_LDAP_GROUP_SEARCH_SCOPE", default=None)
AUTH_LDAP_GROUP_SEARCH_FILTER = settings.get("AUTH_LDAP_GROUP_SEARCH_FILTER", default=None)

post_config["USER_SEARCH"] = [
AUTH_LDAP_USER_SEARCH_BASE_DN,
AUTH_LDAP_USER_SEARCH_SCOPE,
AUTH_LDAP_USER_SEARCH_FILTER,
]

post_config["GROUP_SEARCH"] = [
AUTH_LDAP_GROUP_SEARCH_BASE_DN,
AUTH_LDAP_GROUP_SEARCH_SCOPE,
AUTH_LDAP_GROUP_SEARCH_FILTER,
]

# Configure GROUP_TYPE
post_config["GROUP_TYPE"] = None
AUTH_LDAP_GROUP_TYPE = settings.get("AUTH_LDAP_GROUP_TYPE")
if AUTH_LDAP_GROUP_TYPE:
post_config["GROUP_TYPE"] = type(AUTH_LDAP_GROUP_TYPE).__name__

return post_config

def format_config_data(self, type, keys, prefix):
config = {
"type": f"galaxy.authentication.authenticator_plugins.{type}",
"enabled": self.is_enabled(keys),
"configuration": {},
}
for key in keys:
k = key
if prefix in key:
k = key[len(prefix):]
v = settings.get(key, default=None)
config["configuration"].update({k: v})

# handle post configuration for ldap:
if type == "ldap":
config["configuration"].update(self.post_config_ldap())

return config

def handle(self, *args, **options):
try:
data = []

# Add Keycloak auth config
data.append(
self.format_config_data(
"keycloak",
self.KEYCLOAK_KEYS,
"SOCIAL_AUTH_KEYCLOAK_"),
)

# Add LDAP auth config
data.append(self.format_config_data("ldap", self.LDAP_KEYS, "AUTH_LDAP_"))

# write to file if requested
if options["output_file"]:
# Define the path for the output JSON file
output_file = options["output_file"]

# Ensure the directory exists
os.makedirs(os.path.dirname(output_file), exist_ok=True)

# Write data to the JSON file
with open(output_file, "w") as f:
json.dump(data, f, indent=4)

self.stdout.write(
self.style.SUCCESS(f"Auth config data dumped to {output_file}")
)
else:
self.stdout.write(json.dumps(data))

except Exception as e:
self.stdout.write(self.style.ERROR(f"An error occurred: {str(e)}"))
sys.exit(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from io import StringIO
import json
from django.core.management import call_command
from django.test import TestCase, override_settings


@override_settings(SOCIAL_AUTH_KEYCLOAK_ACCESS_TOKEN_URL="ACCESS_TOKEN_URL")
@override_settings(SOCIAL_AUTH_KEYCLOAK_AUTHORIZATION_URL="AUTHORIZATION_URL")
@override_settings(SOCIAL_AUTH_KEYCLOAK_KEY="KEY")
@override_settings(SOCIAL_AUTH_KEYCLOAK_PUBLIC_KEY="PUBLIC_KEY")
@override_settings(SOCIAL_AUTH_KEYCLOAK_SECRET="SECRET")
@override_settings(AUTH_LDAP_SERVER_URI="SERVER_URI")
@override_settings(AUTH_LDAP_BIND_DN="BIND_DN")
@override_settings(AUTH_LDAP_BIND_PASSWORD="BIND_PASSWORD")
@override_settings(AUTH_LDAP_USER_DN_TEMPLATE="USER_DN_TEMPLATE")
@override_settings(AUTH_LDAP_USER_SEARCH_BASE_DN="USER_SEARCH_BASE_DN")
@override_settings(AUTH_LDAP_USER_SEARCH_SCOPE="USER_SEARCH_SCOPE")
@override_settings(AUTH_LDAP_USER_SEARCH_FILTER="USER_SEARCH_FILTER")
@override_settings(AUTH_LDAP_GROUP_SEARCH_BASE_DN="GROUP_SEARCH_BASE_DN")
@override_settings(AUTH_LDAP_GROUP_SEARCH_SCOPE="GROUP_SEARCH_SCOPE")
@override_settings(AUTH_LDAP_GROUP_SEARCH_FILTER="GROUP_SEARCH_FILTER")
@override_settings(AUTH_LDAP_GROUP_TYPE_PARAMS="GROUP_TYPE_PARAMS")
@override_settings(AUTH_LDAP_USER_ATTR_MAP={
"email": "email",
"last_name": "last_name",
"first_name": "first_name",
})
@override_settings(AUTH_LDAP_CONNECTION_OPTIONS={})
@override_settings(AUTH_LDAP_START_TLS=None)
@override_settings(AUTH_LDAP_GROUP_TYPE="string object")
class TestDumpAuthConfigCommand(TestCase):
def setUp(self):
super().setUp()
self.expected_config = [
{
"type": "galaxy.authentication.authenticator_plugins.keycloak",
"enabled": True,
"configuration": {
"ACCESS_TOKEN_URL": "ACCESS_TOKEN_URL",
"AUTHORIZATION_URL": "AUTHORIZATION_URL",
"KEY": "KEY",
"PUBLIC_KEY": "PUBLIC_KEY",
"SECRET": "SECRET"
}
},
{
"type": "galaxy.authentication.authenticator_plugins.ldap",
"enabled": True,
"configuration": {
"SERVER_URI": "SERVER_URI",
"BIND_DN": "BIND_DN",
"BIND_PASSWORD": "BIND_PASSWORD",
"USER_SEARCH_BASE_DN": "USER_SEARCH_BASE_DN",
"USER_SEARCH_SCOPE": "USER_SEARCH_SCOPE",
"USER_SEARCH_FILTER": "USER_SEARCH_FILTER",
"GROUP_SEARCH_BASE_DN": "GROUP_SEARCH_BASE_DN",
"GROUP_SEARCH_SCOPE": "GROUP_SEARCH_SCOPE",
"GROUP_SEARCH_FILTER": "GROUP_SEARCH_FILTER",
"USER_ATTR_MAP": {
"email": "email",
"last_name": "last_name",
"first_name": "first_name"
},
"USER_DN_TEMPLATE": "USER_DN_TEMPLATE",
"GROUP_TYPE_PARAMS": "GROUP_TYPE_PARAMS",
"CONNECTION_OPTIONS": {},
"START_TLS": None,
"USER_SEARCH": [
"USER_SEARCH_BASE_DN",
"USER_SEARCH_SCOPE",
"USER_SEARCH_FILTER"
],
"GROUP_SEARCH": [
"GROUP_SEARCH_BASE_DN",
"GROUP_SEARCH_SCOPE",
"GROUP_SEARCH_FILTER"
],
"GROUP_TYPE": "str"
}
}
]

def test_json_returned_from_cmd(self):
output = StringIO()
call_command("dump-auth-config", stdout=output)
assert output.getvalue().rstrip() == json.dumps(self.expected_config)
Loading