-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscrapper.py
142 lines (124 loc) · 4.55 KB
/
scrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from selenium import webdriver
from error import MyError
from utils import *
from selenium.webdriver.support.ui import WebDriverWait
from config import *
from logger import obj_processed
# Chrome launch options shared by every driver started from this module.
# The switches are applied in the order listed.
options = webdriver.ChromeOptions()
for _switch in (
    '-headless',
    '-no-sandbox',
    '-disable-dev-shm-usage',
    '-incognito',
):
    options.add_argument(_switch)
del _switch  # keep the module namespace the same as before

# "user:password@" prefix spliced into https:// URLs, or "" when either
# USER or PASS is empty/unset.
# NOTE(review): USER and PASS are inserted verbatim; presumably they contain
# no characters that need percent-encoding in a URL -- confirm against config.
user_pass = f"{USER}:{PASS}@" if USER and PASS else ""
def _read_count(driver):
    """Return the text of the ``#count`` span, or ``""`` if it cannot be read."""
    try:
        # The generic find_element(by, value) form is used because the
        # find_element_by_* helpers were removed in Selenium 4.3; "xpath" is
        # the value of By.XPATH.
        return driver.find_element("xpath", "//*[@id='count']/span").text
    except Exception:
        # Best effort: any lookup failure is treated as "no count yet".
        return ""


def _scroll_until_counted(driver, url):
    """Scroll to the bottom of the page until the count span has text.

    *url* is only used to label log output.
    """
    loaded = finder(driver, 1)
    # NOTE(review): this loop has no upper bound. If the count span never
    # gets text it keeps scrolling (each failed wait takes up to 10 s).
    # Decide on a give-up policy (re-queue or write what is loaded).
    while not _read_count(driver):
        page_height = driver.execute_script(
            "return document.body.scrollHeight;")
        driver.execute_script(f"window.scrollTo(0, {page_height});")
        try:
            grown = WebDriverWait(driver, 10).until(
                scroll_load(loaded, driver))
        except Exception as e:
            # A failed wait is logged and the loop simply scrolls again.
            print(f"{e} - {url}")
            grown = False
        if grown:
            loaded = grown


def _visit(driver, url, tries, q, lock):
    """Load *url* in *driver* and pass the scrolled page to ``file_writer``.

    Returns True when the URL is finished (written, or skipped because the
    page title contains "exception") and False when it was put back on *q*
    with ``tries + 1`` for another attempt.
    """
    # Credentials go only into the address handed to the browser; *url*
    # itself is what gets logged and re-queued, so the password is not
    # printed. Only the scheme prefix (first match) is rewritten.
    target = url
    if user_pass not in url:
        target = url.replace("https://", f"https://{user_pass}", 1)

    try:
        driver.get(target)
    except Exception as e:
        print(f"At get_check - {e} - {url}")
        q.put([url, tries + 1])
        return False

    if "exception" in driver.title:
        print(f"{url} - {driver.title}")
        return True

    try:
        WebDriverWait(driver, 10).until(is_loaded('list', driver))
    except Exception as e:
        print(f"At load_check - {e} - {url}")
        q.put([url, tries + 1])
        return False

    _scroll_until_counted(driver, url)
    file_writer(driver, lock, q)
    return True


def scrapper(q, lock):
    """Worker loop: drain *q*, scraping one URL per queue item.

    Parameters:
        q: a queue whose items unpack as ``(url, tries)`` and which supports
            ``get_nowait()``, ``put()`` and ``task_done()``. An item whose
            url is ``None`` stops this worker.
        lock: a context-manager lock guarding the processed counter; it is
            also passed through to ``file_writer``.

    Behaviour:
        * The worker returns when the queue is empty or a ``None`` url is
          dequeued.
        * Every dequeued item is acknowledged with ``task_done()`` exactly
          once, including the stop item and items that raise, so
          ``q.join()`` cannot hang on this worker.
        * An item with ``tries > 4`` is logged and dropped.
        * Any failure (starting Chrome, navigating, waiting for the list)
          re-queues the url with ``tries + 1`` so a permanently failing url
          is eventually dropped instead of being retried forever.
        * The Chrome driver is always shut down with ``quit()``, even when
          an unexpected exception propagates.
    """
    import queue  # function-scope import; the file's import header is unchanged

    # NOTE(review): obj_processed is brought in with a from-import at the top
    # of the file, so ``+=`` below updates this module's global; if it is a
    # plain int, logger.obj_processed will not see the change -- confirm.
    global obj_processed

    while True:
        # get_nowait() instead of "if not empty(): get()": with several
        # workers another one may take the last item between the two calls,
        # leaving a blocking get() stuck forever.
        try:
            item = q.get_nowait()
        except queue.Empty:
            break

        try:
            url, tries = item
            if url is None:
                break

            if tries > 4:
                give_up = MyError(f"Tried five times {url}")
                print(f"At tries_check - {give_up.msg} - {url}")
                continue

            try:
                driver = webdriver.Chrome(options=options)
            except Exception as e:
                print(f"At driver_check - {e} - {url}")
                q.put([url, tries + 1])
                continue

            try:
                finished = _visit(driver, url, tries, q, lock)
            finally:
                # quit() ends the whole session; a separate close() first is
                # unnecessary and, if it raised, would skip the quit().
                driver.quit()

            if finished:
                with lock:
                    obj_processed += 1
                    print(f"total folders processed: {obj_processed}")
        finally:
            # Runs for break/continue/exceptions alike; any re-queue above
            # has already happened, so the unfinished count never dips to
            # zero while work is still pending.
            q.task_done()