forked from c29r3/icloud-hidemyemail-generator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
197 lines (162 loc) · 6.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import asyncio
import datetime
import aiofiles
import os
from typing import Union, List, Optional
import re
from rich.text import Text
from rich.prompt import IntPrompt
from rich.console import Console
from rich.table import Table
from icloud import HideMyEmail
# Upper bound on the batch size RichHideMyEmail.generate() passes to
# _generate(), i.e. how many _generate_one() tasks are scheduled per batch.
MAX_CONCURRENT_TASKS = 2
async def write_email_to_file(email: str, filename: str = "emails.txt"):
    """Append ``email`` and a trailing newline to ``filename``.

    The file is opened in append mode through ``aiofiles``, so existing
    content is kept and the write is awaited rather than blocking.
    """
    async with aiofiles.open(filename, mode="a") as out:
        line = email + "\n"
        await out.write(line)
class RichHideMyEmail(HideMyEmail):
    """HideMyEmail client that reports its progress on a rich console.

    On top of the base class it loads the cookie string from ``cookie.txt``,
    generates and reserves addresses in small batches, appends every reserved
    address to disk, and can print the existing addresses as a table.

    The ``generate_email``, ``reserve_email`` and ``list_email`` coroutines
    used below are not defined here; presumably they come from HideMyEmail.
    """

    # File the cookie string is read from (relative to the working directory).
    _cookie_file = "cookie.txt"
    # Seconds to pause after the service answers "You have reached the limit".
    _limit_backoff_seconds = 1810
    # Seconds to pause after a failed reservation.
    _reserve_backoff_seconds = 30

    def __init__(self):
        """Create the console/table and load the cookie string if available."""
        super().__init__()
        self.console = Console()
        self.table = Table()

        if not os.path.exists(self._cookie_file):
            self.console.log(
                '[bold yellow][WARN][/] No "cookie.txt" file found! Generation might not work due to unauthorized access.'
            )
            return

        # The cookie is the first line that is neither blank nor a "//" comment.
        with open(self._cookie_file, "r") as f:
            cookie = next(
                (
                    line
                    for line in f
                    if line.strip() and not line.startswith("//")
                ),
                None,
            )

        if cookie is None:
            # Previously an empty / comment-only file raised IndexError here.
            self.console.log(
                f'[bold yellow][WARN][/] No cookie found in "{self._cookie_file}"! Generation might not work due to unauthorized access.'
            )
            return

        # Drop surrounding whitespace such as the newline read from the file.
        self.cookies = cookie.strip()

    @staticmethod
    def _error_reason(response) -> str:
        """Return the failure reason carried by an API response, or "Unknown".

        Two shapes are recognised: an integer ``error`` accompanied by a
        top-level ``reason``, and a dict ``error`` holding ``errorMessage``.
        """
        error = response["error"] if "error" in response else {}
        if isinstance(error, int) and "reason" in response:
            return response["reason"]
        if isinstance(error, dict) and "errorMessage" in error:
            return error["errorMessage"]
        return "Unknown"

    async def _generate_one(self) -> Union[str, None]:
        """Generate a single address, reserve it and persist it.

        Returns:
            The reserved address, or ``None`` if generation or reservation
            failed (the failure is logged to the console).
        """
        # First, generate an email
        gen_res = await self.generate_email()
        if not gen_res:
            return None

        if "You have reached the limit" in str(gen_res):
            self.console.log(
                f"[bold red][ERR][/] - Failed to generate email. Reason: {self._error_reason(gen_res)}"
            )
            # Back off before this task finishes so the batch loop is held up.
            await asyncio.sleep(self._limit_backoff_seconds)
            return None

        try:
            email = gen_res["result"]["hme"]
        except (KeyError, TypeError):
            # Any other failed response has no usable "result"; report it
            # instead of letting the exception abort the whole batch.
            self.console.log(
                f"[bold red][ERR][/] - Failed to generate email. Reason: {self._error_reason(gen_res)}"
            )
            return None

        self.console.log(f'[50%] "{email}" - Successfully generated')

        # Then, reserve it
        reserve_res = await self.reserve_email(email)
        if not reserve_res:
            return None

        if "success" not in reserve_res or not reserve_res["success"]:
            self.console.log(
                f'[bold red][ERR][/] "{email}" - Failed to reserve email. Reason: {self._error_reason(reserve_res)}'
            )
            await asyncio.sleep(self._reserve_backoff_seconds)
            return None

        self.console.log(f'[100%] "{email}" - Successfully reserved')
        await write_email_to_file(email=email)
        return email

    async def _generate(self, num: int):
        """Run ``num`` generate-and-reserve tasks and collect the successes.

        Tasks are started 3 seconds apart and then awaited together.

        Returns:
            The addresses that were reserved; failed attempts are dropped.
        """
        tasks = []
        for _ in range(num):
            tasks.append(asyncio.ensure_future(self._generate_one()))
            # Stagger task start-up instead of firing every request at once.
            await asyncio.sleep(3)

        results = await asyncio.gather(*tasks)
        return [email for email in results if email is not None]

    async def generate(self, count: Optional[int]) -> List[str]:
        """Generate up to ``count`` addresses in batches.

        Args:
            count: Number of addresses to attempt. When ``None`` the user is
                asked for the number on the console.

        Returns:
            The addresses that were reserved. Failed attempts are not
            retried, so the result may be shorter than ``count``. An empty
            list is returned if a KeyboardInterrupt is caught here.
        """
        try:
            emails = []
            self.console.rule()
            if count is None:
                answer = IntPrompt.ask(
                    Text.assemble("How many iCloud emails you want to generate?"),
                    console=self.console,
                )
                count = int(answer)

            self.console.log(f"Generating {count} email(s)...")
            self.console.rule()

            with self.console.status("[bold green]Generating iCloud email(s)..."):
                while count > 0:
                    batch = await self._generate(min(count, MAX_CONCURRENT_TASKS))
                    # A batch counts against the total whether or not every
                    # attempt in it succeeded.
                    count -= MAX_CONCURRENT_TASKS
                    emails += batch

            if emails:
                # Text mode already translates "\n" to the platform line
                # ending; writing os.linesep would produce "\r\r\n" on Windows.
                with open("emails_total.txt", "a+") as f:
                    f.write("\n".join(emails) + "\n")

                self.console.rule()
                self.console.log(
                    ':star: Emails have been saved into the "emails.txt" file'
                )
                self.console.log(
                    f"[bold green]All done![/] Successfully generated [bold green]{len(emails)}[/] email(s)"
                )

            return emails
        except KeyboardInterrupt:
            return []

    async def list(self, active: bool, search: str) -> None:
        """Print the existing addresses as a table.

        Args:
            active: Only rows whose ``isActive`` equals this value are shown.
            search: Regular expression matched (``re.search``) against each
                row's label; ``None`` disables label filtering.
        """
        gen_res = await self.list_email()
        if not gen_res:
            return

        if "success" not in gen_res or not gen_res["success"]:
            self.console.log(
                f"[bold red][ERR][/] - Failed to list emails. Reason: {self._error_reason(gen_res)}"
            )
            return

        # Start from a fresh table so repeated calls do not pile up columns
        # and rows from earlier listings.
        self.table = Table()
        self.table.add_column("Label")
        self.table.add_column("Hide my email")
        self.table.add_column("Created Date Time")
        self.table.add_column("IsActive")

        pattern = re.compile(search) if search is not None else None

        for row in gen_res["result"]["hmeEmails"]:
            if row["isActive"] != active:
                continue
            # Both branches of the old if/else added the row, so the search
            # expression never filtered anything.
            if pattern is not None and not pattern.search(row["label"]):
                continue

            # createTimestamp is divided by 1000 before conversion
            # (presumably milliseconds since the epoch).
            created = datetime.datetime.fromtimestamp(row["createTimestamp"] / 1000)
            self.table.add_row(
                row["label"],
                row["hme"],
                str(created),
                str(row["isActive"]),
            )

        self.console.print(self.table)
async def generate(count: Optional[int]) -> None:
    """Open a RichHideMyEmail session and run its ``generate`` with ``count``.

    The list returned by the session is intentionally not passed on; this
    wrapper always returns ``None``.
    """
    async with RichHideMyEmail() as session:
        await session.generate(count)
async def list(active: bool, search: str) -> None:
    """Open a RichHideMyEmail session and run its ``list`` with the filters.

    ``active`` and ``search`` are forwarded unchanged to the session.
    """
    async with RichHideMyEmail() as session:
        await session.list(active, search)
if __name__ == "__main__":
    # asyncio.run() owns the event loop: it creates it, cancels tasks that
    # are still pending when the main coroutine stops, and closes the loop.
    # The hand-made loop used before was never closed.
    try:
        asyncio.run(generate(None))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop early; exit without a traceback.
        pass