Skip to content

Commit

Permalink
Adding an email validation route to the qualtrix service
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-moore-97 committed Nov 6, 2023
1 parent 7715b81 commit fba515b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
25 changes: 24 additions & 1 deletion qualtrix/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging

import fastapi
from fastapi import HTTPException
from fastapi import HTTPException, responses
from pydantic import BaseModel, typing

from qualtrix import client, error
Expand All @@ -27,6 +27,11 @@ class SessionModel(SurveyModel):
sessionId: str


class EmailValidationModel(BaseModel):
directoryId: str
email: str


@router.post("/bulk-responses")
async def test():
return client.result_export()
Expand Down Expand Up @@ -54,3 +59,21 @@ async def session(request: SessionModel):
return client.delete_session(request.surveyId, request.sessionId)
except error.QualtricsError as e:
raise HTTPException(status_code=400, detail=e.args)


@router.post("/email")
async def email(request: EmailValidationModel):
"""
Checks if the supplied email exists in the Contact Pool
"""
try:
return responses.JSONResponse(
status_code=200,
content={
"emailPresent": client.validate_email(
request.email, request.directoryId
)
},
)
except error.QualtricsError as e:
raise HTTPException(status_code=422, detail=e.args)
19 changes: 19 additions & 0 deletions qualtrix/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
# Permisions # read:survey_responses

auth_header = {"X-API-TOKEN": settings.API_TOKEN}
application_header = {
"X-API-TOKEN": settings.API_TOKEN,
"content-type": "application/json",
}


def get_response(survey_id: str, response_id: str):
Expand Down Expand Up @@ -67,6 +71,21 @@ def get_survey_schema(survey_id: str):
return r.json()


def validate_email(email: str, directory_id=settings.DIRECTORY_ID):
r = requests.post(
settings.BASE_URL
+ f"/directories/{directory_id}/contacts/search?includeEmbedded=true&includeSegments=false",
headers=application_header,
json=({"filter": {"filterType": "email", "comparison": "eq", "value": email}}),
)

if r.status_code != 200:
raise error.QualtricsError("Survey response not found, likely bad POOL ID")

elements = list((dict(r.json())["result"]["elements"]))
return len(elements) != 0


def result_export(survey_id: str):
r_body = {
"format": "json",
Expand Down

0 comments on commit fba515b

Please sign in to comment.