Source code for updaters.telegram

"""Module containing class for sending Telegram replies and emoji reactions."""

import asyncio
import logging
from pathlib import Path

from telethon import TelegramClient
from telethon.errors import SessionPasswordNeededError

from trackers.config import telegram_config
from trackers.models import Mention
from updaters.base import BaseUpdater

logger = logging.getLogger(__name__)


[docs] class TelegramUpdater(BaseUpdater): """Main class for retrieving and adding Telegram messages. :var TelegramUpdater.client: authenticated Telegram client :type TelegramUpdater.client: :class:`telethon.TelegramClient` :var TelegramUpdater._is_connected: is client connected or not :type TelegramUpdater._is_connected: Boolean """ def __init__(self, *args, **kwargs): """Initialize Twitter/X updater. :var config: configuration dictionary for Telegram API :type config: dict :var session_name: name of the client session :type session_name: str :var session_path: full path on disk to session database file :type session_path: :class:`pathlib.PosixPath` """ super().__init__(*args, **kwargs) config = telegram_config() session_name = config.get("session_name", "telegram_tracker") session_path = ( Path(__file__).resolve().parent.parent / "fixtures" / f"{session_name}.session" ) self.client = TelegramClient( session=session_path, api_id=config["api_id"], api_hash=config["api_hash"], ) self._is_connected = False async def _add_reply_async(self, url, text): """Async implementation of adding reply to message. :param url: URL of the message :type url: str :param text: text to reply with :type text: str :var chat_id: unique chat identifier :type chat_id: int :var message_id: unique message identifier in the chat :type message_id: int :return: True if successful, False otherwise :rtype: bool """ try: await self._ensure_connected() chat_id, message_id = self._parse_message_url(url) await self.client.send_message( entity=chat_id, message=text, reply_to=message_id ) logger.info(f"Added reply to message: {url}") return True except Exception as e: logger.error(f"Error adding reply to {url}: {e}") return False async def _ensure_connected(self): """Ensure Telegram client is connected. :var phone: app creator's phone number :type phone: str :var code: code received via Telegram app :type code: str :var password: app creator's 2FA password :type password: str """ if not self._is_connected: try: await self.client.connect() if not await self.client.is_user_authorized(): phone = input("Please enter your phone number: ") await self.client.send_code_request(phone) code = input("Please enter the code you received: ") await self.client.sign_in(phone, code) self._is_connected = True except SessionPasswordNeededError: password = input("Please enter your 2FA password: ") await self.client.sign_in(password=password) self._is_connected = True except Exception as e: logger.error(f"Error connecting Telegram client: {e}") raise def _parse_message_url(self, url): """Parse Telegram message URL to extract chat_id and message_id. :param url: URL of the message :type url: str :var parts: message URL's parts :type parts: list :var chat_id: unique chat identifier :type chat_id: int :var message_id: unique message identifier in the chat :type message_id: int :return: tuple of (chat_id, message_id) :rtype: two-tuple """ parts = url.split("/") chat_id = int(parts[-2]) message_id = int(parts[-1]) return chat_id, message_id def _process_action(self, action_callback, *args): """Run `action_callback` with `args` in an asynchronous loop. :param action_callback: method to call :type action_callback: object :var loop: asyncio event loop :type loop: :class:`asyncio.events.AbstractEventLoop` :var result: future's result :type result: object :return: True for success, False otherwise :rtype: bool """ loop = None try: # Create event loop for async operations loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Run the async method result = loop.run_until_complete(action_callback(*args)) return result except Exception as e: logger.error(f"Error raised for {action_callback.__name__}: {e}") return False finally: # Always close the loop to free resources if loop and not loop.is_closed(): try: loop.close() except Exception as e: logger.warning(f"Error closing event loop: {e}")
[docs] def add_reaction_to_message(self, url, reaction_name): """Add reaction to the Telegram message defined by `url`. NOTE: not implemented yet :param url: URL of the message to react to :type url: str :param reaction_name: name of the reaction to add (e.g. "duplicate") :type reaction_name: str :return: True for success, False otherwise :rtype: Boolean """ return True
[docs] def add_reply_to_message(self, url, text): """Add `text` to message defined by `url`. :param url: URL of the message to reply to :type url: str :param text: text to reply with :type text: str :return: True for success, False otherwise :rtype: bool """ return self._process_action(self._add_reply_async, url, text)
[docs] def message_from_url(self, url): """Retrieve message content from provided Telegram `url`. :param url: Telegram URL to get message from :type url: str :return: dictionary with message data :rtype: dict """ return Mention.objects.message_from_url(url)