All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 4m50s
227 lines
7.6 KiB
Python
227 lines
7.6 KiB
Python
"""Tools for deduplicating and forwarding Max messages to Telegram."""
|
|
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, Literal, Optional, Sequence
|
|
|
|
from aiogram import Bot
|
|
from aiogram.client.session.aiohttp import AiohttpSession
|
|
from aiogram.client.telegram import PRODUCTION, TEST, TelegramAPIServer
|
|
from aiogram.types import BufferedInputFile, InputMediaPhoto, InputMediaVideo
|
|
|
|
from pymax.static import AttachType
|
|
from pymax.types import Message, User
|
|
|
|
|
|
class MessageDeduplicator:
|
|
"""Keep a bounded set of processed message IDs."""
|
|
|
|
def __init__(self, limit: int = 1024) -> None:
|
|
self._limit = limit
|
|
self._order: deque[int] = deque()
|
|
self._seen: set[int] = set()
|
|
|
|
def add(self, message_id: int) -> bool:
|
|
if message_id in self._seen:
|
|
return False
|
|
self._seen.add(message_id)
|
|
self._order.append(message_id)
|
|
if len(self._order) > self._limit:
|
|
old = self._order.popleft()
|
|
self._seen.discard(old)
|
|
return True
|
|
|
|
|
|
class TelegramForwarder:
|
|
def __init__(self, bot_token: str, chat_id: int, test_dc: bool) -> None:
|
|
self._bot_token = bot_token
|
|
self._chat_id = chat_id
|
|
self._test_dc = test_dc
|
|
self._bot: Bot | None = None
|
|
|
|
async def start(self) -> None:
|
|
if self._bot is not None:
|
|
return
|
|
server: TelegramAPIServer = TEST if self._test_dc else PRODUCTION
|
|
self._bot = Bot(self._bot_token, session=AiohttpSession(api=server))
|
|
|
|
async def close(self) -> None:
|
|
if self._bot is not None:
|
|
await self._bot.session.close()
|
|
self._bot = None
|
|
|
|
async def send_message(
|
|
self, text: str, *, reply_to_message_id: int | None = None
|
|
) -> int:
|
|
if self._bot is None:
|
|
raise RuntimeError("TelegramForwarder is not started")
|
|
message = await self._bot.send_message(
|
|
chat_id=self._chat_id,
|
|
text=text,
|
|
reply_to_message_id=reply_to_message_id,
|
|
allow_sending_without_reply=reply_to_message_id is not None,
|
|
parse_mode="MARKDOWN",
|
|
)
|
|
return int(message.message_id)
|
|
|
|
async def forward(
|
|
self,
|
|
text: str,
|
|
attachments: Sequence["ForwardAttachment"],
|
|
*,
|
|
reply_to_message_id: int | None = None,
|
|
) -> int:
|
|
"""Send text and optional media, returning the primary Telegram message id."""
|
|
|
|
if not attachments:
|
|
return await self.send_message(
|
|
text, reply_to_message_id=reply_to_message_id
|
|
)
|
|
|
|
caption = text or None
|
|
if caption and len(caption) > 1024:
|
|
primary_id = await self.send_message(
|
|
text, reply_to_message_id=reply_to_message_id
|
|
)
|
|
await self.send_attachments(
|
|
attachments,
|
|
reply_to_message_id=primary_id,
|
|
)
|
|
return primary_id
|
|
|
|
return await self.send_attachments(
|
|
attachments,
|
|
reply_to_message_id=reply_to_message_id,
|
|
caption=caption,
|
|
)
|
|
|
|
async def send_attachments(
|
|
self,
|
|
attachments: Sequence["ForwardAttachment"],
|
|
*,
|
|
reply_to_message_id: int | None = None,
|
|
caption: str | None = None,
|
|
) -> int:
|
|
"""Send one or more media attachments, returning the lead message id."""
|
|
|
|
if self._bot is None:
|
|
raise RuntimeError("TelegramForwarder is not started")
|
|
if not attachments:
|
|
raise ValueError("attachments must not be empty")
|
|
|
|
allow_without_reply = reply_to_message_id is not None
|
|
|
|
if len(attachments) == 1:
|
|
attachment = attachments[0]
|
|
if attachment.kind == "photo":
|
|
message = await self._bot.send_photo(
|
|
chat_id=self._chat_id,
|
|
photo=attachment.media,
|
|
caption=caption,
|
|
parse_mode="MARKDOWN" if caption else None,
|
|
reply_to_message_id=reply_to_message_id,
|
|
allow_sending_without_reply=allow_without_reply,
|
|
)
|
|
elif attachment.kind == "video":
|
|
message = await self._bot.send_video(
|
|
chat_id=self._chat_id,
|
|
video=attachment.media,
|
|
caption=caption,
|
|
parse_mode="MARKDOWN" if caption else None,
|
|
reply_to_message_id=reply_to_message_id,
|
|
allow_sending_without_reply=allow_without_reply,
|
|
)
|
|
else:
|
|
raise ValueError(f"Unsupported attachment kind: {attachment.kind}")
|
|
return int(message.message_id)
|
|
|
|
media_group = []
|
|
for index, attachment in enumerate(attachments):
|
|
item_caption = caption if index == 0 else None
|
|
if attachment.kind == "photo":
|
|
media_group.append(
|
|
InputMediaPhoto(
|
|
media=attachment.media,
|
|
caption=item_caption,
|
|
parse_mode="MARKDOWN" if item_caption else None,
|
|
)
|
|
)
|
|
elif attachment.kind == "video":
|
|
media_group.append(
|
|
InputMediaVideo(
|
|
media=attachment.media,
|
|
caption=item_caption,
|
|
parse_mode="MARKDOWN" if item_caption else None,
|
|
)
|
|
)
|
|
else:
|
|
raise ValueError(f"Unsupported attachment kind: {attachment.kind}")
|
|
|
|
messages = await self._bot.send_media_group(
|
|
chat_id=self._chat_id,
|
|
media=media_group,
|
|
reply_to_message_id=reply_to_message_id,
|
|
allow_sending_without_reply=allow_without_reply,
|
|
)
|
|
return int(messages[0].message_id)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ForwardAttachment:
|
|
kind: Literal["photo", "video"]
|
|
media: str | BufferedInputFile
|
|
|
|
|
|
def format_sender(user: Optional[User], fallback_sender_id: Optional[int]) -> str:
|
|
if user and user.names:
|
|
return user.names[0].name
|
|
if fallback_sender_id is not None:
|
|
return f"user:{fallback_sender_id}"
|
|
return "system"
|
|
|
|
|
|
def iter_attachment_descriptions(message: Message) -> Iterable[str]:
|
|
if not message.attaches:
|
|
return []
|
|
descriptions: list[str] = []
|
|
for attach in message.attaches:
|
|
attach_type = getattr(attach, "type", None)
|
|
if isinstance(attach_type, AttachType):
|
|
type_name = attach_type.value.lower()
|
|
else:
|
|
type_name = str(attach_type)
|
|
|
|
parts = [type_name]
|
|
base_url = getattr(attach, "base_url", None)
|
|
if base_url:
|
|
parts.append(base_url)
|
|
token = getattr(attach, "photo_token", None) or getattr(attach, "token", None)
|
|
if token:
|
|
parts.append(f"token={token}")
|
|
descriptions.append(" ".join(parts))
|
|
return descriptions
|
|
|
|
|
|
def build_forward_text(
|
|
message: Message,
|
|
sender: Optional[User],
|
|
*,
|
|
include_attachment_descriptions: bool = True,
|
|
) -> str:
|
|
sender_name = format_sender(sender, message.sender)
|
|
|
|
parts: list[str] = [f"От {sender_name}:"]
|
|
text = (message.text or "").strip()
|
|
if text:
|
|
parts.append(text)
|
|
attachments: list[str] = []
|
|
if include_attachment_descriptions:
|
|
attachments = list(iter_attachment_descriptions(message))
|
|
if attachments:
|
|
parts.append("Attachments:")
|
|
parts.extend(f"- {item}" for item in attachments)
|
|
if not text and not attachments:
|
|
parts.append("(empty message)")
|
|
return "\n".join(parts)
|