forked from robertoyamanaka/mango-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
268 lines (231 loc) · 11.9 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import asyncio
import html
import json
import logging
import os
import sqlite3

import requests
from dotenv import load_dotenv
from telethon import TelegramClient, events, functions, types
# Load environment variables
load_dotenv()

# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Telegram API credentials, read from the environment.
# os.getenv() returns None for an unset variable and int(None) fails with an
# opaque TypeError, so check API_ID explicitly and fail with a clear message.
_raw_api_id = os.getenv('API_ID')
if not _raw_api_id:
    raise RuntimeError("API_ID environment variable is not set")
api_id = int(_raw_api_id)  # Convert to int since env vars are strings
api_hash = os.getenv('API_HASH')
bot_token = os.getenv('BOT_TOKEN')
def _parse_score(reply_text):
    """Extract the score from a model reply and clamp it to the 0-10 range.

    Accepts a JSON object with a ``score`` field, a bare number, or either of
    those wrapped in a Markdown code fence. Raises an exception (ValueError,
    KeyError, TypeError, ...) when no score can be extracted.
    """
    text = reply_text.strip()
    # Despite the system prompt, a reply may arrive wrapped in a Markdown code
    # fence (```json ... ```); peel the fence off before parsing.
    if text.startswith('```'):
        text = text.strip('`').strip()
        if text[:4].lower() == 'json':
            text = text[4:].strip()
    # A bare number such as "7" is wrapped so it parses like the JSON form.
    if not text.startswith('{'):
        text = f'{{"score": {text}}}'
    score = float(json.loads(text)['score'])
    return min(max(score, 0), 10)


def score_response(message, *, timeout=30):
    """Score the message using Red Pill AI.

    One chat-completion request is sent per scoring criterion. A criterion
    whose response cannot be turned into a number is logged and recorded as 0.

    Args:
        message: Text of the message to analyze.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A ``(scores, average_score)`` tuple. ``scores`` maps each criterion
        name to a value in the range 0-10 and additionally holds the key
        ``'average_score'`` (the average rounded to one decimal place).
        ``average_score`` is the unrounded average over the criteria that
        scored above zero, or 0 when there are none.

    Raises:
        requests.exceptions.RequestException: If a request cannot be sent or
            times out. Unusable responses are not raised; they score 0.
    """
    scoring_criteria = {
        "question_quality": "How interesting or thought-provoking is this question for the Web3 community?",
        "technical_value": "Does this message contain valuable technical information or insights?",
        "link_relevance": "Are there relevant links or references shared that add value?",
        "community_engagement": "How likely is this message to spark meaningful community discussion?",
        "unique_perspective": "Does this message offer a unique or innovative point of view?",
        "market_insight": "Does this message provide valuable market or trend insights?",
        "resource_sharing": "How valuable are the shared resources or tools mentioned?",
        "problem_solving": "Does this message help solve a community member's problem?",
        "knowledge_sharing": "How effectively does this message share knowledge or experience?",
        "credibility": "How credible and well-supported are the claims or statements?",
    }
    system_prompt = (
        "You are a Web3 community analyst. You must ONLY return a JSON object "
        "with a single 'score' field containing a number between 0-10. "
        "Example: {\"score\": 7}. Do not include any other text or explanation."
    )
    # Identical for every request, so build it once outside the loop.
    headers = {"Authorization": f"Bearer {os.getenv('REDPILL_API_KEY')}"}

    scores = {}
    for criterion, prompt in scoring_criteria.items():
        api_response = requests.post(
            url="https://api.red-pill.ai/v1/chat/completions",
            headers=headers,
            # json= serializes the payload and sets the Content-Type header.
            json={
                "model": "gpt-4o",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"{prompt}\n\nMessage to analyze:\n{message}"},
                ],
                "temperature": 0.3,
            },
            # Without a timeout a stalled connection would block forever.
            timeout=timeout,
        )
        try:
            # Surface HTTP errors by status instead of as a confusing KeyError.
            api_response.raise_for_status()
            reply = api_response.json()['choices'][0]['message']['content']
            scores[criterion] = _parse_score(reply)
        except Exception as e:
            # Deliberately best-effort: one bad response must not discard the
            # scores of the remaining criteria.
            logger.error("Error scoring criterion %s: %s", criterion, e)
            scores[criterion] = 0

    # Calculate average score; zeros (including failed criteria) are left out.
    non_zero_scores = [s for s in scores.values() if s > 0]
    average_score = sum(non_zero_scores) / len(non_zero_scores) if non_zero_scores else 0
    scores['average_score'] = round(average_score, 1)
    return scores, average_score
def _open_database(path='telegram_scores.db'):
    """Open the SQLite score database, creating the user_scores table if needed."""
    conn = sqlite3.connect(path)
    conn.execute('''CREATE TABLE IF NOT EXISTS user_scores
                    (user_id INTEGER,
                     username TEXT,
                     group_id INTEGER,
                     group_name TEXT,
                     message TEXT,
                     score_data JSON,
                     average_score FLOAT,
                     timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    return conn


def _register_handlers(client, conn):
    """Attach the message-scoring handler and the bot commands to the client."""

    @client.on(events.NewMessage(func=None))
    async def handle_new_message(event):
        """Score a non-command message, store the result and reply with it."""
        try:
            message = event.raw_text
            logger.debug("Incoming message: %r (is group: %s)", message, event.is_group)
            # Skip empty messages and commands; commands have their own handlers.
            if not message or message.startswith('/'):
                return

            sender = await event.get_sender()
            if sender is None:
                # No sender means there is no user to attribute the score to.
                return
            user_id = sender.id
            username = sender.username or str(user_id)
            logger.info("Processing message: '%s' from user: %s", message, username)

            # score_response() issues blocking HTTP requests; run it in a worker
            # thread so the event loop is not stalled while it waits.
            loop = asyncio.get_running_loop()
            scores, average_score = await loop.run_in_executor(None, score_response, message)
            logger.debug("Scores received: %s", json.dumps(scores))

            if average_score <= 0:
                return

            if event.is_group:
                group_name = (await event.get_chat()).title
            else:
                group_name = "Private"

            # Store in database and respond; `with conn` commits on success and
            # rolls back if the insert raises.
            with conn:
                conn.execute('''INSERT INTO user_scores
                                (user_id, username, group_id, group_name, message, score_data, average_score)
                                VALUES (?, ?, ?, ?, ?, ?, ?)''',
                             (user_id, username, event.chat_id, group_name,
                              message, json.dumps(scores), average_score))
            # Round for display; the raw average is a long float such as 6.3333...
            await event.respond(f"Message scored: {round(average_score, 1)}/10")
        except Exception as e:
            logger.error(f"Error in message handler: {e}", exc_info=True)

    # Debug command to check database contents.
    # NOTE(review): any user who can message the bot can run this and see
    # excerpts of stored messages - consider restricting it to admins.
    @client.on(events.NewMessage(pattern='/debug'))
    async def debug_database(event):
        """Reply with the record count and the five most recent entries."""
        try:
            count = conn.execute('SELECT COUNT(*) FROM user_scores').fetchone()[0]
            logger.debug("Total records in database: %s", count)
            recent_records = conn.execute(
                'SELECT username, message, average_score, timestamp '
                'FROM user_scores ORDER BY timestamp DESC LIMIT 5'
            ).fetchall()

            parts = [f"Database Status:\nTotal records: {count}\n\nRecent entries:"]
            for username, message, average_score, timestamp in recent_records:
                # The client uses HTML parse mode, so user-written text must be
                # escaped or its angle brackets would be read as markup.
                excerpt = html.escape(message[:50], quote=False)
                parts.append(
                    f"\n\nUser: @{username}"
                    f"\nMessage: {excerpt}..."
                    f"\nScore: {round(average_score, 1)}/10"
                    f"\nTimestamp: {timestamp}"
                )
            await event.respond("".join(parts))
        except Exception as e:
            logger.error(f"Debug error: {e}", exc_info=True)
            await event.respond(f"Debug error: {html.escape(str(e), quote=False)}")

    @client.on(events.NewMessage(pattern='/ping'))
    async def ping(event):
        """Liveness check: reply 'Pong!' to the command message."""
        try:
            await event.reply('Pong!')  # Using reply instead of respond
            logger.info("Responded to ping command")
        except Exception as e:
            logger.error(f"Error in ping handler: {e}")

    # Score checking command
    @client.on(events.NewMessage(pattern='/score'))
    async def check_score(event):
        """Reply with the sender's average score and number of scored messages."""
        try:
            sender = await event.get_sender()
            user_id = sender.id
            username = sender.username or str(user_id)

            result = conn.execute('''
                SELECT AVG(average_score) as avg_score,
                       COUNT(*) as message_count
                FROM user_scores
                WHERE user_id = ?
            ''', (user_id,)).fetchone()

            # AVG() is NULL when the user has no rows.
            if result and result[0] is not None:
                avg_score = round(result[0], 2)
                message_count = result[1]
                await event.respond(
                    f"@{username}'s stats:\nAverage Score: {avg_score}/10\nMessages Scored: {message_count}"
                )
            else:
                await event.respond(f"@{username}, you don't have any scored messages yet!")
        except Exception as e:
            logger.error(f"Error in score handler: {e}")
            await event.respond("Sorry, there was an error checking your score.")

    # Leaderboard command
    @client.on(events.NewMessage(pattern='/leaderboard'))
    async def show_leaderboard(event):
        """Reply with the top five contributors and today's best message score."""
        try:
            # Group by user_id (the key /score uses) so a username change does
            # not split one user's history. SQLite takes the bare `username`
            # column from the row holding MAX(average_score).
            top_users = conn.execute('''
                SELECT
                    username,
                    AVG(average_score) as avg_score,
                    COUNT(*) as message_count,
                    MAX(average_score) as best_score
                FROM user_scores
                GROUP BY user_id
                HAVING message_count >= 3  -- Minimum 3 messages to qualify
                ORDER BY avg_score DESC
                LIMIT 5
            ''').fetchall()

            if not top_users:
                await event.respond("No scores yet! Be the first to contribute! 🚀")
                return

            parts = ["🏆 Community Leaderboard\n\n", "Top Contributors:\n"]
            for rank, (username, avg_score, msg_count, best_score) in enumerate(top_users, 1):
                parts.append(
                    f"{rank}. @{username}\n"
                    f" 📊 Avg: {round(float(avg_score), 2)}/10\n"
                    f" 💬 Messages: {msg_count}\n"
                    f" ⭐ Best: {round(float(best_score), 2)}/10\n\n"
                )

            # Get today's top score
            today_top = conn.execute('''
                SELECT username, average_score
                FROM user_scores
                WHERE date(timestamp) = date('now')
                ORDER BY average_score DESC
                LIMIT 1
            ''').fetchone()
            if today_top:
                username, score = today_top
                parts.append(f"\n🌟 Today's Best: @{username} ({round(float(score), 2)}/10)")

            await event.respond("".join(parts))
        except Exception as e:
            logger.error(f"Error in leaderboard handler: {e}")
            await event.respond("Sorry, there was an error fetching the leaderboard.")


async def main():
    """Start the bot: connect to Telegram, open the database and serve updates.

    Runs until the client disconnects. Any exception is logged as a critical
    error rather than propagated; the database connection and the client are
    released on every exit path.
    """
    client = None
    conn = None
    try:
        # Initialize Telegram client
        logger.info("Initializing Telegram client...")
        client = TelegramClient('bot_session', int(api_id), api_hash)
        client.parse_mode = 'html'
        await client.start(bot_token=bot_token)
        await client.get_me()
        await client.catch_up()
        logger.info("Bot successfully connected to Telegram!")

        # Database setup
        logger.info("Setting up database...")
        conn = _open_database()
        logger.info("Database setup complete!")

        _register_handlers(client, conn)

        logger.info("Bot is running...")
        await client.run_until_disconnected()
    except Exception as e:
        logger.error(f"Critical error: {e}", exc_info=True)
    finally:
        # Release resources even when startup or the run loop failed.
        if conn is not None:
            conn.close()
        if client is not None and client.is_connected():
            await client.disconnect()
# Script entry point: start the bot only when this file is executed directly
if __name__ == '__main__':
    import asyncio

    bot_coroutine = main()
    asyncio.run(bot_coroutine)