"""Tools for deduplicating and forwarding Max messages to Telegram."""

from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import Iterable, Literal, Optional, Sequence

from aiogram import Bot
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import PRODUCTION, TEST, TelegramAPIServer
from aiogram.types import BufferedInputFile, InputMediaPhoto, InputMediaVideo
from pymax.static import AttachType
from pymax.types import Message, User

# Captions longer than this are sent as a separate text message instead.
_CAPTION_LIMIT = 1024
# Telegram's sendMediaGroup accepts at most this many items per album.
_MEDIA_GROUP_LIMIT = 10
# Parse mode passed with every text and caption.
# NOTE(review): forwarded text is not escaped; with Markdown parsing enabled,
# stray markup characters in user text may make Telegram reject the message --
# confirm whether escaping is handled by the caller.
_PARSE_MODE = "MARKDOWN"
# Attachment kinds that can be forwarded.
_SUPPORTED_KINDS = ("photo", "video")


class MessageDeduplicator:
    """Keep a bounded set of processed message IDs."""

    def __init__(self, limit: int = 1024) -> None:
        """Create a deduplicator that remembers up to *limit* recent IDs."""
        self._limit = limit
        # Insertion order, used to evict the oldest ID first.
        self._order: deque[int] = deque()
        # Same IDs as ``_order``, kept as a set for O(1) membership tests.
        self._seen: set[int] = set()

    def add(self, message_id: int) -> bool:
        """Record *message_id*.

        Returns:
            ``True`` if the ID was not remembered yet (and is now recorded),
            ``False`` if it is already remembered.
        """
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        self._order.append(message_id)
        if len(self._order) > self._limit:
            # Over capacity: forget the oldest remembered ID.
            old = self._order.popleft()
            self._seen.discard(old)
        return True


class TelegramForwarder:
    """Send text and media to a single Telegram chat through an aiogram bot."""

    def __init__(self, bot_token: str, chat_id: int, test_dc: bool) -> None:
        """Store connection settings; the bot itself is created by ``start``."""
        self._bot_token = bot_token
        self._chat_id = chat_id
        self._test_dc = test_dc
        self._bot: Bot | None = None

    async def start(self) -> None:
        """Create the bot if it does not exist yet; otherwise do nothing."""
        if self._bot is not None:
            return
        server: TelegramAPIServer = TEST if self._test_dc else PRODUCTION
        self._bot = Bot(self._bot_token, session=AiohttpSession(api=server))

    async def close(self) -> None:
        """Close the bot session, if any, and mark the forwarder as stopped."""
        # Detach the reference first so the forwarder never keeps pointing at
        # a bot whose session close failed part-way.
        bot, self._bot = self._bot, None
        if bot is not None:
            await bot.session.close()

    def _require_bot(self) -> Bot:
        """Return the active bot or raise ``RuntimeError`` if not started."""
        if self._bot is None:
            raise RuntimeError("TelegramForwarder is not started")
        return self._bot

    async def send_message(
        self,
        text: str,
        *,
        reply_to_message_id: int | None = None,
    ) -> int:
        """Send a text message and return its Telegram message id.

        Raises:
            RuntimeError: if ``start`` has not been called.
        """
        bot = self._require_bot()
        message = await bot.send_message(
            chat_id=self._chat_id,
            text=text,
            reply_to_message_id=reply_to_message_id,
            # When replying, still deliver if the target message is gone.
            allow_sending_without_reply=reply_to_message_id is not None,
            parse_mode=_PARSE_MODE,
        )
        return int(message.message_id)

    async def forward(
        self,
        text: str,
        attachments: Sequence[ForwardAttachment],
        *,
        reply_to_message_id: int | None = None,
    ) -> int:
        """Send text and optional media, returning the primary Telegram message id.

        Without attachments the text is sent as a plain message. With
        attachments the text becomes the caption, unless it is longer than
        the caption limit: then the text is sent first and the media is sent
        as a reply to it.
        """
        if not attachments:
            return await self.send_message(
                text, reply_to_message_id=reply_to_message_id
            )

        caption = text or None
        if caption and len(caption) > _CAPTION_LIMIT:
            primary_id = await self.send_message(
                text, reply_to_message_id=reply_to_message_id
            )
            await self.send_attachments(
                attachments,
                reply_to_message_id=primary_id,
            )
            return primary_id

        return await self.send_attachments(
            attachments,
            reply_to_message_id=reply_to_message_id,
            caption=caption,
        )

    async def send_attachments(
        self,
        attachments: Sequence[ForwardAttachment],
        *,
        reply_to_message_id: int | None = None,
        caption: str | None = None,
    ) -> int:
        """Send one or more media attachments, returning the lead message id.

        Attachments are sent in batches of at most ``_MEDIA_GROUP_LIMIT``
        items. The caption is attached to the first item of the first batch
        only; every batch uses the same ``reply_to_message_id``.

        Raises:
            RuntimeError: if ``start`` has not been called.
            ValueError: if *attachments* is empty or contains an unsupported
                kind (checked before anything is sent).
        """
        bot = self._require_bot()
        if not attachments:
            raise ValueError("attachments must not be empty")
        # Validate everything up front so a bad item cannot leave a
        # partially forwarded message behind.
        for attachment in attachments:
            if attachment.kind not in _SUPPORTED_KINDS:
                raise ValueError(f"Unsupported attachment kind: {attachment.kind}")

        batches = [
            attachments[start : start + _MEDIA_GROUP_LIMIT]
            for start in range(0, len(attachments), _MEDIA_GROUP_LIMIT)
        ]
        lead_id = await self._send_batch(
            bot, batches[0], reply_to_message_id, caption
        )
        for batch in batches[1:]:
            await self._send_batch(bot, batch, reply_to_message_id, None)
        return lead_id

    async def _send_batch(
        self,
        bot: Bot,
        batch: Sequence[ForwardAttachment],
        reply_to_message_id: int | None,
        caption: str | None,
    ) -> int:
        """Send one validated batch and return the id of its first message."""
        if len(batch) == 1:
            # A single item is sent directly rather than as a media group.
            return await self._send_single(
                bot, batch[0], reply_to_message_id, caption
            )

        media_group = [
            _build_input_media(attachment, caption if index == 0 else None)
            for index, attachment in enumerate(batch)
        ]
        messages = await bot.send_media_group(
            chat_id=self._chat_id,
            media=media_group,
            reply_to_message_id=reply_to_message_id,
            allow_sending_without_reply=reply_to_message_id is not None,
        )
        return int(messages[0].message_id)

    async def _send_single(
        self,
        bot: Bot,
        attachment: ForwardAttachment,
        reply_to_message_id: int | None,
        caption: str | None,
    ) -> int:
        """Send a single photo or video and return its message id."""
        allow_without_reply = reply_to_message_id is not None
        parse_mode = _PARSE_MODE if caption else None
        if attachment.kind == "photo":
            message = await bot.send_photo(
                chat_id=self._chat_id,
                photo=attachment.media,
                caption=caption,
                parse_mode=parse_mode,
                reply_to_message_id=reply_to_message_id,
                allow_sending_without_reply=allow_without_reply,
            )
        elif attachment.kind == "video":
            message = await bot.send_video(
                chat_id=self._chat_id,
                video=attachment.media,
                caption=caption,
                parse_mode=parse_mode,
                reply_to_message_id=reply_to_message_id,
                allow_sending_without_reply=allow_without_reply,
            )
        else:
            raise ValueError(f"Unsupported attachment kind: {attachment.kind}")
        return int(message.message_id)


def _build_input_media(
    attachment: ForwardAttachment,
    caption: str | None,
) -> InputMediaPhoto | InputMediaVideo:
    """Wrap *attachment* as a media-group item, optionally with a caption."""
    parse_mode = _PARSE_MODE if caption else None
    if attachment.kind == "photo":
        return InputMediaPhoto(
            media=attachment.media,
            caption=caption,
            parse_mode=parse_mode,
        )
    if attachment.kind == "video":
        return InputMediaVideo(
            media=attachment.media,
            caption=caption,
            parse_mode=parse_mode,
        )
    raise ValueError(f"Unsupported attachment kind: {attachment.kind}")


@dataclass(slots=True)
class ForwardAttachment:
    """A single media item to forward."""

    # Selects which Telegram send method / input-media type is used.
    kind: Literal["photo", "video"]
    # Passed to aiogram unchanged as the photo/video payload.
    media: str | BufferedInputFile


def format_sender(user: Optional[User], fallback_sender_id: Optional[int]) -> str:
    """Return a display label for the sender of a message.

    Uses the first name entry of *user* when available, then
    ``user:<fallback_sender_id>``, and finally ``"system"``.
    """
    if user and user.names:
        return user.names[0].name
    if fallback_sender_id is not None:
        return f"user:{fallback_sender_id}"
    return "system"


def iter_attachment_descriptions(message: Message) -> Iterable[str]:
    """Return one short text description per attachment of *message*.

    Each description is the attachment type, followed by its ``base_url``
    and a ``token=...`` part when those attributes are present and truthy.
    """
    if not message.attaches:
        return []

    descriptions: list[str] = []
    for attach in message.attaches:
        attach_type = getattr(attach, "type", None)
        if isinstance(attach_type, AttachType):
            # str() guards against a non-string enum value.
            type_name = str(attach_type.value).lower()
        else:
            type_name = str(attach_type)

        parts = [type_name]
        base_url = getattr(attach, "base_url", None)
        if base_url:
            # str() keeps the join below safe for non-string URL objects.
            parts.append(str(base_url))
        token = getattr(attach, "photo_token", None) or getattr(attach, "token", None)
        if token:
            parts.append(f"token={token}")
        descriptions.append(" ".join(parts))
    return descriptions


def build_forward_text(
    message: Message,
    sender: Optional[User],
    *,
    include_attachment_descriptions: bool = True,
) -> str:
    """Build the text that is forwarded to Telegram for *message*.

    The result is a sender header line, the stripped message text (if any),
    an optional list of attachment descriptions, and a placeholder line when
    there is neither text nor a listed attachment.
    """
    sender_name = format_sender(sender, message.sender)
    parts: list[str] = [f"От {sender_name}:"]

    text = (message.text or "").strip()
    if text:
        parts.append(text)

    attachments: list[str] = []
    if include_attachment_descriptions:
        attachments = list(iter_attachment_descriptions(message))
    if attachments:
        parts.append("Attachments:")
        parts.extend(f"- {item}" for item in attachments)

    if not text and not attachments:
        parts.append("(empty message)")
    return "\n".join(parts)