-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslack_client.py
72 lines (59 loc) · 1.58 KB
/
slack_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import json
import requests
class SlackClient:
def __init__(
self,
api_token: str = None
):
self.base_url = "https://slack.com/api"
self.api_token = api_token
self.headers = {
"Authorization": f"Bearer {self.api_token}",
}
def _request(
self,
method: str,
url: str,
params: dict = None,
):
print(f"method: {method}")
print(f"url: {url}")
print(f"params: {params}")
try:
response = requests.request(method, url, params=params, headers=self.headers)
except Exception as e:
raise Exception(e)
response_json = response.json()
if (
not response_json["ok"] or
response.status_code >= 400
):
raise Exception(response.json())
return response
def get_conversations_history(
self,
channel: str,
limit: int = 1000,
latest: str = None,
oldest: str = None,
):
url = f"{self.base_url}/conversations.history"
method = "GET"
params = {
"channel": channel,
"limit": limit,
}
if latest is not None:
params["latest"] = latest
if oldest is not None:
params["oldest"] = oldest
response = self._request(method, url, params=params)
return response.json()
def send_slack_message(
self,
url: str,
message: str
):
requests.post(url, data=json.dumps({
"text": message,
}))