-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
126 lines (98 loc) · 4.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
# coding: utf-8
# stickers - main.py
# 2022-10-20 19:49
import logging
import os
import shutil
import tempfile
from pathlib import Path
from pyrogram import Client, enums, filters, types
from pyrogram.raw import functions
from pyrogram.raw import types as raw_types
from tgbot_ping import get_runtime
import constants
from helpers import converter, get_ext_from_mime, get_file_id, zip_dir
API_ID = int(os.getenv("API_ID"))
API_HASH = os.getenv("API_HASH")
BOT_TOKEN = os.getenv("BOT_TOKEN")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
app = Client("sticker", API_ID, API_HASH, bot_token=BOT_TOKEN, in_memory=True)
batch = {} # uid - temp dir
@app.on_message(filters.command(["start"]))
async def start_handler(client: "Client", message: "types.Message"):
await message.reply_text(constants.start_text)
@app.on_message(filters.command(["help"]))
async def help_handler(client: "Client", message: "types.Message"):
await message.reply_text(constants.help_text)
@app.on_message(filters.command(["ping"]))
def ping_handler(client: "Client", message: "types.Message"):
message.reply_chat_action(enums.ChatAction.TYPING)
if os.uname().sysname == "Darwin":
bot_info = "test"
else:
bot_info = get_runtime("botsrunner_stickers_1")
message.reply_text(bot_info)
@app.on_message(filters.command(["batch_start", "batch_stop"]))
async def batch_start(client: "Client", message: "types.Message"):
if message.text == "/batch_start":
batch[message.from_user.id] = tempfile.mkdtemp()
await message.reply_text(constants.batch_start)
else:
target_dir = batch.pop(message.from_user.id)
with tempfile.NamedTemporaryFile(suffix=".zip") as zip_filepath:
zip_dir(target_dir, zip_filepath)
await message.reply_chat_action(enums.ChatAction.UPLOAD_DOCUMENT)
await message.reply_document(zip_filepath.name)
shutil.rmtree(target_dir)
@app.on_message(filters.text & filters.incoming & filters.regex(r"https://t.me/addstickers/.*"))
async def entire_set(client: "Client", message: "types.Message"):
short_name = message.text.replace("https://t.me/addstickers/", "")
s = raw_types.InputStickerSetShortName(short_name=short_name)
packs: raw_types.messages.StickerSet = await client.invoke(functions.messages.GetStickerSet(stickerset=s, hash=0))
tempdir = tempfile.mkdtemp()
for doc in packs.documents:
file_id = get_file_id(doc, packs.set.id, packs.set.access_hash)
ext = get_ext_from_mime(doc.mime_type)
with open(Path(tempdir).joinpath(f"{doc.id}{ext}"), "wb") as file:
async for chunk in client.get_file(file_id):
file.write(chunk)
# convert all files in tempdir, delete old file
logging.info("Converting files for entire sticker set...")
for f in Path(tempdir).glob("*"):
converter(f)
f.unlink()
with tempfile.NamedTemporaryFile(suffix=".zip") as zip_filepath:
zip_dir(tempdir, zip_filepath)
await message.reply_chat_action(enums.ChatAction.UPLOAD_DOCUMENT)
await message.reply_document(zip_filepath.name)
shutil.rmtree(tempdir)
@app.on_message(filters.text & filters.incoming)
async def emoji_sticker_handler(client: "Client", message: "types.Message"):
stickers = await app.get_custom_emoji_stickers(
[i.custom_emoji_id for i in message.entities if i.type.name == 'CUSTOM_EMOJI' if i]
)
for sticker in stickers:
await message.reply_chat_action(enums.ChatAction.UPLOAD_DOCUMENT)
with tempfile.NamedTemporaryFile(suffix=sticker.file_name) as temp:
await client.download_media(sticker.file_id, file_name=temp.name)
target_file = converter(temp.name)
await client.send_document(message.chat.id, target_file)
@app.on_message(filters.sticker & filters.incoming)
async def sticker_handler(client: "Client", message: "types.Message"):
await message.reply_chat_action(enums.ChatAction.TYPING)
if temp := batch.get(message.from_user.id):
file_id = message.sticker.file_id
ext = message.sticker.file_name.split(".")[-1]
filepath: "Path" = Path(temp).joinpath(f"{file_id}.{ext}")
await client.download_media(message, file_name=filepath.as_posix())
converter(filepath)
filepath.unlink()
await message.reply_text(constants.batch_save)
else:
with tempfile.NamedTemporaryFile(suffix=message.sticker.file_name) as temp:
await client.download_media(message, file_name=temp.name)
target_file = converter(temp.name)
await client.send_document(message.chat.id, target_file)
if __name__ == '__main__':
app.run()