forked from NocoFlux/TG-File-Sharing-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper_func.py
115 lines (101 loc) · 3.69 KB
/
helper_func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import base64
import re
import asyncio
from pyrogram import filters
from pyrogram.enums import ChatMemberStatus
from config import FORCE_SUB_CHANNEL_1, FORCE_SUB_CHANNEL_2, FORCE_SUB_CHANNEL_3, FORCE_SUB_CHANNEL_4, ADMINS
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.errors import FloodWait
async def is_subscribed(filter, client, update):
if not (FORCE_SUB_CHANNEL_1 or FORCE_SUB_CHANNEL_2 or FORCE_SUB_CHANNEL_3 or FORCE_SUB_CHANNEL_4):
return True
user_id = update.from_user.id
if user_id in ADMINS:
return True
member_status = ChatMemberStatus.OWNER, ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.MEMBER
for channel_id in [FORCE_SUB_CHANNEL_1, FORCE_SUB_CHANNEL_2, FORCE_SUB_CHANNEL_3, FORCE_SUB_CHANNEL_4]:
if not channel_id:
continue
try:
member = await client.get_chat_member(chat_id=channel_id, user_id=user_id)
except UserNotParticipant:
return False
if member.status not in member_status:
return False
return True
async def encode(string):
string_bytes = string.encode("ascii")
base64_bytes = base64.urlsafe_b64encode(string_bytes)
base64_string = (base64_bytes.decode("ascii")).strip("=")
return base64_string
async def decode(base64_string):
base64_string = base64_string.strip("=")
base64_bytes = (base64_string + "=" * (-len(base64_string) % 4)).encode("ascii")
string_bytes = base64.urlsafe_b64decode(base64_bytes)
string = string_bytes.decode("ascii")
return string
async def get_messages(client, message_ids):
messages = []
total_messages = 0
while total_messages != len(message_ids):
temb_ids = message_ids[total_messages:total_messages+200]
try:
msgs = await client.get_messages(
chat_id=client.db_channel.id,
message_ids=temb_ids
)
except FloodWait as e:
await asyncio.sleep(e.x)
msgs = await client.get_messages(
chat_id=client.db_channel.id,
message_ids=temb_ids
)
except:
pass
total_messages += len(temb_ids)
messages.extend(msgs)
return messages
async def get_message_id(client, message):
if message.forward_from_chat:
if message.forward_from_chat.id == client.db_channel.id:
return message.forward_from_message_id
else:
return 0
elif message.forward_sender_name:
return 0
elif message.text:
pattern = "https://t.me/(?:c/)?(.*)/(\d+)"
matches = re.match(pattern, message.text)
if not matches:
return 0
channel_id = matches.group(1)
msg_id = int(matches.group(2))
if channel_id.isdigit():
if f"-100{channel_id}" == str(client.db_channel.id):
return msg_id
else:
if channel_id == client.db_channel.username:
return msg_id
else:
return 0
def get_readable_time(seconds: int) -> str:
count = 0
up_time = ""
time_list = []
time_suffix_list = ["s", "m", "h", "days"]
while count < 4:
count += 1
remainder, result = divmod(seconds, 60) if count < 3 else divmod(seconds, 24)
if seconds == 0 and remainder == 0:
break
time_list.append(int(result))
seconds = int(remainder)
hmm = len(time_list)
for x in range(hmm):
time_list[x] = str(time_list[x]) + time_suffix_list[x]
if len(time_list) == 4:
up_time += f"{time_list.pop()}, "
time_list.reverse()
up_time += ":".join(time_list)
return up_time
subscribed = filters.create(is_subscribed)