diff --git a/alembic/versions/885054f5dc6b_.py b/alembic/versions/885054f5dc6b_.py
new file mode 100644
index 0000000..58bb85a
--- /dev/null
+++ b/alembic/versions/885054f5dc6b_.py
@@ -0,0 +1,39 @@
+"""empty message
+
+Revision ID: 885054f5dc6b
+Revises: b2cd3b252cb1
+Create Date: 2024-02-29 16:52:41.238933
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "885054f5dc6b"
+down_revision: Union[str, None] = "b2cd3b252cb1"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "sk_events",
+        sa.Column("id", sa.String(), nullable=False),
+        sa.Column("interaction_id", sa.String(), nullable=True),
+        sa.Column("timestamp", sa.String(), nullable=True),
+        sa.Column("json_content", sa.JSON(), nullable=True),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_index(op.f("ix_sk_events_id"), "sk_events", ["id"], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(op.f("ix_sk_events_id"), table_name="sk_events")
+    op.drop_table("sk_events")
+    # ### end Alembic commands ###
diff --git a/alembic/versions/b2cd3b252cb1_create_event_outcome_table.py b/alembic/versions/b2cd3b252cb1_create_event_outcome_table.py
new file mode 100644
index 0000000..0d6c0d9
--- /dev/null
+++ b/alembic/versions/b2cd3b252cb1_create_event_outcome_table.py
@@ -0,0 +1,64 @@
+"""
+Creates event_outcome table
+
+Revision ID: b2cd3b252cb1
+Revises: b5c8e1cfcb42
+Create Date: 2024-02-29 11:24:11.278296
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "b2cd3b252cb1"
+down_revision: Union[str, None] = "b5c8e1cfcb42"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "event_outcome",
+        sa.Column("id", sa.String(), nullable=False),
+        sa.Column("company_id", sa.String(), nullable=True),
+        sa.Column("connection_id", sa.String(), nullable=True),
+        sa.Column("connector_id", sa.String(), nullable=True),
+        sa.Column("flow_id", sa.String(), nullable=True),
+        sa.Column("flow_version_id", sa.String(), nullable=True),
+        sa.Column("event_id", sa.String(), nullable=True),
+        sa.Column("interaction_id", sa.String(), nullable=True),
+        sa.Column("name", sa.String(), nullable=True),
+        sa.Column("node_description", sa.String(), nullable=True),
+        sa.Column("node_title", sa.String(), nullable=True),
+        sa.Column("outcome_description", sa.String(), nullable=True),
+        sa.Column("outcome_status", sa.String(), nullable=True),
+        sa.Column("outcome_type", sa.String(), nullable=True),
+        sa.Column("should_continue_on_error", sa.String(), nullable=True),
+        sa.Column("template_precompile", sa.String(), nullable=True),
+        sa.Column("signal_id", sa.String(), nullable=True),
+        sa.Column("transition_id", sa.String(), nullable=True),
+        sa.Column("timestamp", sa.String(), nullable=True),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.create_index(op.f("ix_event_outcome_id"), "event_outcome", ["id"], unique=False)
+    op.drop_table("response")
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "response",
+        sa.Column("interaction_id", sa.VARCHAR(), autoincrement=False, nullable=False),
+        sa.Column("response_id", sa.VARCHAR(), autoincrement=False, nullable=False),
+        sa.Column("survey_id", sa.VARCHAR(), autoincrement=False, nullable=False),
+        sa.Column("session_id", sa.VARCHAR(), autoincrement=False, nullable=False),
+        sa.Column("dist", sa.VARCHAR(), autoincrement=False, nullable=False),
+    )
+    op.drop_index(op.f("ix_event_outcome_id"), table_name="event_outcome")
+    op.drop_table("event_outcome")
+    # ### end Alembic commands ###
diff --git a/gdrive/archive_api.py b/gdrive/archive_api.py
new file mode 100644
index 0000000..ab7168d
--- /dev/null
+++ b/gdrive/archive_api.py
@@ -0,0 +1,35 @@
+"""
+Archive Rest API
+"""
+import logging
+
+import fastapi
+from opensearchpy import OpenSearch
+from pydantic import BaseModel
+from fastapi import responses
+
+from gdrive import settings
+from gdrive.elastic_search import elastic_search_client
+
+log = logging.getLogger(__name__)
+router = fastapi.APIRouter()
+
+
+class ArchiveInteractionEventsRequest(BaseModel):
+    interaction_id: str
+
+
+@router.post("/archive/interaction-sk-events")
+async def archive_interaction_events(req: ArchiveInteractionEventsRequest):
+    return responses.JSONResponse(
+        status_code=202,
+        content=elastic_search_client.get_interaction_sk_event(req.interaction_id),
+    )
+
+
+@router.post("/archive/interaction-events-outcome")
+async def archive_interaction_event_outcome(req: ArchiveInteractionEventsRequest):
+    return responses.JSONResponse(
+        status_code=202,
+        content=elastic_search_client.get_interaction_event_outcome(req.interaction_id),
+    )
diff --git a/gdrive/database/crud.py b/gdrive/database/crud.py
index 8a40772..383f080 100644
--- a/gdrive/database/crud.py
+++ b/gdrive/database/crud.py
@@ -1,10 +1,7 @@
-import sqlalchemy
-from sqlalchemy import orm
+from gdrive.database import database
 
-from gdrive.database import database, models
-
 
-def create_participant(db_item: models.ParticipantModel):
+def create_row(db_item):
     session = database.SessionLocal()
     session.add(db_item)
     session.commit()
diff --git a/gdrive/database/models.py b/gdrive/database/models.py
index d7a0f35..bd8241d 100644
--- a/gdrive/database/models.py
+++ b/gdrive/database/models.py
@@ -6,6 +6,66 @@
 Base = declarative.declarative_base()
 
 
+class SkEventModel(Base):
+    __tablename__ = "sk_events"
+
+    id = sqla.Column(sqla.String, primary_key=True, index=True)
+    interaction_id = sqla.Column(sqla.String)
+    timestamp = sqla.Column(sqla.String)
+    json_content = sqla.Column(sqla.JSON)
+
+    def as_list(self, index: str) -> list:
+        return [index, self.id, self.interaction_id, self.timestamp, self.json_content]
+
+
+class EventOutcomeModel(Base):
+    __tablename__ = "event_outcome"
+
+    id = sqla.Column(sqla.String, primary_key=True, index=True)
+    company_id = sqla.Column(sqla.String)
+    connection_id = sqla.Column(sqla.String)
+    connector_id = sqla.Column(sqla.String)
+    flow_id = sqla.Column(sqla.String)
+    flow_version_id = sqla.Column(sqla.String)
+    event_id = sqla.Column(sqla.String)
+    interaction_id = sqla.Column(sqla.String)
+    name = sqla.Column(sqla.String)
+    node_description = sqla.Column(sqla.String)
+    node_title = sqla.Column(sqla.String)
+    outcome_description = sqla.Column(sqla.String)
+    outcome_status = sqla.Column(sqla.String)
+    outcome_type = sqla.Column(sqla.String)
+    should_continue_on_error = sqla.Column(sqla.String)
+    template_precompile = sqla.Column(sqla.String)
+    signal_id = sqla.Column(sqla.String)
+    transition_id = sqla.Column(sqla.String)
+    timestamp = sqla.Column(sqla.String)
+
+    def as_list(self, index: str) -> list:
+        return [
+            index,
+            self.id,
+            self.company_id,
+            self.connection_id,
+            self.connector_id,
+            self.flow_id,
+            self.flow_version_id,
+            self.event_id,
+            self.interaction_id,
+            self.name,
+            self.node_description,
+            self.node_title,
+            self.outcome_description,
+            self.outcome_status,
+            self.outcome_type,
+            self.should_continue_on_error,
+            self.template_precompile,
+            self.signal_id,
+            self.transition_id,
+            self.timestamp,
+        ]
+
+
 class ParticipantModel(Base):
     __tablename__ = "participant"
 
diff --git a/gdrive/elastic_search/elastic_search_client.py b/gdrive/elastic_search/elastic_search_client.py
new file mode 100644
index 0000000..5899517
--- /dev/null
+++ b/gdrive/elastic_search/elastic_search_client.py
@@ -0,0 +1,87 @@
+import json
+import logging
+
+from opensearchpy import OpenSearch
+from sqlalchemy import exc
+from gdrive import settings
+from gdrive.database import crud, models
+
+log = logging.getLogger(__name__)
+
+
+def get_interaction_sk_event(interaction_id: str):
+    es = OpenSearch(
+        hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300
+    )
+
+    query = {
+        "query": {
+            "bool": {"should": [{"match_phrase": {"interactionId": interaction_id}}]}
+        },
+        "sort": {"tsEms": {"order": "asc"}},
+    }
+
+    qstring = json.dumps(query)
+    r = es.search(body=qstring, index="dev-skevents-*")
+    for hit in r["hits"]["hits"]:
+        model = models.SkEventModel(
+            id=hit["_id"],
+            interaction_id=hit["_source"]["interactionId"],
+            json_content=hit["_source"],
+            timestamp=hit["_source"]["tsEms"],
+        )
+        try:
+            crud.create_row(model)
+        except exc.IntegrityError:
+            log.error("Error writing id=%s", model.id)
+
+    return r
+
+
+def get_interaction_event_outcome(interaction_id: str):
+    es = OpenSearch(
+        hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300
+    )
+
+    query = {
+        "query": {
+            "bool": {"should": [{"match_phrase": {"interactionId": interaction_id}}]}
+        },
+        "sort": {"tsEms": {"order": "asc"}},
+    }
+
+    qstring = json.dumps(query)
+    r = es.search(body=qstring, index="dev-eventsoutcome-*")
+    for hit in r["hits"]["hits"]:
+        model = models.EventOutcomeModel(
+            id=hit["_id"],
+            company_id=hit["_source"]["companyId"],
+            connection_id=hit["_source"]["connectionId"],
+            connector_id=hit["_source"]["connectorId"],
+            flow_id=hit["_source"]["flowId"],
+            flow_version_id=hit["_source"]["flowVersionId"],
+            event_id=hit["_source"]["id"],
+            interaction_id=hit["_source"]["interactionId"],
+            name=hit["_source"]["name"],
+            node_description=hit["_source"]["properties"]["nodeDescription"]["value"],
+            node_title=hit["_source"]["properties"]["nodeTitle"]["value"],
+            outcome_description=hit["_source"]["properties"]["outcomeDescription"][
+                "value"
+            ],
+            outcome_status=hit["_source"]["properties"]["outcomeStatus"]["value"],
+            outcome_type=hit["_source"]["properties"]["outcomeType"]["value"],
+            should_continue_on_error=hit["_source"]["properties"][
+                "shouldContinueOnError"
+            ]["value"],
+            template_precompile=hit["_source"]["properties"]["templatePrecompile"],
+            signal_id=hit["_source"]["signalId"],
+            transition_id=hit["_source"]["transitionId"],
+            timestamp=hit["_source"]["tsEms"],
+        )
+
+        try:
+            crud.create_row(model)
+        except exc.IntegrityError:
+            log.error("Error writing id=%s", model.id)
+
+    return r
diff --git a/gdrive/main.py b/gdrive/main.py
index 0a438c7..0b6cba7 100644
--- a/gdrive/main.py
+++ b/gdrive/main.py
@@ -4,7 +4,7 @@
 import fastapi
 import starlette_prometheus
 
-from . import api, export_api, analytics_api, settings
+from . import api, export_api, analytics_api, archive_api, settings
 
 app = fastapi.FastAPI()
 
@@ -14,3 +14,4 @@
 app.include_router(api.router)
 app.include_router(export_api.router)
 app.include_router(analytics_api.router)
+app.include_router(archive_api.router)