Skip to content

Commit

Permalink
Merge pull request #30 from sgratzl/sgratzl/pagination
Browse files Browse the repository at this point in the history
use pagination api
  • Loading branch information
sgratzl authored Jun 25, 2020
2 parents f8b4a24 + 5e0e38e commit 4c8fbae
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 112 deletions.
4 changes: 1 addition & 3 deletions slack_cleaner2/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def _create_default_logger(to_file=False):
log.removeHandler(handler)
if to_file:
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
file_log_handler = logging.FileHandler(
"slack-cleaner." + ts + ".log")
file_log_handler = logging.FileHandler("slack-cleaner." + ts + ".log")
file_log_handler.setLevel(logging.DEBUG)
log.addHandler(file_log_handler)

Expand Down Expand Up @@ -95,7 +94,6 @@ def deleted(self, error: Optional[Exception] = None):
sys.stdout.write(".")
sys.stdout.flush()


def group(self, name: str) -> SlackLoggerLayer:
"""
push another log group
Expand Down
223 changes: 114 additions & 109 deletions slack_cleaner2/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class SlackChannelType(Enum):
"""
enum class for defining the channel type
"""

PUBLIC = 1
PRIVATE = 2
MPIM = 3
Expand Down Expand Up @@ -201,88 +202,73 @@ def __repr__(self):

def _scope(self):
    """
    name of the ``*:history`` scope matching the type of this conversation,
    used as the scope hint when listing its messages
    """
    history_scopes = (
        (SlackChannelType.PRIVATE, "groups:history"),
        (SlackChannelType.MPIM, "mpim:history"),
        (SlackChannelType.IM, "im:history"),
    )
    for channel_type, scope in history_scopes:
        if self.type == channel_type:
            return scope
    # every other type is treated like a public channel
    return "channels:history"

def msgs(self, after: TimeIsh = None, before: TimeIsh = None, asc=False, with_replies=False) -> Iterator["SlackMessage"]:
"""
retrieve all messages as a generator
def _iter_message(self, list_f, after: TimeIsh = None, before: TimeIsh = None, asc=False, with_replies=False) -> Iterator["SlackMessage"]:
:param after: limit to entries after the given timestamp
:type after: int,str,time
:param before: limit to entries before the given timestamp
:type before: int,str,time
:param asc: returning a batch of messages in ascending order
:type asc: boolean
:param with_replies: also iterate over all replies / threads
:type with_replies: boolean
:return: generator of SlackMessage objects
:rtype: SlackMessage
"""
after = _parse_time(after)
before = _parse_time(before)
self._slack.log.debug("list msgs of %s (after=%s, before=%s)", self, after, before)
latest = before
oldest = after
has_more = True
while has_more:
messages, has_more = list_f(latest, oldest, limit=1000)

if not messages:
return

# earliest message
# Prepare for next page query
latest = messages[-1]
messages = self._slack.safe_paginated_api(lambda kw: self.api.history(self.id, latest=before, oldest=after, **kw), "messages", [self._scope()], "conversations.history")

for msg in reversed(messages) if asc else messages:
for msg in reversed(list(messages)) if asc else messages:
# Delete user messages
if msg["type"] == "message":
user = _find_user(self._slack, msg)
# Delete user messages
if msg["type"] == "message":
s_msg = SlackMessage(msg, user, self, self._slack)
yield s_msg
s_msg = SlackMessage(msg, user, self, self._slack)
yield s_msg

if with_replies and s_msg.has_replies:
yield from self.replies_to(s_msg, after=after, before=before, asc=asc)

def msgs(self, after: TimeIsh = None, before: TimeIsh = None, asc=False, with_replies=False) -> Iterator["SlackMessage"]:
"""
retrieve all messages as a generator
:param after: limit to entries after the given timestamp
:type after: int,str,time
:param before: limit to entries before the given timestamp
:type before: int,str,time
:param asc: returning a batch of messages in ascending order
:type asc: boolean
:param with_replies: also iterate over all replies / threads
:type with_replies: boolean
:return: generator of SlackMessage objects
:rtype: SlackMessage
"""

def list_f(latest, oldest, limit):
def fun():
return self.api.history(self.id, latest=latest, oldest=oldest, limit=limit)
return self._slack.safe_api(fun, ["messages", "has_more"], [[], False], [self._scope()], 'conversations.history')

yield from self._iter_message(list_f, after, before, asc, with_replies)
if with_replies and s_msg.has_replies:
yield from self.replies_to(s_msg, after=after, before=before, asc=asc)

def replies_to(self, base_msg: "SlackMessage", after: TimeIsh = None, before: TimeIsh = None, asc=False) -> Iterator["SlackMessage"]:
"""
returns the replies to a given SlackMessage instance
returns the replies to a given SlackMessage instance
:param base_msg: message instance to find replies to
:type base_msg: SlackMessage
:param after: limit to entries after the given timestamp
:type after: int,str,time
:param before: limit to entries before the given timestamp
:type before: int,str,time
:param asc: returning a batch of messages in ascending order
:type asc: boolean
:return: generator of SlackMessage replies
:rtype: SlackMessage
"""
:param base_msg: message instance to find replies to
:type base_msg: SlackMessage
:param after: limit to entries after the given timestamp
:type after: int,str,time
:param before: limit to entries before the given timestamp
:type before: int,str,time
:param asc: returning a batch of messages in ascending order
:type asc: boolean
:return: generator of SlackMessage replies
:rtype: SlackMessage
"""
ts = base_msg.json.get("thread_ts", base_msg.json["ts"])
after = _parse_time(after)
before = _parse_time(before)
self._slack.log.debug("list msgs of %s (after=%s, before=%s)", self, after, before)

def list_f(after, before, limit):
def fun():
return self.api.replies(self.id, ts, latest=before, oldest=after, limit=limit)
return self._slack.safe_api(fun, ["messages", "has_more"], [[], False], [self._scope()], 'conversations.replies')
messages = self._slack.safe_paginated_api(lambda kw: self.api.replies(self.id, ts, latest=before, oldest=after, **kw), "messages", [self._scope()], "conversations.replies")

for msg in self._iter_message(list_f, after, before, asc):
if base_msg.ts != msg.ts: # don't yield itself
yield msg
for msg in reversed(list(messages)) if asc else messages:
# Delete user messages
if msg["type"] == "message":
user = _find_user(self._slack, msg)
s_msg = SlackMessage(msg, user, self, self._slack)
if base_msg.ts != s_msg.ts: # don't yield itself
yield s_msg

def files(self, after: TimeIsh = None, before: TimeIsh = None, types: Optional[str] = None) -> Iterator["SlackFile"]:
"""
Expand Down Expand Up @@ -564,25 +550,14 @@ def list(
user = user.id
if isinstance(channel, SlackChannel):
channel = channel.id
page = 1
has_more = True

api = slack.api.files
slack.log.debug("list all files(user=%s, after=%s, before=%s, types=%s, channel=%s", user, after, before, types, channel)

while has_more:
files, paging = slack.safe_api(lambda: api.list(user=user, ts_from=after, ts_to=before, types=types, channel=channel, page=page, count=100),
['files', 'paging'], [[], None], ['files:read'], 'files.list')

if not files:
return
files = slack.safe_paginated_api(lambda kw: api.list(user=user, ts_from=after, ts_to=before, types=types, channel=channel, **kw), "files", ["files:read"], "files.list")

current_page = paging["page"]
total_pages = paging["pages"]
has_more = current_page < total_pages
page = current_page + 1

for sfile in files:
yield SlackFile(sfile, slack.resolve_user(sfile["user"]), slack)
for sfile in files:
yield SlackFile(sfile, slack.resolve_user(sfile["user"]), slack)

def __str__(self) -> str:
return self.name
Expand Down Expand Up @@ -785,8 +760,12 @@ class SlackCleaner:
"""
sleep for the given seconds after a file/message was deleted
"""
page_limit: int
"""
number of elements fetched per page
"""

def __init__(self, token: str, sleep_for=0, log_to_file=False, slacker: Optional[Slacker] = None, session=None, logger: Optional[Logger] = None, show_progress=True):
def __init__(self, token: str, sleep_for=0, log_to_file=False, slacker: Optional[Slacker] = None, session=None, logger: Optional[Logger] = None, show_progress=True, page_limit=200):
"""
:param token: the slack token, see README.md for details
:type token: str
Expand All @@ -802,11 +781,14 @@ def __init__(self, token: str, sleep_for=0, log_to_file=False, slacker: Optional
:type logger: Logger
:param show_progress: show a progress upon deleting an element on the console
:type show_progress: bool
:param page_limit: number of elements to fetch per page
:type page_limit: int
"""

self.log = SlackLogger(log_to_file, logger=logger, show_progress=show_progress)
self.sleep_for = sleep_for
self.token = token
self.page_limit = page_limit

self.log.debug("start")

Expand All @@ -816,13 +798,12 @@ def __init__(self, token: str, sleep_for=0, log_to_file=False, slacker: Optional
slack = Slacker(token, session=session if session else Session(), rate_limit_retries=2)
self.api = slack

raw_users = self.safe_api(
self.api.users.list, "members", [], ['users:read (bot, user)'], 'users.list')
raw_users = self.safe_api(self.api.users.list, "members", [], ["users:read (bot, user)"], "users.list")
self.users = ByKeyLookup[SlackUser]([SlackUser(m, self) for m in raw_users], lambda v: [v.name, v.id])
self.log.debug("collected users %s", self.users)

# determine one self
my_id = self.safe_api(self.api.auth.test, 'user_id', None, [], 'auth.test')
my_id = self.safe_api(self.api.auth.test, "user_id", None, [], "auth.test")
myself = next((u for u in self.users if u.id == my_id), None)
if not myself:
self.log.error("cannot determine my own user, using the first one or a dummy one")
Expand All @@ -831,42 +812,35 @@ def __init__(self, token: str, sleep_for=0, log_to_file=False, slacker: Optional
self.myself = myself

def _get_channel_users(channel: JSONDict):
raw_members = self.safe_api(lambda: self.api.conversations.members(channel["id"]), "members", [])
raw_members = self.safe_paginated_api(lambda kw: self.api.conversations.members(channel["id"], **kw), "members")
return self._resolve_users(raw_members)

raw_channels = self.safe_api(lambda: self.api.conversations.list(
types="public_channel"), "channels", [], ['channels:read'], 'conversations.list (public_channel)')
self.channels = [SlackChannel(m, _get_channel_users(m), SlackChannelType.PUBLIC, self.api.conversations, self)
for m in raw_channels if m.get("is_channel") and not m.get("is_private")]
raw_channels = self.safe_paginated_api(lambda kw: self.api.conversations.list(types="public_channel", **kw), "channels", ["channels:read"], "conversations.list (public_channel)")
self.channels = [SlackChannel(m, _get_channel_users(m), SlackChannelType.PUBLIC, self.api.conversations, self) for m in raw_channels if m.get("is_channel") and not m.get("is_private")]
self.log.debug("collected channels %s", self.channels)

raw_groups = self.safe_api(lambda: self.api.conversations.list(
types="private_channel"), "channels", [], ['groups:read'], 'conversations.list (private_channel)')
self.groups = [SlackChannel(m, _get_channel_users(m), SlackChannelType.PRIVATE, self.api.conversations, self) for m in raw_groups if (
m.get("is_channel") or m.get("is_group")) and m.get("is_private")]
raw_groups = self.safe_paginated_api(lambda kw: self.api.conversations.list(types="private_channel", **kw), "channels", ["groups:read"], "conversations.list (private_channel)")
self.groups = [
SlackChannel(m, _get_channel_users(m), SlackChannelType.PRIVATE, self.api.conversations, self) for m in raw_groups if (m.get("is_channel") or m.get("is_group")) and m.get("is_private")
]
self.log.debug("collected groups %s", self.groups)

raw_mpim = self.safe_api(lambda: self.api.conversations.list(
types="mpim"), "channels", [], ['mpim:read'], 'conversations.list (mpim)')
self.mpim = [SlackChannel(m, _get_channel_users(
m), SlackChannelType.MPIM, self.api.conversations, self) for m in raw_mpim if m.get("is_mpim")]
raw_mpim = self.safe_paginated_api(lambda kw: self.api.conversations.list(types="mpim", **kw), "channels", ["mpim:read"], "conversations.list (mpim)")
self.mpim = [SlackChannel(m, _get_channel_users(m), SlackChannelType.MPIM, self.api.conversations, self) for m in raw_mpim if m.get("is_mpim")]
self.log.debug("collected mpim %s", self.mpim)

raw_ims = self.safe_api(lambda: self.api.conversations.list(
types="im"), "channels", [], ['im:read'], 'conversations.list (im)')
self.ims = [SlackDirectMessage(m, self.resolve_user(
m["user"]), self.api.conversations, self) for m in raw_ims if m.get("is_im")]
raw_ims = self.safe_paginated_api(lambda kw: self.api.conversations.list(types="im", **kw), "channels", ["im:read"], "conversations.list (im)")
self.ims = [SlackDirectMessage(m, self.resolve_user(m["user"]), self.api.conversations, self) for m in raw_ims if m.get("is_im")]
self.log.debug("collected ims %s", self.ims)

# all different types with a similar interface
# all different types with a similar interface
self.conversations = self.channels + self.groups + self.mpim
self.conversations.extend(self.ims)

# pylint: disable=invalid-name
self.c = ByKeyLookup[Union[SlackChannel, SlackDirectMessage]](self.conversations, lambda v: [v.name, v.id])
# pylint: enable=invalid-name


def safe_api(self, fun: Callable, attr: Union[str, Sequence[str]], default_value=None, scopes: Optional[List[str]] = None, method: Optional[str] = None) -> Any:
"""
wrapper for handling common errors
Expand All @@ -887,18 +861,48 @@ def safe_api(self, fun: Callable, attr: Union[str, Sequence[str]], default_value
res = fun()
res = res.body
if not res["ok"]:
self.log.warning('%s: unknown occurred %s', method, res)
self.log.warning("%s: unknown occurred %s", method, res)
return default_value
if isinstance(attr, (list, tuple)):
return tuple([res.get(a) for a in attr])
return res.get(attr, default_value)
except Error as error:
if str(error) == 'missing_scope' and scopes:
self.log.warning('%s: missing scope error: %s is missing', method, f"one of '{scopes}'" if len(scopes) != 1 else scopes[0])
if str(error) == "missing_scope" and scopes:
self.log.warning("%s: missing scope error: %s is missing", method, f"one of '{scopes}'" if len(scopes) != 1 else scopes[0])
else:
self.log.error('%s: unknown error occurred: %s', method, error)
self.log.error("%s: unknown error occurred: %s", method, error)
return default_value

def safe_paginated_api(self, fun: Callable, attr: str, scopes: Optional[List[str]] = None, method: Optional[str] = None) -> Any:
"""
wrapper for iterating over a paginated result
:param fun: function to call the key-word arguments given should be forwarded
:type user_id: Callable
:param attr: attribute name in the body to return
:type attr: str
:param method: method hint name
:type method: str
:param scopes: list of scopes hint
:type scopes: List[str]
"""
limit = self.page_limit
next_cursor = None

def list_page():
if not next_cursor:
# initial call
return fun(dict(limit=limit))
return fun(dict(cursor=next_cursor, limit=limit))

while True:
page, meta = self.safe_api(list_page, [attr, "response_metadata"], [[], dict()], scopes, method)
for elem in page:
yield elem
if not meta or not meta.get("next_cursor"):
break
next_cursor = meta["next_cursor"]

def resolve_user(self, user_id: str) -> SlackUser:
"""
resolve a given user_id with creating a dummy user if needed
Expand Down Expand Up @@ -928,8 +932,8 @@ def post_delete(self, file_or_msg: Union[SlackMessage, SlackFile], error: Option
self.log.deleted(error)

if error:
if str(error) == 'missing_scope':
self.log.warning("cannot delete entry: %s: missing '%s' scope", file_or_msg, 'chat:write' if isinstance(file_or_msg, SlackMessage) else 'files:write')
if str(error) == "missing_scope":
self.log.warning("cannot delete entry: %s: missing '%s' scope", file_or_msg, "chat:write" if isinstance(file_or_msg, SlackMessage) else "files:write")
else:
self.log.warning("cannot delete entry: %s: %s", file_or_msg, error)
else:
Expand Down Expand Up @@ -980,6 +984,7 @@ def msgs(self, channels: Optional[Iterable[SlackChannel]] = None, after: TimeIsh
for msg in channel.msgs(after=after, before=before, with_replies=with_replies):
yield msg


def _find_user(slack: SlackCleaner, msg: Dict[str, Any]) -> Optional[SlackUser]:
if "user" not in msg:
return None
Expand Down

0 comments on commit 4c8fbae

Please sign in to comment.