-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrack_mastodon.py
63 lines (55 loc) · 1.81 KB
/
track_mastodon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import json
import os
import utils
import twit_api
import tweetdata
class TweepStream:
    """Poll one user's timeline and hand unseen tweets to the Mastodon poster.

    The highest tweet id handled so far is persisted in
    ``<cwd>/txt/<user>.txt`` so that later runs only look at newer tweets.
    """

    def __init__(self, user):
        """Remember the account to track.

        :param user: value passed to ``twit_api.UserTimeline`` and compared
            against the result of ``TweetData.reply_to_user()``.
        """
        self.user = user

    @staticmethod
    def _read_max_seen(path):
        """Return the tweet id stored in *path*, or ``None`` if unavailable.

        A missing, unreadable, empty or non-numeric state file all yield
        ``None`` so the caller treats the run as a first run.
        """
        last_id = None
        try:
            # ``with`` guarantees the handle is closed even when int() raises.
            with open(path) as state_file:
                for line in state_file:
                    last_id = int(line)
        except (OSError, ValueError):
            return None
        return last_id

    @staticmethod
    def _collect_new(timeline_ids, max_seen):
        """Split the timeline into unseen ids and the updated high-water mark.

        :param timeline_ids: iterable of tweet ids (anything ``int()`` accepts).
            Assumes newest-first order -- TODO confirm against
            ``UserTimeline.list_user_tweets``.
        :param max_seen: highest id already handled, or ``None`` on first run.
        :returns: ``(new_ids, max_seen)`` where ``new_ids`` keeps timeline order.
        """
        new_ids = []
        for raw_id in timeline_ids:
            tweet_id = int(raw_id)
            if max_seen is None:
                # First run: only record where we are, do not replay history.
                return [], tweet_id
            if tweet_id <= max_seen:
                break
            new_ids.append(tweet_id)
        if new_ids:
            max_seen = max(max_seen, new_ids[0])
        return new_ids, max_seen

    def stream(self):
        """Post tweets from ``self.user`` that appeared since the last run.

        Reads the last seen tweet id from the state file, collects newer ids
        from the user's timeline, posts the qualifying ones oldest-first and
        finally stores the new highest id.

        :returns: None
        """
        utils.log_debug(f"[LOGGING]Check fresh tweets for {self.user}")
        state_dir = os.path.join(os.getcwd(), "txt")
        # exist_ok avoids the check-then-create race of isdir() + mkdir().
        os.makedirs(state_dir, exist_ok=True)
        state_path = os.path.join(state_dir, f"{self.user}.txt")

        max_seen = self._read_max_seen(state_path)
        timeline = twit_api.UserTimeline(self.user)
        new_tweets, max_seen = self._collect_new(
            timeline.list_user_tweets(), max_seen
        )

        # Oldest first, so posts appear in the order they were written.
        for tweet in reversed(new_tweets):
            tweet_info = tweetdata.TweetData(tweet)
            try:
                replied_to = tweet_info.reply_to_user()
            except Exception:
                # Best effort: a failed lookup just skips this tweet, but
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                replied_to = None

            # NOTE(review): this treats the "Fresh or threaded" branch as the
            # else of the `!= self.user` comparison, i.e. nothing is posted
            # when reply_to_user() yields None -- confirm this is intended.
            if replied_to is None:
                continue
            if replied_to != self.user:
                utils.log_debug("[LOGGING]This is a reply")
                continue
            utils.log_debug("[LOGGING]Fresh or threaded tweet")
            utils.Mastodon_Post(self.user, tweet).post_mastodon()

        if max_seen is None:
            # No stored id and an empty timeline: nothing meaningful to save.
            return
        with open(state_path, "w") as state_file:
            state_file.write(str(max_seen))
        utils.log_debug("[LOGGING] tweet id written.")