diff --git a/signbank/abstract_machine.py b/signbank/abstract_machine.py index 68fdaa7c3..9b364b919 100644 --- a/signbank/abstract_machine.py +++ b/signbank/abstract_machine.py @@ -12,7 +12,7 @@ from signbank.dictionary.update_senses_mapping import add_sense_to_revision_history from django.http import HttpResponse, HttpResponseRedirect, HttpResponseForbidden, HttpResponseBadRequest, JsonResponse from guardian.shortcuts import get_objects_for_user -from signbank.api_token import hash_token +from signbank.api_token import put_api_user_in_request def get_interface_language_api(request, user): @@ -360,40 +360,26 @@ def csv_create_gloss(request, datasetid): @csrf_exempt +@put_api_user_in_request def api_create_gloss(request, datasetid): results = dict() - auth_token_request = request.headers.get('Authorization', '') interface_language_code = request.headers.get('Accept-Language', 'en') if interface_language_code not in settings.MODELTRANSLATION_LANGUAGES: interface_language_code = 'en' activate(interface_language_code) - if auth_token_request: - auth_token = auth_token_request.split('Bearer ')[-1] - hashed_token = hash_token(auth_token) - signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() - if not signbank_token: - results['errors'] = [gettext("Your Authorization Token does not match anything.")] - return JsonResponse(results) - username = signbank_token.signbank_user.username - user = User.objects.get(username=username) - elif request.user: - user = request.user - else: - results['errors'] = [gettext("User not found in request.")] - return JsonResponse(results) dataset = Dataset.objects.filter(id=int(datasetid)).first() if not dataset: results['errors'] = [gettext("Dataset ID does not exist.")] return JsonResponse(results) - change_permit_datasets = get_objects_for_user(user, 'change_dataset', Dataset) + change_permit_datasets = get_objects_for_user(request.user, 'change_dataset', Dataset) if dataset not in change_permit_datasets: - results['errors'] = [gettext("No change permission for dataset for user ") + str(user)] + results['errors'] = [gettext("No change permission for dataset for user ") + str(request.user)] return JsonResponse(results) - if not user.has_perm('dictionary.change_gloss'): + if not request.user.has_perm('dictionary.change_gloss'): results['errors'] = [gettext("No change gloss permission.")] return JsonResponse(results) @@ -409,6 +395,6 @@ def api_create_gloss(request, datasetid): results['glossid'] = "" return JsonResponse(results) - creation_results = create_gloss(user, dataset, value_dict) + creation_results = create_gloss(request.user, dataset, value_dict) return JsonResponse(creation_results) diff --git a/signbank/api_interface.py b/signbank/api_interface.py index 1f7efa662..bbb0715d4 100644 --- a/signbank/api_interface.py +++ b/signbank/api_interface.py @@ -7,7 +7,7 @@ from signbank.dictionary.forms import * from django.utils.translation import override, gettext_lazy as _, activate from signbank.settings.server_specific import LANGUAGES, LEFT_DOUBLE_QUOTE_PATTERNS, RIGHT_DOUBLE_QUOTE_PATTERNS -from signbank.api_token import hash_token +from signbank.api_token import put_api_user_in_request from signbank.abstract_machine import get_interface_language_api from django.core.exceptions import ObjectDoesNotExist @@ -84,26 +84,14 @@ def api_fields(dataset, language_code='en', advanced=False): return api_fields_2023 +@put_api_user_in_request def get_fields_data_json(request, datasetid): - - results = dict() - auth_token_request = request.headers.get('Authorization', '') interface_language_code = request.headers.get('Accept-Language', 'en') if interface_language_code not in settings.MODELTRANSLATION_LANGUAGES: interface_language_code = 'en' activate(interface_language_code) - if auth_token_request: - auth_token = auth_token_request.split('Bearer ')[-1] - hashed_token = hash_token(auth_token) - signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() - if not signbank_token: - results['errors'] = [gettext("Your Authorization Token does not match anything.")] - return JsonResponse(results) - username = signbank_token.signbank_user.username - user = User.objects.get(username=username) - else: - user = request.user - interface_language_code = get_interface_language_api(request, user) + if request.user.is_authenticated: + interface_language_code = get_interface_language_api(request, request.user) sequence_of_digits = True for i in datasetid: @@ -120,7 +108,7 @@ def get_fields_data_json(request, datasetid): # ignore the database in the url if necessary dataset = Dataset.objects.get(id=settings.DEFAULT_DATASET_PK) - if user.has_perm('dictionary.change_gloss'): + if request.user.has_perm('dictionary.change_gloss'): api_fields_2023 = api_fields(dataset, interface_language_code, advanced=True) else: api_fields_2023 = api_fields(dataset, interface_language_code, advanced=False) @@ -130,26 +118,14 @@ def get_fields_data_json(request, datasetid): return JsonResponse(result) +@put_api_user_in_request def get_gloss_data_json(request, datasetid, glossid): - - results = dict() - auth_token_request = request.headers.get('Authorization', '') interface_language_code = request.headers.get('Accept-Language', 'en') if interface_language_code not in settings.MODELTRANSLATION_LANGUAGES: interface_language_code = 'en' activate(interface_language_code) - if auth_token_request: - auth_token = auth_token_request.split('Bearer ')[-1] - hashed_token = hash_token(auth_token) - signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() - if not signbank_token: - results['errors'] = [gettext("Your Authorization Token does not match anything.")] - return JsonResponse(results) - username = signbank_token.signbank_user.username - user = User.objects.get(username=username) - else: - user = request.user - interface_language_code = get_interface_language_api(request, user) + if request.user.is_authenticated: + interface_language_code = get_interface_language_api(request, request.user) sequence_of_digits = True for i in datasetid: @@ -162,7 +138,7 @@ def get_gloss_data_json(request, datasetid, glossid): dataset_id = int(datasetid) dataset = Dataset.objects.filter(id=dataset_id).first() - if not dataset or not user.is_authenticated: + if not dataset or not request.user.is_authenticated: # ignore the dataset in the url if necessary dataset = Dataset.objects.get(id=settings.DEFAULT_DATASET_PK) @@ -181,7 +157,7 @@ def get_gloss_data_json(request, datasetid, glossid): if not gloss: return JsonResponse({}) - if user.has_perm('dictionary.change_gloss'): + if request.user.has_perm('dictionary.change_gloss'): api_fields_2023 = api_fields(dataset, interface_language_code, advanced=True) else: api_fields_2023 = api_fields(dataset, interface_language_code, advanced=False) @@ -275,6 +251,7 @@ def get_unzipped_video_files_json(request, datasetid): return JsonResponse(videos_data) +@put_api_user_in_request def upload_zipped_videos_folder_json(request, datasetid): status_request = dict() @@ -465,6 +442,7 @@ def json_finish(): return ["finish"] +@put_api_user_in_request def upload_videos_to_glosses(request, datasetid): # get file as a url parameter: /dictionary/upload_videos_to_glosses/5 diff --git a/signbank/api_token.py b/signbank/api_token.py index 14cc9b9cc..e0aea20ce 100644 --- a/signbank/api_token.py +++ b/signbank/api_token.py @@ -3,6 +3,11 @@ import secrets import string +from django.http import HttpRequest, JsonResponse +from django.utils.translation import gettext_lazy as _ + +from signbank.dictionary.models import SignbankAPIToken + def generate_auth_token(length=16): """Generate a random authentication token.""" @@ -16,3 +21,50 @@ def hash_token(token): return hash_object.hexdigest() +class APIAuthException(Exception): + """ Exception class to raise any problems in authorizing a user for the API""" + pass + + +def get_api_user(request): + """ + Return a user if there is a correct API token in the request. + The HTTP header must contain: + + Authorization:"Bearer XXXXXX" + + where XXXXXX represents the user's API Token + """ + auth_token_request = request.headers.get('Authorization', '') + if not auth_token_request: + return None + + auth_token = auth_token_request.removeprefix('Bearer').strip() + if not auth_token: + raise APIAuthException(_("No Authorization token found")) + + hashed_token = hash_token(auth_token) + signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() + if not signbank_token: + raise APIAuthException(_("Your Authorization Token does not match anything.")) + + return signbank_token.signbank_user + + +def put_api_user_in_request(func): + """A decorator to replace the request.user with the user found by checking an API token""" + def wrapper(*args, **kwargs): + if not args or not isinstance(args[0], HttpRequest): + return func(*args, **kwargs) + + request = args[0] + + try: + api_user = get_api_user(request) + except APIAuthException as api_auth_exception: + return JsonResponse({'errors': [str(api_auth_exception)]}) + + if api_user: + request.user = api_user + return func(*args, **kwargs) + return wrapper diff --git a/signbank/dictionary/views.py b/signbank/dictionary/views.py index be7dd0102..b104f502c 100644 --- a/signbank/dictionary/views.py +++ b/signbank/dictionary/views.py @@ -50,6 +50,8 @@ from django.utils.translation import gettext_lazy as _, activate from signbank.abstract_machine import get_interface_language_api +from signbank.api_token import put_api_user_in_request + def login_required_config(f): """like @login_required if the ALWAYS_REQUIRE_LOGIN setting is True""" @@ -2308,6 +2310,7 @@ def package(request): return response +@put_api_user_in_request def info(request): import guardian user_datasets = guardian.shortcuts.get_objects_for_user(request.user, 'change_dataset', Dataset) diff --git a/signbank/gloss_morphology_update.py b/signbank/gloss_morphology_update.py index 94db05653..4bc1a64f3 100644 --- a/signbank/gloss_morphology_update.py +++ b/signbank/gloss_morphology_update.py @@ -7,7 +7,7 @@ from guardian.shortcuts import get_objects_for_user from signbank.tools import get_interface_language_and_default_language_codes from signbank.csv_interface import normalize_field_choice -from signbank.api_token import hash_token +from signbank.api_token import put_api_user_in_request from signbank.tools import (gloss_from_identifier, get_default_annotationidglosstranslation, check_existence_sequential_morphology, check_existence_simultaneous_morphology, @@ -146,36 +146,20 @@ def gloss_update_do_changes(user, gloss, fields_to_update, language_code): @csrf_exempt +@put_api_user_in_request def api_update_gloss_morphology(request, datasetid, glossid): results = dict() - auth_token_request = request.headers.get('Authorization', '') interface_language_code = request.headers.get('Accept-Language', 'en') if interface_language_code not in settings.MODELTRANSLATION_LANGUAGES: interface_language_code = 'en' activate(interface_language_code) - if auth_token_request: - auth_token = auth_token_request.split('Bearer ')[-1] - hashed_token = hash_token(auth_token) - signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() - if not signbank_token: - results['errors'] = [gettext("Your Authorization Token does not match anything.")] - return JsonResponse(results) - username = signbank_token.signbank_user.username - user = User.objects.get(username=username) - elif request.user: - user = request.user - else: - results['errors'] = [gettext("User not found in request.")] - return JsonResponse(results) - - activate(interface_language_code) results['glossid'] = glossid errors = dict() - if not user.is_authenticated: + if not request.user.is_authenticated: errors[gettext("User")] = gettext("You must be logged in to use this functionality.") results['errors'] = errors results['updatestatus'] = "Failed" @@ -188,7 +172,7 @@ def api_update_gloss_morphology(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - change_permit_datasets = get_objects_for_user(user, 'change_dataset', Dataset) + change_permit_datasets = get_objects_for_user(request.user, 'change_dataset', Dataset) if dataset not in change_permit_datasets: errors[gettext("Dataset")] = gettext("No change permission for dataset.") results['errors'] = errors @@ -225,7 +209,7 @@ def api_update_gloss_morphology(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - if not user.has_perm('dictionary.change_gloss'): + if not request.user.has_perm('dictionary.change_gloss'): errors[gettext("Gloss")] = gettext("No change gloss permission.") results['errors'] = errors results['updatestatus'] = "Failed" @@ -245,7 +229,7 @@ def api_update_gloss_morphology(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - gloss_update_do_changes(user, gloss, fields_to_update, interface_language_code) + gloss_update_do_changes(request.user, gloss, fields_to_update, interface_language_code) results['errors'] = {} results['updatestatus'] = "Success" diff --git a/signbank/gloss_update.py b/signbank/gloss_update.py index bda7321df..0b045103a 100644 --- a/signbank/gloss_update.py +++ b/signbank/gloss_update.py @@ -8,7 +8,7 @@ from guardian.shortcuts import get_objects_for_user from signbank.tools import get_interface_language_and_default_language_codes from signbank.csv_interface import normalize_field_choice -from signbank.api_token import hash_token +from signbank.api_token import put_api_user_in_request import datetime as DT import ast @@ -530,28 +530,14 @@ def gloss_update(gloss, update_fields_dict, language_code): @csrf_exempt +@put_api_user_in_request def api_update_gloss(request, datasetid, glossid): results = dict() - auth_token_request = request.headers.get('Authorization', '') interface_language_code = request.headers.get('Accept-Language', 'en') if interface_language_code not in settings.MODELTRANSLATION_LANGUAGES: interface_language_code = 'en' activate(interface_language_code) - if auth_token_request: - auth_token = auth_token_request.split('Bearer ')[-1] - hashed_token = hash_token(auth_token) - signbank_token = SignbankAPIToken.objects.filter(api_token=hashed_token).first() - if not signbank_token: - results['errors'] = [gettext("Your Authorization Token does not match anything.")] - return JsonResponse(results) - username = signbank_token.signbank_user.username - user = User.objects.get(username=username) - elif request.user: - user = request.user - else: - results['errors'] = [gettext("User not found in request.")] - return JsonResponse(results) activate(interface_language_code) @@ -559,7 +545,7 @@ def api_update_gloss(request, datasetid, glossid): errors = dict() - if not user.is_authenticated: + if not request.user.is_authenticated: errors[gettext("User")] = gettext("You must be logged in to use this functionality.") results['errors'] = errors results['updatestatus'] = "Failed" @@ -572,7 +558,7 @@ def api_update_gloss(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - change_permit_datasets = get_objects_for_user(user, 'change_dataset', Dataset) + change_permit_datasets = get_objects_for_user(request.user, 'change_dataset', Dataset) if dataset not in change_permit_datasets: errors[gettext("Dataset")] = gettext("No change permission for dataset.") results['errors'] = errors @@ -609,7 +595,7 @@ def api_update_gloss(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - if not user.has_perm('dictionary.change_gloss'): + if not request.user.has_perm('dictionary.change_gloss'): errors[gettext("Gloss")] = gettext("No change gloss permission.") results['errors'] = errors results['updatestatus'] = "Failed" @@ -642,7 +628,7 @@ def api_update_gloss(request, datasetid, glossid): results['updatestatus'] = "Failed" return JsonResponse(results) - gloss_update_do_changes(user, gloss, fields_to_update, interface_language_code) + gloss_update_do_changes(request.user, gloss, fields_to_update, interface_language_code) results['errors'] = {} results['updatestatus'] = "Success"