-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamlit_app.py
187 lines (171 loc) · 8.54 KB
/
streamlit_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import streamlit as st
import requests
import analytics
import uuid
# Segment analytics configuration: the write key is read from Streamlit secrets.
WRITE_KEY = st.secrets['SECRET_KEY']
analytics.write_key = WRITE_KEY

# Backend URL, also read from Streamlit secrets.
BACKEND_URL = st.secrets['CUSTOM_API']

# Initialize session state. Each entry pairs a key with a zero-argument
# factory; the factory runs only when the key is missing, so values already
# stored in the session are never overwritten (and a new user ID is generated
# only once per session).
for _state_key, _make_default in (
    ('api_key', str),                        # ''
    ('transcript', str),                     # ''
    ('combined_transcript', str),            # ''
    ('utterances', list),                    # []
    ('analysis', str),                       # ''
    ('embeddings_ready', bool),              # False
    ('user_id', lambda: str(uuid.uuid4())),  # Generate a unique user ID
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
st.title("Call Transcript Analysis")

# API Key Input: the field is pre-filled with the key currently stored in the
# session and rendered as a password input.
st.subheader("API Key")
api_key_input = st.text_input(
    "Enter your API Key:",
    value=st.session_state['api_key'],
    type="password",
)
if st.button("Set API Key"):
    # Surrounding whitespace is dropped before the key is validated and stored.
    cleaned_key = api_key_input.strip()
    if not cleaned_key:
        st.error("Please enter a valid API Key.")
    else:
        st.session_state['api_key'] = cleaned_key
        st.success("API Key set successfully!")
# Function to track events with Segment
def track_event(event_name, properties=None):
    """Report one analytics event for the current session's user.

    Args:
        event_name: Name of the event passed to ``analytics.track``.
        properties: Optional properties forwarded unchanged with the event.

    Tracking is best-effort: any exception raised while reading the user ID
    or sending the event is printed and swallowed, so an analytics failure
    never interrupts the app.
    """
    try:
        session_user = st.session_state['user_id']
        analytics.track(
            user_id=session_user,
            event=event_name,
            properties=properties,
        )
    except Exception as failure:
        print(f"Analytics tracking failed: {failure}")
# Function to generate headers
def get_headers():
    """Build the HTTP headers sent with each backend request.

    Returns:
        A dict with the session's API key under ``"API-Key"`` and the
        session's user ID under ``"User-Session-ID"``.
    """
    session = st.session_state
    headers = {}
    headers["API-Key"] = session['api_key']
    headers["User-Session-ID"] = session['user_id']
    return headers
# File uploader
uploaded_file = st.file_uploader(
    "Upload an audio file (wav, mp3, m4a)",  # Provide a meaningful label
    type=["wav", "mp3", "m4a"],
)
if uploaded_file:
    st.audio(uploaded_file)

    # Track file upload event. Streamlit re-executes the script on every
    # widget interaction, so remember which file was last reported and only
    # send the event when a different file shows up; otherwise each button
    # click would log a duplicate "File Uploaded".
    upload_marker = (uploaded_file.name, uploaded_file.size)
    if st.session_state.get('last_tracked_upload') != upload_marker:
        st.session_state['last_tracked_upload'] = upload_marker
        track_event(
            "File Uploaded",
            {
                "file_name": uploaded_file.name,
                "file_type": uploaded_file.type,
                "user_id": st.session_state['user_id'],
            },
        )

    if st.button("Transcribe", key="transcribe"):
        with st.spinner("Transcribing..."):
            try:
                files = {"file": uploaded_file}
                response = requests.post(
                    f"{BACKEND_URL}/transcribe",
                    files=files,
                    headers=get_headers(),
                    # (connect, read) seconds. Without a timeout requests
                    # waits forever on an unresponsive backend; the read
                    # limit is generous because transcription can be slow.
                    timeout=(10, 600),
                )
                # An error response (e.g. from a proxy) may not carry JSON;
                # fall back to an empty payload so the status check below
                # reports the failure instead of a JSON decode error.
                try:
                    response_data = response.json()
                except ValueError:
                    response_data = {}
                if not isinstance(response_data, dict):
                    response_data = {}

                if response.status_code == 200:
                    # Read every required field before storing any of them,
                    # so a missing key cannot leave the session half-updated.
                    full_transcript = response_data["full_transcript"]
                    combined_transcript = response_data["combined_transcript"]
                    utterances = response_data["utterances"]
                    st.session_state.transcript = full_transcript
                    st.session_state.combined_transcript = combined_transcript
                    st.session_state.utterances = utterances
                    # Adopt the backend's user ID when it sends one; keep the
                    # current ID otherwise.
                    st.session_state['user_id'] = response_data.get(
                        "user_id", st.session_state['user_id']
                    )
                    st.success("Transcription completed successfully.")
                    track_event("Transcription Completed", {"user_id": st.session_state['user_id']})
                else:
                    st.error(response_data.get("error", "An error occurred."))
                    track_event("Transcription Failed", {"error": response_data.get("error", "Unknown error")})
            except Exception as e:
                # Top-level UI boundary: show the failure (including request
                # timeouts and missing response fields) and record it.
                st.error(f"Error: {e}")
                track_event("Transcription Exception", {"error": str(e)})
# Show the stored transcript (plain and speaker-labelled) and offer analysis.
if st.session_state.transcript:
    st.subheader("Full Transcript")
    st.text_area(
        "Full Transcript Text",
        st.session_state.transcript,
        height=150,
        label_visibility="collapsed",
    )

    st.subheader("Diarized Transcript (Speaker Labels)")
    # One "speaker: text" line per utterance stored in the session.
    diarized_text = "\n".join(
        f"{utterance['speaker']}: {utterance['text']}"
        for utterance in st.session_state.utterances
    )
    st.text_area(
        "Diarized Transcript Text",
        diarized_text,
        height=150,
        label_visibility="collapsed",
    )

    # Analyze Button
    if st.button("Analyze", key="analyze"):
        with st.spinner("Analyzing the call..."):
            try:
                data = {"transcript": st.session_state.combined_transcript}
                response = requests.post(
                    f"{BACKEND_URL}/analyze",
                    json=data,
                    headers=get_headers(),
                    # (connect, read) seconds. Without a timeout requests
                    # waits forever if the backend stops responding.
                    timeout=(10, 300),
                )
                # Error responses are not guaranteed to be JSON objects; use
                # an empty payload so the status branch below still reports
                # the failure rather than a decode error.
                try:
                    response_data = response.json()
                except ValueError:
                    response_data = {}
                if not isinstance(response_data, dict):
                    response_data = {}

                if response.status_code == 200:
                    st.session_state.analysis = response_data["analysis"]
                    st.success("Analysis completed successfully.")
                    track_event("Analysis Completed", {"user_id": st.session_state['user_id']})
                else:
                    error_message = response_data.get("error", "An error occurred.")
                    st.error(error_message)
                    track_event("Analysis Failed", {"error": error_message, "user_id": st.session_state['user_id']})
            except Exception as e:
                # Top-level UI boundary: surface and record any failure,
                # including request timeouts and a missing "analysis" field.
                st.error(f"Error: {e}")
                track_event("Analysis Exception", {"error": str(e)})
# Display Analysis Results
# (connect, read) timeout in seconds for the embeddings/chat requests below;
# without one, requests waits indefinitely on an unresponsive backend.
_CHAT_REQUEST_TIMEOUT = (10, 120)


def _json_field(response, field, fallback):
    """Return *field* from the response's JSON object, or *fallback*.

    The fallback is used when the body is not valid JSON, is not a JSON
    object, or does not contain *field*, so callers never fail just because
    a response body has an unexpected shape.
    """
    try:
        payload = response.json()
    except ValueError:
        return fallback
    if not isinstance(payload, dict):
        return fallback
    return payload.get(field, fallback)


if st.session_state.analysis:
    st.subheader("Analysis Results")
    # NOTE(review): unsafe_allow_html=True renders any HTML contained in the
    # backend's analysis text as-is. Confirm the backend output is trusted or
    # sanitized; otherwise drop the flag.
    st.markdown(st.session_state.analysis, unsafe_allow_html=True)

    if not st.session_state.embeddings_ready:
        if st.button("Start AI Chat", key="start_chat"):
            with st.spinner("Creating embeddings..."):
                if not st.session_state['utterances']:
                    st.error("No utterances available for creating embeddings.")
                else:
                    try:
                        # NOTE(review): this endpoint uses an underscore while
                        # the delete endpoint below uses a hyphen - confirm
                        # both match the backend's routes.
                        response = requests.post(
                            f"{BACKEND_URL}/create_embeddings",
                            json={"utterances": st.session_state['utterances']},
                            headers=get_headers(),
                            timeout=_CHAT_REQUEST_TIMEOUT,
                        )
                        if response.status_code == 200:
                            st.success("Embeddings created successfully. You can now chat with the AI!")
                            st.session_state.embeddings_ready = True
                            track_event("Embeddings Created", {"user_id": st.session_state['user_id']})
                        else:
                            # Parse the error body once and tolerate non-JSON
                            # bodies; the same message is shown and tracked.
                            error_message = _json_field(response, "error", "An error occurred.")
                            st.error(error_message)
                            track_event("Embeddings Creation Failed", {"error": error_message})
                    except Exception as e:
                        st.error(f"Error: {e}")
                        track_event("Embeddings Creation Exception", {"error": str(e)})
    else:
        st.subheader("Chat About the Call")
        user_query = st.text_input("Ask a question about the call:")
        if st.button("Ask", key="ask_query"):
            if user_query.strip():
                with st.spinner("Generating response..."):
                    try:
                        response = requests.post(
                            f"{BACKEND_URL}/ask",
                            json={"query": user_query},
                            headers=get_headers(),
                            timeout=_CHAT_REQUEST_TIMEOUT,
                        )
                        if response.status_code == 200:
                            st.text_area("Response", response.json()["response"], height=150)
                            track_event("Query Success", {"query": user_query, "user_id": st.session_state['user_id']})
                        else:
                            error_message = _json_field(response, "error", "An error occurred.")
                            st.error(error_message)
                            track_event("Query Failed", {"error": error_message})
                    except Exception as e:
                        st.error(f"Error: {e}")
                        track_event("Query Exception", {"error": str(e)})
            else:
                st.error("Please enter a question before clicking 'Ask'.")

        # Display Delete Embeddings Button
        if st.button("Delete Embeddings", key="delete_embeddings"):
            with st.spinner("Deleting embeddings..."):
                try:
                    response = requests.post(
                        f"{BACKEND_URL}/delete-embeddings",
                        headers=get_headers(),
                        timeout=_CHAT_REQUEST_TIMEOUT,
                    )
                    if response.status_code == 200:
                        # A successful delete must reset the flag even when
                        # the body carries no JSON message, so the message is
                        # read tolerantly.
                        st.success(_json_field(response, "message", "Embeddings deleted successfully."))
                        st.session_state.embeddings_ready = False  # Reset embeddings_ready state
                        track_event("Embeddings Deleted", {"user_id": st.session_state['user_id']})
                    else:
                        error_message = _json_field(response, "error", "An error occurred during deletion.")
                        st.error(error_message)
                        track_event("Embeddings Deletion Failed", {"error": error_message})
                except Exception as e:
                    st.error(f"Error: {e}")
                    track_event("Embeddings Deletion Exception", {"error": str(e)})