-
Notifications
You must be signed in to change notification settings - Fork 74
/
chat.py
253 lines (202 loc) · 9.14 KB
/
chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#! /usr/bin/python3
### Created by Chirag Khatri
### github.com/zvovov
import sched
import sys
import threading
import time
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import WebDriverException as WebDriverException
config = {
'chromedriver_path': "{0}/bin/chromedriver".format(os.environ['HOME']),
'get_msg_interval': 5, # Time (seconds). Recommended value: 5
'colors': True, # True/False. True prints colorful msgs in console
'ww_url': "https://web.whatsapp.com/"
}
incoming_scheduler = sched.scheduler(time.time, time.sleep)
last_printed_msg = None
last_thread_name = ''
# colors in console
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
try:
def main():
global last_thread_name
if len(sys.argv) > 1:
# Use a data directory so that we can persist cookies per session and not have to
# authorize this application every time.
# NOTE: This gets created in your home directory and can get quite large over time.
# To fix this, simply delete this directory and re-authorize your WhatsApp Web session.
chrome_data_dir_directory = "{0}/.chrome/data_dir/whatsapp_web_cli".format(os.environ['HOME'])
if not os.path.exists(chrome_data_dir_directory):
os.makedirs(chrome_data_dir_directory)
driver_options = webdriver.ChromeOptions()
driver_options.add_argument("user-data-dir={0}".format(chrome_data_dir_directory))
# add proxy capability
# PROXY = "scheme://user:pass@host:port"
# driver_options.add_argument('--proxy-server=%s' % PROXY)
# setting up Chrome with selenium
driver = webdriver.Chrome(config['chromedriver_path'], options=driver_options)
# open WW in browser
driver.get(config['ww_url'])
# prompt user to connect device to WW
while True:
isConnected = input(decorateMsg("\n\tPhone connected? y/n: ", bcolors.HEADER))
if isConnected.lower() == 'y':
break
assert "WhatsApp" in driver.title
chooseReceiver(driver)
# getting true name of contact/group
last_thread_name = driver.find_element(By.XPATH, '//*[@id="main"]/header//span[contains(@dir, "auto")]').text
# start background thread
incoming_thread = threading.Thread(target=startGetMsg, args=(driver,))
incoming_thread.start()
while True:
msg = input().strip()
if len(msg) > 7 and 'sendto ' in msg[:7]:
chooseReceiver(driver, receiver=msg[7:])
elif msg == 'stopsending':
print(decorateMsg("\tYou will only receive msgs now.\n\tPress Ctrl+C to exit.", bcolors.WARNING))
# TODO: stop the incoming_scheduler event
break
else:
sendMsg(driver, msg)
else:
sys.exit(decorateMsg("\nError: Missing name of contact/group\npython chat.py <name>", bcolors.FAIL))
# open all contacts page
# driver.find_element(By.TAG_NAME, "button").click()
def sendMsg(driver, msg):
"""
Type 'msg' in 'driver' and press RETURN
"""
# select correct input box to type msg
input_box = driver.find_element(By.XPATH, '//*[@id="main"]//footer//div[contains(@contenteditable, "true")]')
# input_box.clear()
input_box.click()
action = ActionChains(driver)
action.send_keys(msg)
action.send_keys(Keys.RETURN)
action.perform()
def startGetMsg(driver):
"""
Start schdeuler that gets incoming msgs every get_msg_interval seconds
"""
incoming_scheduler.enter(config['get_msg_interval'], 1, getMsg, (driver, incoming_scheduler))
incoming_scheduler.run()
def getMsg(driver, scheduler):
"""
Get incoming msgs from the driver repeatedly
"""
global last_printed_msg
# print conversation name
curr_thread_name = printThreadName(driver)
try:
# get all msgs
all_msgs = driver.find_elements(By.XPATH, '//*[@id="main"]//div[contains(@class, "message")]')
# check if there is atleast one message in the chat
if len(all_msgs) >= 1:
last_msg_outgoing = outgoingMsgCheck(all_msgs[-1])
last_msg_sender, last_msg_text = getMsgMetaInfo(all_msgs[-1])
msgs_present = True
else:
msgs_present = False
except Exception as e:
print(e)
msgs_present = False
if msgs_present:
# if last msg was incoming
if not last_msg_outgoing:
# if last_msg is already printed
if last_printed_msg == last_msg_sender + last_msg_text:
pass
# else print new msgs
else:
print_from = 0
# loop from last msg to first
for i, curr_msg in reversed(list(enumerate(all_msgs))):
curr_msg_outgoing = outgoingMsgCheck(curr_msg)
curr_msg_sender, curr_msg_text = getMsgMetaInfo(curr_msg)
# if curr_msg is outgoing OR if last_printed_msg is found
if curr_msg_outgoing or last_printed_msg == curr_msg_sender + curr_msg_text:
# break
print_from = i
break
# Print all msgs from last printed msg till newest msg
for i in range(print_from + 1, len(all_msgs)):
msg_sender, msg_text = getMsgMetaInfo(all_msgs[i])
last_printed_msg = msg_sender + msg_text
print(decorateMsg(msg_sender + msg_text, bcolors.OKGREEN))
# add the task to the scheduler again
incoming_scheduler.enter(config['get_msg_interval'], 1, getMsg, (driver, scheduler,))
def outgoingMsgCheck(webdriver_element):
"""
Returns True if the selenium webdriver_element has "message-out" in its class.
False, otherwise.
"""
for _class in webdriver_element.get_attribute('class').split():
if _class == "message-out":
return True
return False
def getMsgMetaInfo(webdriver_element):
"""
Returns webdriver_element's sender and message text.
Message Text is a blank string, if it is a non-text message
TODO: Identify msg type and print accordingly
"""
# check for non-text message
try:
msg = webdriver_element.find_element(By.XPATH, './/div[contains(@class, "copyable-text")]')
msg_sender = msg.get_attribute('data-pre-plain-text')
msg_text = msg.find_elements(By.XPATH, './/span[contains(@class, "selectable-text")]')[-1].text
except IndexError:
msg_text = ""
except Exception:
msg_sender = ""
msg_text = ""
return msg_sender, msg_text
def decorateMsg(msg, color=None):
"""
Returns:
colored msg, if colors are enabled in config and a color is provided for msg
msg, otherwise
"""
msg_string = msg
if config['colors']:
if color:
msg_string = color + msg + bcolors.ENDC
return msg_string
def printThreadName(driver):
global last_thread_name
curr_thread_name = driver.find_element(By.XPATH, '//*[@id="main"]/header//span[contains(@dir, "auto")]').text
if curr_thread_name != last_thread_name:
last_thread_name = curr_thread_name
print(decorateMsg("\n\tSending msgs to:", bcolors.OKBLUE), curr_thread_name)
return curr_thread_name
def chooseReceiver(driver, receiver=None):
# search name of friend/group
friend_name = receiver if receiver else ' '.join(sys.argv[1:])
input_box = driver.find_element(By.XPATH, '//*[@id="side"]//div[contains(@class,"copyable-text selectable-text")]')
input_box.clear()
input_box.click()
input_box.send_keys(friend_name)
input_box.send_keys(Keys.RETURN)
printThreadName(driver)
if __name__ == '__main__':
main()
except AssertionError as e:
sys.exit(decorateMsg("\n\tCannot open Whatsapp web URL.", bcolors.WARNING))
except KeyboardInterrupt as e:
sys.exit(decorateMsg("\n\tPress Ctrl+C again to exit.", bcolors.WARNING))
except WebDriverException as e:
sys.exit(print(e, decorateMsg("\n\tChromedriver Error. Read the above error (if any), then\n\tCheck if installed chromedriver version is compatible with installed Chrome version.", bcolors.WARNING)))