From 2d04823774762369dda3d63748531450207617cb Mon Sep 17 00:00:00 2001 From: muneerahp Date: Mon, 12 Dec 2022 11:47:05 +0000 Subject: [PATCH] adding option to look for and collect telegram reply channels --- datasources/telegram/search_telegram.py | 37 ++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/datasources/telegram/search_telegram.py b/datasources/telegram/search_telegram.py index c8ef7e5bd..df00d03b0 100644 --- a/datasources/telegram/search_telegram.py +++ b/datasources/telegram/search_telegram.py @@ -18,7 +18,7 @@ from common.lib.helpers import convert_to_int, UserInput from datetime import datetime -from telethon import TelegramClient +from telethon import TelegramClient, utils, types from telethon.errors.rpcerrorlist import UsernameInvalidError, TimeoutError, ChannelPrivateError, BadRequestError, \ FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError from telethon.tl.functions.channels import GetFullChannelRequest @@ -127,6 +127,19 @@ class SearchTelegram(Search): "type": UserInput.OPTION_TOGGLE, "help": "Resolve references", "default": False, + }, + "retrieve-replies-intro": { + "type": UserInput.OPTION_INFO, + "help": "Some public channels have linked discussion groups where users can comment/reply to posts made " + "in the original channel. Enabling this option allows you to check if a channel has a publicly " + "available linked discussion group and collect data from here too. Note that replies are collected " + "with the same parameters (dates, number of messages to collect) as the main channel. Enabling this " + "will increase the size of your dataset." + }, + "retrieve-replies": { + "type": UserInput.OPTION_TOGGLE, + "help": "Retrieve replies", + "default": False, } } @@ -256,6 +269,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): :return list: List of messages, each message a dictionary. """ resolve_refs = self.parameters.get("resolve-entities") + retrieve_replies = self.parameters.get("retrieve-replies") # Adding flag to stop; using for rate limits no_additional_queries = False @@ -266,6 +280,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): delay = 10 retries = 0 processed += 1 + reply_channel_added = False self.dataset.update_progress(processed / len(queries)) if no_additional_queries: @@ -278,6 +293,14 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): i = 0 try: entity_posts = 0 + + # if chat channels are added, they are id-ed using numeric id + # this will fail if they are then formatted as a string on input + try: + query = int(query) + except ValueError: + pass + async for message in client.iter_messages(entity=query, offset_date=max_date): entity_posts += 1 i += 1 @@ -293,6 +316,17 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): # e.g. someone joins the channel - not an actual message continue + if retrieve_replies and (not reply_channel_added) and (message.replies and + message.replies.channel_id): + listed_reply_channel = message.replies.channel_id + + self.dataset.update_status("Reply channel '%s' found and added to process queue" + % listed_reply_channel) + + channel_to_add = utils.get_peer_id(types.PeerChannel(message.replies.channel_id)) + queries.append(channel_to_add) + reply_channel_added = True + # todo: possibly enrich object with e.g. the name of # the channel a message was forwarded from (but that # needs extra API requests...) @@ -815,6 +849,7 @@ def validate_query(query, request, user): "api_phone": query.get("api_phone"), "save-session": query.get("save-session"), "resolve-entities": query.get("resolve-entities"), + "retrieve-replies": query.get("retrieve-replies"), "min_date": min_date, "max_date": max_date }