"""Core bridge runtime for syncing messages from Max to Telegram.""" from __future__ import annotations import asyncio import base64 import logging import signal from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse from aiogram.types import BufferedInputFile from pymax import MaxClient from pymax.types import Message, PhotoAttach, User, VideoAttach from .config import Config from .forwarding import ( ForwardAttachment, MessageDeduplicator, TelegramForwarder, build_forward_text, ) from .storage import ForwardedMessageStore def _decode_preview_image(preview_data: str | None) -> bytes | None: if not preview_data: return None payload = preview_data if preview_data.startswith("data:"): _, _, payload = preview_data.partition(",") try: return base64.b64decode(payload) except (ValueError, TypeError): return None def _build_photo_url(attach: PhotoAttach) -> str | None: base_url = getattr(attach, "base_url", None) token = getattr(attach, "photo_token", None) if not base_url or not token: return None if token in base_url: return base_url if "{token}" in base_url: return base_url.replace("{token}", token) if "{photoToken}" in base_url: return base_url.replace("{photoToken}", token) encoded_token = quote(token, safe="") if base_url.endswith(("/", "=", "?", ":")): return base_url + encoded_token parsed = urlparse(base_url) if parsed.query: if parsed.query.endswith("="): return base_url + encoded_token query_params = parse_qsl(parsed.query, keep_blank_values=True) if not any(key == "token" for key, _ in query_params): query_params.append(("token", token)) new_query = urlencode(query_params, doseq=True) return urlunparse(parsed._replace(query=new_query)) return base_url return f"{base_url.rstrip('/')}/{encoded_token}" async def _collect_forward_attachments( client: MaxClient, message: Message, logger: logging.Logger, ) -> list[ForwardAttachment]: attachments: list[ForwardAttachment] = [] for index, attach in enumerate(message.attaches or []): if isinstance(attach, PhotoAttach): url = _build_photo_url(attach) if url: attachments.append(ForwardAttachment(kind="photo", media=url)) continue preview_bytes = _decode_preview_image(getattr(attach, "preview_data", None)) if preview_bytes: filename = f"max_photo_{attach.photo_id or message.id}_{index}.jpg" attachments.append( ForwardAttachment( kind="photo", media=BufferedInputFile(preview_bytes, filename=filename), ) ) continue logger.debug("Skipping photo attachment without usable data: %s", attach) continue if isinstance(attach, VideoAttach): video_id = getattr(attach, "video_id", None) if video_id is None: logger.debug("Video attachment missing video_id: %s", attach) continue try: video_request = await client.get_video_by_id( message.chat_id, message.id, video_id ) except Exception: logger.exception( "Failed to resolve video attachment %s for message %s", video_id, message.id, ) continue if video_request and getattr(video_request, "url", None): attachments.append( ForwardAttachment(kind="video", media=video_request.url) ) else: logger.debug("Skipping video attachment without URL: %s", attach) continue logger.debug("Unsupported attachment type: %s", getattr(attach, "type", None)) return attachments async def list_chats_and_exit(client: MaxClient) -> None: if not client.chats and not client.dialogs and not client.channels: logging.info("No chats available yet; waiting for initial sync") for chat in client.chats: logging.info("Group chat %s | id=%s", chat.title or "", chat.id) for chat in client.channels: logging.info("Channel %s | id=%s", chat.title or "", chat.id) for dialog in client.dialogs: if dialog.contact: logging.info( "Dialog with %s | id=%s", dialog.contact.names[0].name if dialog.contact.names else dialog.id, dialog.id, ) else: logging.info("Dialog id=%s", dialog.id) async def run_bridge(config: Config, list_chats: bool) -> None: logger = logging.getLogger("bridge") client_kwargs: dict[str, object] = { "phone": config.max_phone, "work_dir": config.work_dir, "send_fake_telemetry": not config.disable_fake_telemetry, } if config.max_token: client_kwargs["token"] = config.max_token if config.max_ws_uri: client_kwargs["uri"] = config.max_ws_uri client = MaxClient(**client_kwargs) # type: ignore[arg-type] deduplicator = MessageDeduplicator() process_lock = asyncio.Lock() stop_event = asyncio.Event() async def shutdown() -> None: logger.info("Shutting down") await client.close() stop_event.set() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown())) except NotImplementedError: # add_signal_handler is not available on Windows event loop policy signal.signal(sig, lambda _sig, _frame: asyncio.create_task(shutdown())) forwarder: TelegramForwarder | None = None store: ForwardedMessageStore | None = None if not list_chats: assert config.telegram_bot_token is not None assert config.telegram_chat_id is not None forwarder = TelegramForwarder( config.telegram_bot_token, config.telegram_chat_id, config.telegram_test_dc ) await forwarder.start() store = ForwardedMessageStore(config.db_path) try: await store.initialize() except Exception: logger.exception("Failed to initialize forwarded message store") await forwarder.close() raise @client.on_start async def _on_start() -> None: logger.info( "Connected to Max as %s. Loaded %d chats, %d dialogs, %d channels.", config.max_phone, len(client.chats), len(client.dialogs), len(client.channels), ) if list_chats: await list_chats_and_exit(client) await shutdown() @client.on_message() async def _on_message(message: Message) -> None: if list_chats: return async with process_lock: if message.chat_id is None: logger.debug("Skipping message without chat_id: %s", message) return target_chat_id = config.max_chat_id if target_chat_id is None: logger.debug("No target chat configured; skipping message %s", message.id) return if message.chat_id != target_chat_id: return if not deduplicator.add(message.id): logger.debug("Skipping duplicate message id=%s", message.id) return sender_user: User | None = None if message.sender is not None: try: sender_user = await client.get_user(message.sender) except Exception: logger.exception("Failed to resolve sender %s", message.sender) attachments_to_forward = await _collect_forward_attachments( client, message, logger ) text = build_forward_text( message, sender_user, include_attachment_descriptions=not attachments_to_forward, ) reply_to_message_id: int | None = None if ( store is not None and message.link is not None and getattr(message.link, "message", None) is not None ): linked_message = message.link.message if isinstance(linked_message, Message): reply_to_message_id = await store.lookup_telegram_message_id( target_chat_id, linked_message.id ) if reply_to_message_id is None: logger.debug( "No Telegram mapping found for reply target %s", linked_message.id ) assert forwarder is not None try: telegram_message_id = await forwarder.forward( text, attachments_to_forward, reply_to_message_id=reply_to_message_id, ) except Exception: logger.exception("Failed to forward message %s", message.id) return if store is not None: try: await store.record_forwarded( target_chat_id, message.id, telegram_message_id ) except Exception: logger.exception( "Failed to record forwarded message mapping for %s", message.id ) try: await client.start() except Exception: logger.exception("Max client stopped unexpectedly") finally: if forwarder is not None: await forwarder.close() stop_event.set() await stop_event.wait()