-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathzhihu.py
235 lines (207 loc) · 10.4 KB
/
zhihu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# -*- coding: utf-8 -*-
import re
import json
import datetime
try:
import urlparse as parse
except:
from urllib import parse
import os
import scrapy
from scrapy.loader import ItemLoader
from items import ZhihuQuestionItem, ZhihuAnswerItem
class ZhihuSpider(scrapy.Spider):
name = "zhihu"
allowed_domains = ["www.zhihu.com"]
start_urls = ['https://www.zhihu.com/']
#question的第一页answer的请求url
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccollapsed_counts%2Creviewing_comments_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.is_blocking%2Cis_blocked%2Cis_followed%2Cvoteup_count%2Cmessage_thread_token%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"
headers = {
"HOST": "www.zhihu.com",
"Referer": "https://www.zhizhu.com",
'User-Agent': "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0"
}
# custom_settings = {
# 'COOKIES_ENABLED': True,
# }
def parse(self, response):
"""
提取出html页面中的所有url 并跟踪这些url进行一步爬取
如果提取的url中格式为 /question/xxx 就下载之后直接进入解析函数
"""
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls = filter(lambda x:True if x.startswith("https") else False, all_urls)
for url in all_urls:
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
if match_obj:
#如果提取到question相关的页面则下载后交由提取函数进行提取
request_url = match_obj.group(1)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
else:
#如果不是question页面则直接进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def parse_question(self, response):
#处理question页面, 从页面中提取出具体的question item
if "QuestionHeader-title" in response.text:
#处理新版本
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css("title", "h1.QuestionHeader-title::text")
item_loader.add_css("content", ".QuestionHeader-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", ".List-headerText span::text")
item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
question_item = item_loader.load_item()
else:
#处理老版本页面的item提取
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
# item_loader.add_css("title", ".zh-question-title h2 a::text")
item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
item_loader.add_css("content", "#zh-question-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", "#zh-question-answer-num::text")
item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
# item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item
def parse_answer(self, reponse):
#处理question的answer
ans_json = json.loads(reponse.text)
is_end = ans_json["paging"]["is_end"]
next_url = ans_json["paging"]["next"]
#提取answer的具体字段
for answer in ans_json["data"]:
answer_item = ZhihuAnswerItem()
answer_item["zhihu_id"] = answer["id"]
answer_item["url"] = answer["url"]
answer_item["question_id"] = answer["question"]["id"]
answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
answer_item["content"] = answer["content"] if "content" in answer else None
answer_item["parise_num"] = answer["voteup_count"]
answer_item["comments_num"] = answer["comment_count"]
answer_item["create_time"] = answer["created_time"]
answer_item["update_time"] = answer["updated_time"]
answer_item["crawl_time"] = datetime.datetime.now()
yield answer_item
if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def start_requests(self):
return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
def get_xsrf(self, session):
'''_xsrf 是一个动态变化的参数'''
index_url = 'https://www.zhihu.com'
# 获取登录时需要用到的_xsrf
index_page = session.get(index_url, headers=self.headers)
html = index_page.text
pattern = r'name="_xsrf" value="(.*?)"'
# 这里的_xsrf 返回的是一个list
_xsrf = re.findall(pattern, html)
return _xsrf[0]
# 获取验证码
def get_captcha(self, session):
import time
import requests
try:
from PIL import Image
except:
pass
t = str(int(time.time() * 1000))
captcha_url = 'https://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
r = session.get(captcha_url, headers=self.headers)
with open('captcha.jpg', 'wb') as f:
f.write(r.content)
f.close()
# 用pillow 的 Image 显示验证码
# 如果没有安装 pillow 到源代码所在的目录去找到验证码然后手动输入
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
print(u'请到 %s 目录找到captcha.jpg 手动输入' % os.path.abspath('captcha.jpg'))
captcha = input("please input the captcha\n>")
return captcha
def login(self, response):
response_text = response.text
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
xsrf = (match_obj.group(1))
import requests
session = requests.session()
if xsrf:
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": self.get_xsrf(session),
"phone_num": "xxx",
"password": "xxxx",
"captcha": ""
}
import time
t = str(int(time.time() * 1000))
captcha_url = 'https://www.zhihu.com/captcha.gif?r=' + t + "&type=login&lang=cn"
yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data":post_data}, callback=self.get_captcha_login)
return [scrapy.FormRequest(
url = post_url,
formdata = post_data,
headers=self.headers,
callback=self.check_login
)]
def get_captcha_login(self, response):
post_data = response.meta.get("post_data", "")
try:
from PIL import Image
except:
pass
with open('captcha.gif', 'wb') as f:
f.write(response.body)
f.close()
# 用pillow 的 Image 显示验证码
# 如果没有安装 pillow 到源代码所在的目录去找到验证码然后手动输入
from zheye import zheye
z = zheye()
positions = z.Recognize('captcha.gif')
position_arr = []
if len(positions)==2:
if positions[1][1] < positions[0][1]:
position_arr.append([positions[1][1], int(positions[1][0])])
position_arr.append([positions[0][1], int(positions[0][0])])
else:
position_arr.append([positions[0][1], positions[0][0]])
position_arr.append([positions[1][1], positions[1][0]])
else:
position_arr.append([positions[0][1], positions[0][0]])
print(positions)
if len(positions) == 2:
post_data["captcha"] = '{"img_size": [200, 44], "input_points": [[%.2f, %.f], [%.2f, %.f]]}' % (
position_arr[0][0] / 2, position_arr[0][1] / 2, position_arr[1][0] / 2, position_arr[1][1] / 2)
else:
post_data["captcha"] = '{"img_size": [200, 44], "input_points": [[%.2f, %.f]]}' % (
position_arr[0][0] / 2, position_arr[0][1] / 2)
post_data['captcha_type'] = "cn"
print (post_data)
return [scrapy.FormRequest(
url = "https://www.zhihu.com/login/phone_num",
formdata = post_data,
headers=self.headers,
callback=self.check_login
)]
def check_login(self, response):
#验证服务器的返回数据判断是否成功
text_json = json.loads(response.text)
if "msg" in text_json and text_json["msg"] == "登录成功":
for url in self.start_urls:
yield scrapy.Request(url, dont_filter=True, headers=self.headers)