diff --git a/backend/services/validator_service.py b/backend/services/validator_service.py index 80df580dc1..affb271347 100644 --- a/backend/services/validator_service.py +++ b/backend/services/validator_service.py @@ -1,6 +1,9 @@ from flask import current_app from sqlalchemy import text +from multiprocessing.dummy import Pool as ThreadPool +import os +from backend import db from backend.exceptions import NotFound from backend.models.dtos.mapping_dto import TaskDTOs from backend.models.dtos.stats_dto import Pagination @@ -28,7 +31,6 @@ from backend.services.stats_service import StatsService from backend.services.users.user_service import UserService from backend.services.mapping_service import MappingService -import threading class ValidatorServiceError(Exception): @@ -136,56 +138,65 @@ def _user_can_validate_task(user_id: int, mapped_by: int) -> bool: return False @staticmethod - def _process_tasks( - task_to_unlock, project_id, validated_dto, message_sent_to, dtos - ): - task = task_to_unlock["task"] - - if task_to_unlock["comment"]: - # Parses comment to see if any users have been @'d - MessageService.send_message_after_comment( - validated_dto.user_id, - task_to_unlock["comment"], - task.id, - validated_dto.project_id, - ) - if ( - task_to_unlock["new_state"] == TaskStatus.VALIDATED - or task_to_unlock["new_state"] == TaskStatus.INVALIDATED - ): - # All mappers get a notification if their task has been validated or invalidated. - # Only once if multiple tasks mapped - if task.mapped_by not in message_sent_to: - MessageService.send_message_after_validation( - task_to_unlock["new_state"], + def _process_tasks(args): + ( + app_context, + task_to_unlock, + project_id, + validated_dto, + message_sent_to, + dtos, + ) = args + with app_context: + task = task_to_unlock["task"] + + if task_to_unlock["comment"]: + # Parses comment to see if any users have been @'d + MessageService.send_message_after_comment( validated_dto.user_id, - task.mapped_by, + task_to_unlock["comment"], task.id, validated_dto.project_id, ) - message_sent_to.append(task.mapped_by) + if ( + task_to_unlock["new_state"] == TaskStatus.VALIDATED + or task_to_unlock["new_state"] == TaskStatus.INVALIDATED + ): + # All mappers get a notification if their task has been validated or invalidated. + # Only once if multiple tasks mapped + if task.mapped_by not in message_sent_to: + MessageService.send_message_after_validation( + task_to_unlock["new_state"], + validated_dto.user_id, + task.mapped_by, + task.id, + validated_dto.project_id, + ) + message_sent_to.append(task.mapped_by) - if task_to_unlock["new_state"] == TaskStatus.VALIDATED: - # Set last_validation_date for the mapper to current date - task.mapper.last_validation_date = timestamp() + if task_to_unlock["new_state"] == TaskStatus.VALIDATED: + # Set last_validation_date for the mapper to current date + task.mapper.last_validation_date = timestamp() - # Update stats if user setting task to a different state from previous state - prev_status = TaskHistory.get_last_status(project_id, task.id) - if prev_status != task_to_unlock["new_state"]: - StatsService.update_stats_after_task_state_change( - validated_dto.project_id, + # Update stats if user setting task to a different state from previous state + prev_status = TaskHistory.get_last_status(project_id, task.id) + if prev_status != task_to_unlock["new_state"]: + StatsService.update_stats_after_task_state_change( + validated_dto.project_id, + validated_dto.user_id, + prev_status, + task_to_unlock["new_state"], + ) + task_mapping_issues = ValidatorService.get_task_mapping_issues( + task_to_unlock + ) + task.unlock_task( validated_dto.user_id, - prev_status, task_to_unlock["new_state"], + task_to_unlock["comment"], + issues=task_mapping_issues, ) - task_mapping_issues = ValidatorService.get_task_mapping_issues(task_to_unlock) - task.unlock_task( - validated_dto.user_id, - task_to_unlock["new_state"], - task_to_unlock["comment"], - issues=task_mapping_issues, - ) - dtos.append(task.as_dto_with_instructions(validated_dto.preferred_locale)) + dtos.append(task.as_dto_with_instructions(validated_dto.preferred_locale)) @staticmethod def unlock_tasks_after_validation( @@ -205,17 +216,29 @@ def unlock_tasks_after_validation( # Unlock all tasks dtos = [] message_sent_to = [] - threads = [] + args_list = [] for task_to_unlock in tasks_to_unlock: - thread = threading.Thread( - target=ValidatorService._process_tasks, - args=(task_to_unlock, project_id, validated_dto, message_sent_to, dtos), + args = ( + current_app.app_context(), + task_to_unlock, + project_id, + validated_dto, + message_sent_to, + dtos, ) - threads.append(thread) - thread.start() + args_list.append(args) + + # Create a pool and Process the tasks in parallel + pool = ThreadPool(os.cpu_count()) + pool.map(ValidatorService._process_tasks, args_list) + + # Close the pool and wait for the threads to finish + pool.close() + pool.join() + db.session.commit() + + # Send email on project progress ProjectService.send_email_on_project_progress(validated_dto.project_id) - for thread in threads: - thread.join() task_dtos = TaskDTOs() task_dtos.tasks = dtos @@ -450,4 +473,4 @@ def revert_user_tasks(revert_dto: RevertUserTasksDTO): else: raise ValidatorServiceError( "UserActionNotPermitted- User not permitted to revert tasks" - ) + ) \ No newline at end of file