Skip to content

Commit

Permalink
Merge pull request #62 from naxa-developers/fix/task-validation
Browse files Browse the repository at this point in the history
fix: task validation using threds
  • Loading branch information
kaditya97 authored Feb 27, 2024
2 parents 1c577da + f4d337f commit 1c0a91f
Showing 1 changed file with 73 additions and 50 deletions.
123 changes: 73 additions & 50 deletions backend/services/validator_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from flask import current_app
from sqlalchemy import text
from multiprocessing.dummy import Pool as ThreadPool
import os

from backend import db
from backend.exceptions import NotFound
from backend.models.dtos.mapping_dto import TaskDTOs
from backend.models.dtos.stats_dto import Pagination
Expand Down Expand Up @@ -28,7 +31,6 @@
from backend.services.stats_service import StatsService
from backend.services.users.user_service import UserService
from backend.services.mapping_service import MappingService
import threading


class ValidatorServiceError(Exception):
Expand Down Expand Up @@ -136,56 +138,65 @@ def _user_can_validate_task(user_id: int, mapped_by: int) -> bool:
return False

@staticmethod
def _process_tasks(
task_to_unlock, project_id, validated_dto, message_sent_to, dtos
):
task = task_to_unlock["task"]

if task_to_unlock["comment"]:
# Parses comment to see if any users have been @'d
MessageService.send_message_after_comment(
validated_dto.user_id,
task_to_unlock["comment"],
task.id,
validated_dto.project_id,
)
if (
task_to_unlock["new_state"] == TaskStatus.VALIDATED
or task_to_unlock["new_state"] == TaskStatus.INVALIDATED
):
# All mappers get a notification if their task has been validated or invalidated.
# Only once if multiple tasks mapped
if task.mapped_by not in message_sent_to:
MessageService.send_message_after_validation(
task_to_unlock["new_state"],
def _process_tasks(args):
(
app_context,
task_to_unlock,
project_id,
validated_dto,
message_sent_to,
dtos,
) = args
with app_context:
task = task_to_unlock["task"]

if task_to_unlock["comment"]:
# Parses comment to see if any users have been @'d
MessageService.send_message_after_comment(
validated_dto.user_id,
task.mapped_by,
task_to_unlock["comment"],
task.id,
validated_dto.project_id,
)
message_sent_to.append(task.mapped_by)
if (
task_to_unlock["new_state"] == TaskStatus.VALIDATED
or task_to_unlock["new_state"] == TaskStatus.INVALIDATED
):
# All mappers get a notification if their task has been validated or invalidated.
# Only once if multiple tasks mapped
if task.mapped_by not in message_sent_to:
MessageService.send_message_after_validation(
task_to_unlock["new_state"],
validated_dto.user_id,
task.mapped_by,
task.id,
validated_dto.project_id,
)
message_sent_to.append(task.mapped_by)

if task_to_unlock["new_state"] == TaskStatus.VALIDATED:
# Set last_validation_date for the mapper to current date
task.mapper.last_validation_date = timestamp()
if task_to_unlock["new_state"] == TaskStatus.VALIDATED:
# Set last_validation_date for the mapper to current date
task.mapper.last_validation_date = timestamp()

# Update stats if user setting task to a different state from previous state
prev_status = TaskHistory.get_last_status(project_id, task.id)
if prev_status != task_to_unlock["new_state"]:
StatsService.update_stats_after_task_state_change(
validated_dto.project_id,
# Update stats if user setting task to a different state from previous state
prev_status = TaskHistory.get_last_status(project_id, task.id)
if prev_status != task_to_unlock["new_state"]:
StatsService.update_stats_after_task_state_change(
validated_dto.project_id,
validated_dto.user_id,
prev_status,
task_to_unlock["new_state"],
)
task_mapping_issues = ValidatorService.get_task_mapping_issues(
task_to_unlock
)
task.unlock_task(
validated_dto.user_id,
prev_status,
task_to_unlock["new_state"],
task_to_unlock["comment"],
issues=task_mapping_issues,
)
task_mapping_issues = ValidatorService.get_task_mapping_issues(task_to_unlock)
task.unlock_task(
validated_dto.user_id,
task_to_unlock["new_state"],
task_to_unlock["comment"],
issues=task_mapping_issues,
)
dtos.append(task.as_dto_with_instructions(validated_dto.preferred_locale))
dtos.append(task.as_dto_with_instructions(validated_dto.preferred_locale))

@staticmethod
def unlock_tasks_after_validation(
Expand All @@ -205,17 +216,29 @@ def unlock_tasks_after_validation(
# Unlock all tasks
dtos = []
message_sent_to = []
threads = []
args_list = []
for task_to_unlock in tasks_to_unlock:
thread = threading.Thread(
target=ValidatorService._process_tasks,
args=(task_to_unlock, project_id, validated_dto, message_sent_to, dtos),
args = (
current_app.app_context(),
task_to_unlock,
project_id,
validated_dto,
message_sent_to,
dtos,
)
threads.append(thread)
thread.start()
args_list.append(args)

# Create a pool and Process the tasks in parallel
pool = ThreadPool(os.cpu_count())
pool.map(ValidatorService._process_tasks, args_list)

# Close the pool and wait for the threads to finish
pool.close()
pool.join()
db.session.commit()

# Send email on project progress
ProjectService.send_email_on_project_progress(validated_dto.project_id)
for thread in threads:
thread.join()
task_dtos = TaskDTOs()
task_dtos.tasks = dtos

Expand Down

0 comments on commit 1c0a91f

Please sign in to comment.