-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_api_service.py
550 lines (438 loc) · 19.4 KB
/
twitter_api_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
import os
import json
from typing import Optional, Generator
import tweepy
from exceptions import TwitterAPISetupError, PrivateAccountError
from utils import logger
# Generator aliases used as return types by TwitterAPIService.
# The second element of every yielded pair is None when nothing matched
# (no pinned tweet for a user, no media/place/author for a tweet), hence Optional.
FriendGenerator = Generator[
    tuple[tweepy.user.User, Optional[tweepy.tweet.Tweet]], None, None
]
FollowerGenerator = Generator[
    tuple[tweepy.user.User, Optional[tweepy.tweet.Tweet]], None, None
]
TweetGenerator = Generator[tuple[tweepy.tweet.Tweet, Optional[dict]], None, None]
class TwitterAPIService:
    """Handle API requests

    :type forme: bool
    :param forme: Whether the API will be used for account owner or authorized user
    """

    def __init__(self, forme: Optional[bool] = False) -> None:
        self._forme = forme
        self._api_v1 = None
        self._api_v2 = None
        self._authorized_client = None
        # Client used by every public request method; set by setup_api_access().
        self._current_client = None
        self._external_user_creds_file = "external_user_creds.json"

    def setup_api_access(self) -> None:
        """Setup access for developer or authorized(external) user"""
        if self._forme:
            self._setup_api_access_v2()
            self._current_client = self._api_v2
        else:
            self._authorize_with_pin()
            self._current_client = self._authorized_client

    def get_user(
        self,
        username: str,
        user_fields: Optional[list[str]] = None,
        expansions: Optional[str] = None,
        user_auth: Optional[bool] = False,
    ) -> tuple:
        """Get user given by username

        Pass user fields, tweet fields, and expansions for additional data.
        https://docs.tweepy.org/en/latest/client.html#user-fields
        https://docs.tweepy.org/en/latest/client.html#expansions

        :type username: str
        :param username: Twitter username
        :type user_fields: list
        :param user_fields: Additional user fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :rtype: tuple
        :returns: User data and includes objects as tuple
        """
        response = self._current_client.get_user(
            username=username,
            user_fields=user_fields,
            expansions=expansions,
            user_auth=user_auth,
        )
        return (response.data, response.includes)

    def get_users(
        self,
        usernames: str,
        user_fields: Optional[list[str]] = None,
        expansions: Optional[str] = None,
        user_auth: Optional[bool] = False,
    ) -> list:
        """Get users given by usernames

        Pass user fields, tweet fields, and expansions for additional data.
        https://docs.tweepy.org/en/latest/client.html#user-fields
        https://docs.tweepy.org/en/latest/client.html#expansions

        :type usernames: list or str
        :param usernames: Twitter usernames
        :type user_fields: list
        :param user_fields: Additional user fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :rtype: list
        :returns: List of (user data, pinned tweet or None) tuples; empty when
            the response carries no user data
        """
        response = self._current_client.get_users(
            usernames=usernames,
            user_fields=user_fields,
            expansions=expansions,
            user_auth=user_auth,
        )
        return self._pair_users_with_pinned_tweets(
            response.data, response.includes.get("tweets", [])
        )

    def get_friends(
        self,
        username: str,
        user_fields: Optional[list[str]] = None,
        expansions: Optional[str] = None,
        user_auth: Optional[bool] = False,
        max_results: Optional[int] = 1000,
    ) -> "FriendGenerator":
        """Get friends data for the username

        :type username: str
        :param username: Twitter username
        :type user_fields: list
        :param user_fields: Additional user fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :type max_results: int
        :param max_results: Number of maximum results to get for a page
        :rtype: Generator
        :returns: Generator of (user data, pinned tweet or None) tuples
        :raises PrivateAccountError: If the account is protected (raised when
            the generator is first advanced)
        """
        yield from self._iter_connections(
            "get_users_following",
            username,
            user_fields,
            expansions,
            user_auth,
            max_results,
        )

    def get_followers(
        self,
        username: str,
        user_fields: Optional[list[str]] = None,
        expansions: Optional[str] = None,
        user_auth: Optional[bool] = False,
        max_results: Optional[int] = 1000,
    ) -> "FollowerGenerator":
        """Get followers data for the username

        :type username: str
        :param username: Twitter username
        :type user_fields: list
        :param user_fields: Additional user fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :type max_results: int
        :param max_results: Number of maximum results to get for a page
        :rtype: Generator
        :returns: Generator of (user data, pinned tweet or None) tuples
        :raises PrivateAccountError: If the account is protected (raised when
            the generator is first advanced)
        """
        yield from self._iter_connections(
            "get_users_followers",
            username,
            user_fields,
            expansions,
            user_auth,
            max_results,
        )

    def get_user_tweets(
        self,
        username: str,
        tweet_fields: Optional[list[str]] = None,
        place_fields: Optional[list[str]] = None,
        media_fields: Optional[list[str]] = None,
        expansions: Optional[list[str]] = None,
        exclude: Optional[list[str]] = None,
        max_results: Optional[int] = 100,
        user_auth: Optional[bool] = False,
    ) -> "TweetGenerator":
        """Get tweets for the given username

        :type username: str
        :param username: Twitter username
        :type tweet_fields: list
        :param tweet_fields: Additional tweet fields to get
        :type place_fields: list
        :param place_fields: Additional place fields to get
        :type media_fields: list
        :param media_fields: Additional media fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type exclude: list
        :param exclude: List of fields to exclude (replies,retweets)
        :type max_results: int
        :param max_results: Number of maximum results to get for a page
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :rtype: Generator
        :returns: Generator of (tweet data, matched includes dict or None) tuples
        :raises PrivateAccountError: If the account is protected (raised when
            the generator is first advanced)
        """
        if self._is_account_protected(username, user_auth=user_auth):
            raise PrivateAccountError("Could not extract data from private account!")
        # Forward user_auth so the lookup uses the same authentication mode as
        # the timeline request below (as get_friends/get_followers do).
        user = self.get_user(username, user_auth=user_auth)[0]
        for response in tweepy.Paginator(
            self._current_client.get_users_tweets,
            user.id,
            tweet_fields=tweet_fields,
            place_fields=place_fields,
            media_fields=media_fields,
            expansions=expansions,
            exclude=exclude,
            max_results=max_results,
            user_auth=user_auth,
        ):
            # response.data is None for a page without tweets.
            for tweet in response.data or []:
                yield (tweet, self._collect_tweet_includes(tweet, response.includes))

    def get_search_tweets(
        self,
        search_keyword: str,
        excludes: Optional[list[str]] = None,
        tweet_fields: Optional[list[str]] = None,
        place_fields: Optional[list[str]] = None,
        media_fields: Optional[list[str]] = None,
        expansions: Optional[list[str]] = None,
        max_results: Optional[int] = 100,
        user_auth: Optional[bool] = False,
    ) -> "TweetGenerator":
        """Extract latest tweets for the given search keyword

        ``author_id`` is always requested as tweet field and expansion so the
        author can be attached to each tweet; the caller's lists are not modified.

        :type search_keyword: str
        :param search_keyword: Keyword to search
        :type excludes: list
        :param excludes: Tweet kinds to filter out of the search
            ("replies", "retweets"); other values are ignored
        :type tweet_fields: list
        :param tweet_fields: Additional tweet fields to get
        :type place_fields: list
        :param place_fields: Additional place fields to get
        :type media_fields: list
        :param media_fields: Additional media fields to get
        :type expansions: list
        :param expansions: Additional data objects to get
        :type max_results: int
        :param max_results: Number of maximum results to get for a page
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :rtype: Generator
        :returns: Generator of (tweet data, matched includes dict or None) tuples
        """
        tweet_fields = self._with_author_id(tweet_fields)
        expansions = self._with_author_id(expansions)
        user_fields = [
            "created_at",
            "description",
            "entities",
            "location",
            "profile_image_url",
            "protected",
            "public_metrics",
            "url",
            "verified",
        ]
        query = self._build_search_query(search_keyword, excludes)
        for response in tweepy.Paginator(
            self._current_client.search_recent_tweets,
            query,
            tweet_fields=tweet_fields,
            user_fields=user_fields,
            place_fields=place_fields,
            media_fields=media_fields,
            expansions=expansions,
            max_results=max_results,
            user_auth=user_auth,
        ):
            # response.data is None when the search page has no results.
            for tweet in response.data or []:
                yield (
                    tweet,
                    self._collect_tweet_includes(
                        tweet, response.includes, with_author=True
                    ),
                )

    def _iter_connections(
        self,
        endpoint_name: str,
        username: str,
        user_fields: Optional[list[str]],
        expansions: Optional[str],
        user_auth: Optional[bool],
        max_results: Optional[int],
    ):
        """Yield (user, pinned tweet or None) pairs from a paginated user endpoint

        Shared implementation of get_friends and get_followers.

        :type endpoint_name: str
        :param endpoint_name: Name of the current client's method to paginate
        :raises PrivateAccountError: If the account is protected
        """
        if self._is_account_protected(username, user_auth=user_auth):
            raise PrivateAccountError("Could not extract data from private account!")
        user = self.get_user(username, user_auth=user_auth)[0]
        for response in tweepy.Paginator(
            getattr(self._current_client, endpoint_name),
            user.id,
            max_results=max_results,
            user_fields=user_fields,
            expansions=expansions,
            user_auth=user_auth,
        ):
            yield from self._pair_users_with_pinned_tweets(
                response.data, response.includes.get("tweets", [])
            )

    @staticmethod
    def _pair_users_with_pinned_tweets(users, tweets) -> list:
        """Pair every user with its pinned tweet, or None if it is not included

        There can be fewer pinned tweets than users, so tweets are looked up
        by id instead of by position.

        :param users: Iterable of user objects exposing ``pinned_tweet_id``; may be None
        :param tweets: Iterable of tweet objects subscriptable by ``"id"``
        :rtype: list
        :returns: List of (user, tweet or None) tuples in the order of ``users``
        """
        tweets_by_id = {}
        for tweet in tweets:
            # Keep the first tweet seen for an id.
            tweets_by_id.setdefault(tweet["id"], tweet)
        return [(user, tweets_by_id.get(user.pinned_tweet_id)) for user in users or []]

    @staticmethod
    def _collect_tweet_includes(
        tweet, includes: dict, with_author: bool = False
    ) -> Optional[dict]:
        """Select the included objects that belong to a single tweet

        :param tweet: Tweet object exposing ``attachments``, ``geo`` and
            (when ``with_author`` is set) ``author_id``
        :type includes: dict
        :param includes: Includes of the response page the tweet came from
        :type with_author: bool
        :param with_author: Whether to also attach the matching user as "author"
        :rtype: dict
        :returns: Dict with any of the keys "media", "places" and "author",
            or None when no key applies to the tweet
        """
        matched = {}
        attachments = tweet.attachments
        if attachments and "media_keys" in attachments and "media" in includes:
            media_keys = attachments["media_keys"]
            matched["media"] = [
                item for item in includes["media"] if item.media_key in media_keys
            ]
        if tweet.geo and "places" in includes:
            # A geo object without "place_id" matches no place instead of raising.
            place_id = tweet.geo.get("place_id")
            matched["places"] = [
                place for place in includes["places"] if place.id == place_id
            ]
        if with_author and tweet.author_id and "users" in includes:
            for user in includes["users"]:
                if user.id == tweet.author_id:
                    matched["author"] = user
                    break
        return matched or None

    @staticmethod
    def _with_author_id(fields: Optional[list[str]]) -> list:
        """Return a copy of ``fields`` that contains "author_id" exactly once more at most

        :type fields: list
        :param fields: Requested field names, or None
        :rtype: list
        :returns: New list; the argument is left untouched
        """
        result = list(fields or [])
        if "author_id" not in result:
            result.append("author_id")
        return result

    @staticmethod
    def _build_search_query(search_keyword: str, excludes: Optional[list[str]]) -> str:
        """Build the search query with the operators for the excluded tweet kinds

        :type search_keyword: str
        :param search_keyword: Keyword to search
        :type excludes: list
        :param excludes: Tweet kinds to exclude ("replies", "retweets"), or None
        :rtype: str
        :returns: Search keyword followed by one operator per recognised exclude
        """
        operators = {"replies": " -is:reply", "retweets": " -is:retweet"}
        query = f"{search_keyword}"
        for exclude in excludes or []:
            query += operators.get(exclude, "")
        return query

    def _is_account_protected(self, username: str, user_auth: Optional[bool] = False) -> bool:
        """Check if account is protected

        :type username: str
        :param username: Twitter username
        :type user_auth: bool
        :param user_auth: Whether requests are done on behalf of another account
        :rtype: bool
        :returns: Whether account is protected
        """
        response = self._current_client.get_user(
            username=username, user_fields="protected", user_auth=user_auth
        )
        return response.data.protected

    def _setup_api_access_v1(self) -> None:
        """Setup access for Twitter v1 API

        :raises TwitterAPISetupError: If a credential environment variable is missing
        """
        logger.debug("Setting up v1 API access...")
        try:
            access_token = os.environ["TWITTER_ACCESS_TOKEN"]
            access_token_secret = os.environ["TWITTER_ACCESS_TOKEN_SECRET"]
            consumer_key = os.environ["TWITTER_CONSUMER_KEY"]
            consumer_secret = os.environ["TWITTER_CONSUMER_SECRET"]
        except KeyError as exp:
            raise TwitterAPISetupError(
                "Failed to find credentials setup! Setup environment variables."
            ) from exp
        # Same handler class as _authorize_with_pin; OAuthHandler is its
        # deprecated alias.
        auth = tweepy.OAuth1UserHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self._api_v1 = tweepy.API(auth, wait_on_rate_limit=True)

    def _setup_api_access_v2(self) -> None:
        """Setup access for Twitter v2 API as app

        :raises TwitterAPISetupError: If the bearer token environment variable is missing
        """
        logger.debug("Setting up v2 API access...")
        try:
            bearer_token = os.environ["TWITTER_BEARER_TOKEN_CODE"]
        except KeyError as exp:
            raise TwitterAPISetupError(
                "Failed to find credentials setup! Setup environment variables."
            ) from exp
        self._api_v2 = tweepy.Client(bearer_token=bearer_token, wait_on_rate_limit=True)

    def _authorize_with_pin(self) -> None:
        """Authorize user using the PIN authentication

        Reuses the access token pair from the credentials file when it exists;
        otherwise asks for a PIN on the console and stores the obtained pair.

        :raises TwitterAPISetupError: If a credential environment variable is missing
        """
        logger.debug("Setting up v2 API access for authorized user...")
        try:
            consumer_key = os.environ["TWITTER_CONSUMER_KEY_CODE"]
            consumer_secret = os.environ["TWITTER_CONSUMER_SECRET_CODE"]
        except KeyError as exp:
            raise TwitterAPISetupError(
                "Failed to find credentials setup! Setup environment variables."
            ) from exp
        if self._is_credentials_exist():
            access_token, access_token_secret = self._get_external_user_credentials()
        else:
            oauth1_user_handler = tweepy.OAuth1UserHandler(
                consumer_key, consumer_secret, callback="oob"
            )
            print("\nPlease get the PIN from the following URL\n")
            print(oauth1_user_handler.get_authorization_url())
            verifier = input("\nEnter PIN: ")
            access_token, access_token_secret = oauth1_user_handler.get_access_token(verifier)
            self._save_on_behalf_user_credentials(access_token, access_token_secret)
        self._authorized_client = tweepy.Client(
            consumer_key=consumer_key,
            consumer_secret=consumer_secret,
            access_token=access_token,
            access_token_secret=access_token_secret,
            wait_on_rate_limit=True,
        )
        logger.info(
            f"Performing operations on behalf of {self._authorized_client.get_me().data.username}"
        )

    def _save_on_behalf_user_credentials(self, access_token: str, access_token_secret: str) -> None:
        """Save access credentials for the on behalf user

        :type access_token: str
        :param access_token: Authorized user access token
        :type access_token_secret: str
        :param access_token_secret: Authorized user access token secret
        """
        data = {"access_token": access_token, "access_token_secret": access_token_secret}
        # The file holds secrets: create it readable/writable by the owner only.
        # The mode only takes effect when the file is newly created.
        descriptor = os.open(
            self._external_user_creds_file,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(descriptor, "w", encoding="utf-8") as creds_file:
            json.dump(data, creds_file)

    def _get_external_user_credentials(self) -> tuple[str, str]:
        """Read access credentials from file for authorized account

        :rtype: tuple
        :returns: Access token and access token secret pair
        """
        logger.debug("Reading credentials from file...")
        with open(self._external_user_creds_file, encoding="utf-8") as creds_file:
            creds_data = json.load(creds_file)
        return (creds_data["access_token"], creds_data["access_token_secret"])

    def _is_credentials_exist(self) -> bool:
        """Check if the credentials file exists

        :rtype: bool
        :returns: Whether credentials file exists as a regular file
        """
        return os.path.isfile(self._external_user_creds_file)