forked from harperreed/twitteroauth-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
151 lines (127 loc) · 5.56 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'''
Python Oauth client for Twitter
Used the SampleClient from the OAUTH.org example python client as basis.
props to leahculver for making a very hard to use but in the end usable oauth lib.
'''
import httplib
import urllib
import time
import webbrowser
import oauth as oauth
from urlparse import urlparse
class TwitterOAuthClient(oauth.OAuthClient):
api_root_url = 'https://twitter.com' #for testing 'http://term.ie'
api_root_port = "80"
#set api urls
def request_token_url(self):
return self.api_root_url + '/oauth/request_token'
def authorize_url(self):
return self.api_root_url + '/oauth/authorize'
def access_token_url(self):
return self.api_root_url + '/oauth/access_token'
#oauth object
def __init__(self, consumer_key, consumer_secret, oauth_token=None, oauth_token_secret=None):
self.sha1_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
self.consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
if ((oauth_token != None) and (oauth_token_secret!=None)):
self.token = oauth.OAuthConsumer(oauth_token, oauth_token_secret)
else:
self.token = None
def oauth_request(self,url, args = {}, method=None):
if (method==None):
if args=={}:
method = "GET"
else:
method = "POST"
req = oauth.OAuthRequest.from_consumer_and_token(self.consumer, self.token, method, url, args)
req.sign_request(self.sha1_method, self.consumer,self.token)
if (method=="GET"):
return self.http_wrapper(req.to_url())
elif (method == "POST"):
return self.http_wrapper(req.get_normalized_http_url(),req.to_postdata())
# trying to make a more robust http wrapper. this is a failure ;)
def http_wrapper_fucked(self, url, postdata=""):
parsed_url = urlparse(url)
connection_url = parsed_url.path+"?"+parsed_url.query
hostname = parsed_url.hostname
scheme = parsed_url.scheme
headers = {'Content-Type' :'application/x-www-form-urlencoded'}
if scheme=="https":
connection = httplib.HTTPSConnection(hostname)
else:
connection = httplib.HTTPConnection(hostname)
connection.request("POST", connection_url, body=postdata, headers=headers)
connection_response = connection.getresponse()
self.last_http_status = connection_response.status
self.last_api_call= url
response= connection_response.read()
#this is barely working. (i think. mostly it is that everyone else is using httplib)
def http_wrapper(self, url, postdata={}):
try:
if (postdata != {}):
f = urllib.urlopen(url, postdata)
else:
f = urllib.urlopen(url)
response = f.read()
except:
response = ""
return response
def get_request_token(self):
response = self.oauth_request(self.request_token_url())
token = self.oauth_parse_response(response)
try:
self.token = oauth.OAuthConsumer(token['oauth_token'],token['oauth_token_secret'])
return token
except:
raise oauth.OAuthError('Invalid oauth_token')
def oauth_parse_response(self, response_string):
r = {}
for param in response_string.split("&"):
pair = param.split("=")
if (len(pair)!=2):
break
r[pair[0]]=pair[1]
return r
def get_authorize_url(self, token):
return self.authorize_url() + '?oauth_token=' +token
def get_access_token(self,token=None):
r = self.oauth_request(self.access_token_url())
token = self.oauth_parse_response(r)
self.token = oauth.OAuthConsumer(token['oauth_token'],token['oauth_token_secret'])
return token
def oauth_request(self, url, args={}, method=None):
if (method==None):
if args=={}:
method = "GET"
else:
method = "POST"
req = oauth.OAuthRequest.from_consumer_and_token(self.consumer, self.token, method, url, args)
req.sign_request(self.sha1_method, self.consumer,self.token)
if (method=="GET"):
return self.http_wrapper(req.to_url())
elif (method == "POST"):
return self.http_wrapper(req.get_normalized_http_url(),req.to_postdata())
if __name__ == '__main__':
consumer_key = ''
consumer_secret = ''
while not consumer_key:
consumer_key = raw_input('Please enter consumer key: ')
while not consumer_secret:
consumer_secret = raw_input('Please enter consumer secret: ')
auth_client = TwitterOAuthClient(consumer_key,consumer_secret)
tok = auth_client.get_request_token()
token = tok['oauth_token']
token_secret = tok['oauth_token_secret']
url = auth_client.get_authorize_url(token)
webbrowser.open(url)
print "Visit this URL to authorize your app: " + url
response_token = raw_input('What is the oauth_token from twitter: ')
response_client = TwitterOAuthClient(consumer_key, consumer_secret,token, token_secret)
tok = response_client.get_access_token()
print "Making signed request"
#verify user access
content = response_client.oauth_request('https://twitter.com/account/verify_credentials.json', method='POST')
#make an update
#content = response_client.oauth_request('https://twitter.com/statuses/update.xml', {'status':'Updated from a python oauth client. awesome.'}, method='POST')
print content
print 'Done.'