Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archive events #180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions alembic/versions/885054f5dc6b_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""empty message

Revision ID: 885054f5dc6b
Revises: b2cd3b252cb1
Create Date: 2024-02-29 16:52:41.238933

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "885054f5dc6b"  # this migration's id
down_revision: Union[str, None] = "b2cd3b252cb1"  # parent: create_event_outcome_table
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``sk_events`` table with a non-unique index on ``id``."""
    # ### commands auto generated by Alembic - please adjust! ###
    columns = [
        sa.Column("id", sa.String(), nullable=False),
        sa.Column("interaction_id", sa.String(), nullable=True),
        sa.Column("timestamp", sa.String(), nullable=True),
        sa.Column("json_content", sa.JSON(), nullable=True),
    ]
    op.create_table("sk_events", *columns, sa.PrimaryKeyConstraint("id"))
    op.create_index(op.f("ix_sk_events_id"), "sk_events", ["id"], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Reverse :func:`upgrade`: drop the ``sk_events`` index, then the table."""
    # ### commands auto generated by Alembic - please adjust! ###
    index_name = op.f("ix_sk_events_id")
    op.drop_index(index_name, table_name="sk_events")
    op.drop_table("sk_events")
    # ### end Alembic commands ###
64 changes: 64 additions & 0 deletions alembic/versions/b2cd3b252cb1_create_event_outcome_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Creates event_outcome table

Revision ID: b2cd3b252cb1
Revises: b5c8e1cfcb42
Create Date: 2024-02-29 11:24:11.278296

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "b2cd3b252cb1"  # this migration's id
down_revision: Union[str, None] = "b5c8e1cfcb42"  # parent migration
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``event_outcome`` table (indexed on ``id``) and drop the
    obsolete ``response`` table."""
    # ### commands auto generated by Alembic - please adjust! ###
    # All non-key fields are nullable strings; declare them by name and
    # expand into Column objects, preserving the original column order.
    nullable_string_fields = (
        "company_id",
        "connection_id",
        "connector_id",
        "flow_id",
        "flow_version_id",
        "event_id",
        "interaction_id",
        "name",
        "node_description",
        "node_title",
        "outcome_description",
        "outcome_status",
        "outcome_type",
        "should_continue_on_error",
        "template_precompile",
        "signal_id",
        "transition_id",
        "timestamp",
    )
    op.create_table(
        "event_outcome",
        sa.Column("id", sa.String(), nullable=False),
        *(sa.Column(field, sa.String(), nullable=True) for field in nullable_string_fields),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(op.f("ix_event_outcome_id"), "event_outcome", ["id"], unique=False)
    op.drop_table("response")
    # ### end Alembic commands ###


def downgrade() -> None:
    """Reverse :func:`upgrade`: restore the ``response`` table and drop
    ``event_outcome`` (index first, then table)."""
    # ### commands auto generated by Alembic - please adjust! ###
    response_columns = [
        sa.Column(field, sa.VARCHAR(), autoincrement=False, nullable=False)
        for field in ("interaction_id", "response_id", "survey_id", "session_id", "dist")
    ]
    # NOTE(review): no PrimaryKeyConstraint is restored here — confirm the
    # pre-migration `response` table really had none before relying on this.
    op.create_table("response", *response_columns)
    op.drop_index(op.f("ix_event_outcome_id"), table_name="event_outcome")
    op.drop_table("event_outcome")
    # ### end Alembic commands ###
35 changes: 35 additions & 0 deletions gdrive/archive_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Archive REST API: copies interaction events out of OpenSearch into the database
"""
import logging

import fastapi
from opensearchpy import OpenSearch
from pydantic import BaseModel
from fastapi import responses

from gdrive import settings
from gdrive.elastic_search import elastic_search_client

log = logging.getLogger(__name__)
router = fastapi.APIRouter()


class ArchiveInteractionEventsRequest(BaseModel):
    """Request body identifying which interaction's events to archive."""

    interaction_id: str


@router.post("/archive/interaction-sk-events")
async def archive_interaction_events(req: ArchiveInteractionEventsRequest):
    """Archive all sk-events for one interaction.

    Responds 202 with the raw OpenSearch search result as the body.
    """
    # NOTE(review): the OpenSearch + DB work runs synchronously inside an
    # async handler, blocking the event loop — consider a plain `def` route.
    archived = elastic_search_client.get_interaction_sk_event(req.interaction_id)
    return responses.JSONResponse(status_code=202, content=archived)


@router.post("/archive/interaction-events-outcome")
async def archive_interaction_event_outcome(req: ArchiveInteractionEventsRequest):
    """Archive the event-outcome documents for one interaction.

    Responds 202 with the raw OpenSearch search result as the body.
    """
    # BUG FIX: this handler was previously also named
    # `archive_interaction_events`, redefining (shadowing) the sk-events
    # handler defined above in the same module (flake8 F811). The route path
    # is unchanged, so the HTTP interface is identical.
    return responses.JSONResponse(
        status_code=202,
        content=elastic_search_client.get_interaction_event_outcome(req.interaction_id),
    )
7 changes: 2 additions & 5 deletions gdrive/database/crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import sqlalchemy
from sqlalchemy import orm
from gdrive.database import database

from gdrive.database import database, models


def create_participant(db_item: models.ParticipantModel):
def create_row(db_item):
session = database.SessionLocal()
session.add(db_item)
session.commit()
Expand Down
60 changes: 60 additions & 0 deletions gdrive/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,66 @@
Base = declarative.declarative_base()


class SkEventModel(Base):
    """One archived sk-event document copied from OpenSearch.

    ``id`` is the OpenSearch document ``_id``; ``json_content`` holds the
    full original ``_source`` payload.
    """

    __tablename__ = "sk_events"

    id = sqla.Column(sqla.String, primary_key=True, index=True)
    # Interaction the event belongs to (OpenSearch field "interactionId").
    interaction_id = sqla.Column(sqla.String)
    # Event timestamp as a string (OpenSearch field "tsEms").
    timestamp = sqla.Column(sqla.String)
    # Entire original document body.
    json_content = sqla.Column(sqla.JSON)

    def as_list(self, index: str) -> list:
        """Return [index, id, interaction_id, timestamp, json_content] —
        a flat row representation prefixed with the caller-supplied *index*."""
        return [index, self.id, self.interaction_id, self.timestamp, self.json_content]


class EventOutcomeModel(Base):
    """One archived event-outcome document copied from OpenSearch.

    ``id`` is the OpenSearch document ``_id``; every other field is a
    flattened scalar from the document ``_source`` (all stored as strings).
    """

    __tablename__ = "event_outcome"

    id = sqla.Column(sqla.String, primary_key=True, index=True)
    company_id = sqla.Column(sqla.String)
    connection_id = sqla.Column(sqla.String)
    connector_id = sqla.Column(sqla.String)
    flow_id = sqla.Column(sqla.String)
    flow_version_id = sqla.Column(sqla.String)
    # The source document's own "id" field (distinct from the row's `id`).
    event_id = sqla.Column(sqla.String)
    interaction_id = sqla.Column(sqla.String)
    name = sqla.Column(sqla.String)
    node_description = sqla.Column(sqla.String)
    node_title = sqla.Column(sqla.String)
    outcome_description = sqla.Column(sqla.String)
    outcome_status = sqla.Column(sqla.String)
    outcome_type = sqla.Column(sqla.String)
    # NOTE(review): presumably boolean-valued but stored as a string —
    # confirm against the source documents before comparing/filtering on it.
    should_continue_on_error = sqla.Column(sqla.String)
    template_precompile = sqla.Column(sqla.String)
    signal_id = sqla.Column(sqla.String)
    transition_id = sqla.Column(sqla.String)
    # Event timestamp as a string (OpenSearch field "tsEms").
    timestamp = sqla.Column(sqla.String)

    def as_list(self, index: str) -> list:
        """Return a flat row representation prefixed with the caller-supplied
        *index*, fields in the same order as the column declarations above."""
        return [
            index,
            self.id,
            self.company_id,
            self.connection_id,
            self.connector_id,
            self.flow_id,
            self.flow_version_id,
            self.event_id,
            self.interaction_id,
            self.name,
            self.node_description,
            self.node_title,
            self.outcome_description,
            self.outcome_status,
            self.outcome_type,
            self.should_continue_on_error,
            self.template_precompile,
            self.signal_id,
            self.transition_id,
            self.timestamp,
        ]


class ParticipantModel(Base):
__tablename__ = "participant"

Expand Down
87 changes: 87 additions & 0 deletions gdrive/elastic_search/elastic_search_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import json
import logging

from opensearchpy import OpenSearch
from sqlalchemy import exc
from gdrive import settings
from gdrive.database import crud, models

log = logging.getLogger(__name__)


def get_interaction_sk_event(interaction_id: str):
    """Copy every sk-event document for *interaction_id* from OpenSearch
    into the local ``sk_events`` table and return the raw search response.

    Rows whose primary key already exists are skipped (logged, not raised).
    """
    client = OpenSearch(
        hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300
    )

    search_body = json.dumps(
        {
            "query": {
                "bool": {
                    "should": [{"match_phrase": {"interactionId": interaction_id}}]
                }
            },
            "sort": {"tsEms": {"order": "asc"}},
        }
    )

    response = client.search(body=search_body, index="dev-skevents-*")
    for hit in response["hits"]["hits"]:
        source = hit["_source"]
        row = models.SkEventModel(
            id=hit["_id"],
            interaction_id=source["interactionId"],
            json_content=source,
            timestamp=source["tsEms"],
        )
        try:
            crud.create_row(row)
        except exc.IntegrityError:
            # Most likely a duplicate id — the event was archived before.
            log.error("Error writing id=%s", row.id)

    return response


def get_interaction_event_outcome(interaction_id: str):
    """Copy every event-outcome document for *interaction_id* from
    OpenSearch into the local ``event_outcome`` table.

    Returns the raw OpenSearch search response so the caller can inspect
    (or serialize) exactly what was archived. Rows whose primary key
    already exists are skipped (logged, not raised).
    """
    es = OpenSearch(
        hosts=[{"host": settings.ES_HOST, "port": settings.ES_PORT}], timeout=300
    )

    query = {
        "query": {
            "bool": {"should": [{"match_phrase": {"interactionId": interaction_id}}]}
        },
        "sort": {"tsEms": {"order": "asc"}},
    }

    qstring = json.dumps(query)
    r = es.search(body=qstring, index="dev-eventsoutcome-*")
    for hit in r["hits"]["hits"]:
        source = hit["_source"]
        properties = source["properties"]
        model = models.EventOutcomeModel(
            id=hit["_id"],
            company_id=source["companyId"],
            # BUG FIX: this previously copied "companyId" into connection_id
            # (copy-paste error); it now reads "connectionId".
            connection_id=source["connectionId"],
            connector_id=source["connectorId"],
            flow_id=source["flowId"],
            flow_version_id=source["flowVersionId"],
            event_id=source["id"],
            interaction_id=source["interactionId"],
            name=source["name"],
            node_description=properties["nodeDescription"]["value"],
            node_title=properties["nodeTitle"]["value"],
            outcome_description=properties["outcomeDescription"]["value"],
            outcome_status=properties["outcomeStatus"]["value"],
            outcome_type=properties["outcomeType"]["value"],
            should_continue_on_error=properties["shouldContinueOnError"]["value"],
            # BUG FIX: previously stored the whole property object; every
            # sibling property unwraps ["value"], and the column is a String.
            # NOTE(review): assumes templatePrecompile has the same
            # {"value": ...} shape as the others — confirm against the index.
            template_precompile=properties["templatePrecompile"]["value"],
            signal_id=source["signalId"],
            transition_id=source["transitionId"],
            timestamp=source["tsEms"],
        )

        try:
            crud.create_row(model)
        except exc.IntegrityError:
            # Most likely a duplicate id — the event was archived before.
            log.error("Error writing id=%s" % model.id)

    return r
3 changes: 2 additions & 1 deletion gdrive/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import fastapi
import starlette_prometheus

from . import api, export_api, analytics_api, settings
from . import api, export_api, analytics_api, archive_api, settings

app = fastapi.FastAPI()

Expand All @@ -14,3 +14,4 @@
app.include_router(api.router)
app.include_router(export_api.router)
app.include_router(analytics_api.router)
app.include_router(archive_api.router)
Loading