Files
max-messages-forwarder/bridge_app/max_bridge.py
n08i40k c12beb95e2
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 4m50s
feat: initial commit
2025-11-01 03:10:21 +04:00

285 lines
9.9 KiB
Python

"""Core bridge runtime for syncing messages from Max to Telegram."""
from __future__ import annotations
import asyncio
import base64
import logging
import signal
from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse
from aiogram.types import BufferedInputFile
from pymax import MaxClient
from pymax.types import Message, PhotoAttach, User, VideoAttach
from .config import Config
from .forwarding import (
ForwardAttachment,
MessageDeduplicator,
TelegramForwarder,
build_forward_text,
)
from .storage import ForwardedMessageStore
def _decode_preview_image(preview_data: str | None) -> bytes | None:
    """Turn an inline base64 preview into raw image bytes.

    The input may be a bare base64 string or a ``data:`` URI; for the latter,
    everything after the first comma is taken as the encoded payload.

    Returns:
        The decoded bytes, or ``None`` when the input is empty/``None`` or the
        payload cannot be base64-decoded.
    """
    if not preview_data:
        return None

    # Strip the "data:<mime>;base64," style header when one is present.
    if preview_data.startswith("data:"):
        encoded = preview_data.partition(",")[2]
    else:
        encoded = preview_data

    try:
        decoded = base64.b64decode(encoded)
    except (ValueError, TypeError):
        # binascii.Error (bad padding etc.) is a ValueError subclass.
        return None
    return decoded
def _build_photo_url(attach: PhotoAttach) -> str | None:
    """Build a download URL for a photo attachment.

    Combines ``attach.base_url`` with ``attach.photo_token`` using the first
    rule that matches:

    1. The token already occurs in the URL: the URL is returned unchanged.
    2. The URL contains a ``{token}`` or ``{photoToken}`` placeholder: the
       placeholder is replaced with the token (inserted verbatim).
    3. The URL ends with ``/``, ``=``, ``?`` or ``:``: the percent-encoded
       token is appended.
    4. The URL has a query string: a ``token`` parameter is added unless one
       is already present, in which case the URL is returned unchanged.
    5. Otherwise the percent-encoded token is appended as a new path segment.

    For rules 3-5 a ``#fragment`` is set aside first and re-attached at the
    very end, so the token always ends up in the path or query and never
    inside the fragment.

    Returns:
        The resulting URL, or ``None`` when either ``base_url`` or
        ``photo_token`` is missing or empty.
    """
    base_url = getattr(attach, "base_url", None)
    token = getattr(attach, "photo_token", None)
    if not base_url or not token:
        return None

    # The URL may already be complete.
    if token in base_url:
        return base_url

    # Explicit placeholders win over any guessing about the URL layout.
    for placeholder in ("{token}", "{photoToken}"):
        if placeholder in base_url:
            return base_url.replace(placeholder, token)

    # Work on the part before '#': appending to the raw string would otherwise
    # put the token into the fragment, which is never sent to the server.
    url, hash_sep, fragment = base_url.partition("#")
    suffix = hash_sep + fragment
    encoded_token = quote(token, safe="")

    # The URL visibly "waits" for a value (trailing '/', '=', '?' or ':').
    if url.endswith(("/", "=", "?", ":")):
        return url + encoded_token + suffix

    parsed = urlparse(url)
    if parsed.query:
        query_params = parse_qsl(parsed.query, keep_blank_values=True)
        if any(key == "token" for key, _ in query_params):
            # A token parameter already exists; leave the URL untouched.
            return base_url
        query_params.append(("token", token))
        # NOTE: urlencode re-encodes the existing parameters as well.
        new_query = urlencode(query_params, doseq=True)
        return urlunparse(parsed._replace(query=new_query)) + suffix

    # No query and no trailing separator: add the token as a path segment.
    return f"{url}/{encoded_token}{suffix}"
async def _collect_forward_attachments(
    client: MaxClient,
    message: Message,
    logger: logging.Logger,
) -> list[ForwardAttachment]:
    """Convert the attachments of a Max message into forwardable items.

    Photos are forwarded by URL when one can be built, otherwise from the
    decoded inline preview. Videos are resolved to a URL through
    ``client.get_video_by_id``. Anything that cannot be turned into a
    ``ForwardAttachment`` is logged and skipped.

    Args:
        client: Max client used to resolve video attachments.
        message: Message whose ``attaches`` are inspected (may be empty/None).
        logger: Logger receiving debug/exception records for skipped items.

    Returns:
        The forwardable attachments, in the order they appear on the message.
    """
    collected: list[ForwardAttachment] = []

    for position, item in enumerate(message.attaches or []):
        if isinstance(item, PhotoAttach):
            photo_url = _build_photo_url(item)
            if photo_url:
                collected.append(ForwardAttachment(kind="photo", media=photo_url))
            else:
                # No usable URL: fall back to the embedded preview image.
                raw_preview = _decode_preview_image(
                    getattr(item, "preview_data", None)
                )
                if raw_preview:
                    upload_name = (
                        f"max_photo_{item.photo_id or message.id}_{position}.jpg"
                    )
                    upload = BufferedInputFile(raw_preview, filename=upload_name)
                    collected.append(ForwardAttachment(kind="photo", media=upload))
                else:
                    logger.debug(
                        "Skipping photo attachment without usable data: %s", item
                    )
        elif isinstance(item, VideoAttach):
            video_id = getattr(item, "video_id", None)
            if video_id is None:
                logger.debug("Video attachment missing video_id: %s", item)
                continue

            try:
                resolved = await client.get_video_by_id(
                    message.chat_id, message.id, video_id
                )
            except Exception:
                # One unresolved video must not prevent the rest from forwarding.
                logger.exception(
                    "Failed to resolve video attachment %s for message %s",
                    video_id,
                    message.id,
                )
                continue

            if resolved and getattr(resolved, "url", None):
                collected.append(ForwardAttachment(kind="video", media=resolved.url))
            else:
                logger.debug("Skipping video attachment without URL: %s", item)
        else:
            logger.debug(
                "Unsupported attachment type: %s", getattr(item, "type", None)
            )

    return collected
async def list_chats_and_exit(client: MaxClient) -> None:
    """Log every group chat, channel and dialog currently known to *client*.

    Each entry is written to the root logger at INFO level together with its
    id. When the client has no chats, dialogs or channels at all, a single
    notice is logged instead.
    """
    if not (client.chats or client.dialogs or client.channels):
        logging.info("No chats available yet; waiting for initial sync")

    # Group chats and channels share the same "<title> | id" layout.
    titled_sections = (
        ("Group chat %s | id=%s", client.chats),
        ("Channel %s | id=%s", client.channels),
    )
    for line_format, entries in titled_sections:
        for entry in entries:
            logging.info(line_format, entry.title or "<no title>", entry.id)

    for dialog in client.dialogs:
        contact = dialog.contact
        if not contact:
            logging.info("Dialog id=%s", dialog.id)
            continue
        # Use the contact's first name entry; fall back to the dialog id.
        display_name = contact.names[0].name if contact.names else dialog.id
        logging.info("Dialog with %s | id=%s", display_name, dialog.id)
async def run_bridge(config: Config, list_chats: bool) -> None:
    """Run the Max client and forward messages from one chat to Telegram.

    Builds a ``MaxClient`` from *config*, installs SIGINT/SIGTERM handlers that
    request a graceful shutdown, registers the start/message callbacks and then
    runs the client until it stops.

    Args:
        config: Bridge settings (Max credentials, Telegram target, storage
            path, ...).
        list_chats: When true nothing is forwarded: once the client has
            started, the known chats are logged and the bridge shuts down.

    Raises:
        ValueError: If forwarding is requested but ``telegram_bot_token`` or
            ``telegram_chat_id`` is missing from *config*.
        Exception: Errors from starting the Telegram forwarder or initialising
            the message store are propagated (the forwarder is closed first in
            the latter case). Errors raised by ``client.start()`` are logged,
            not propagated.
    """
    logger = logging.getLogger("bridge")

    client_kwargs: dict[str, object] = {
        "phone": config.max_phone,
        "work_dir": config.work_dir,
        "send_fake_telemetry": not config.disable_fake_telemetry,
    }
    # Optional settings are only passed when configured so that the client's
    # own defaults apply otherwise.
    if config.max_token:
        client_kwargs["token"] = config.max_token
    if config.max_ws_uri:
        client_kwargs["uri"] = config.max_ws_uri
    client = MaxClient(**client_kwargs)  # type: ignore[arg-type]

    deduplicator = MessageDeduplicator()
    # Serialises message handling so messages are forwarded one at a time.
    process_lock = asyncio.Lock()
    stop_event = asyncio.Event()
    shutdown_started = False
    # The event loop keeps only weak references to tasks; hold strong ones so
    # a signal-triggered shutdown cannot be garbage-collected mid-run.
    background_tasks: set[asyncio.Task[None]] = set()

    async def shutdown() -> None:
        """Close the Max client once and signal that the bridge is stopping."""
        nonlocal shutdown_started
        if shutdown_started:
            # A second signal (or list-chats plus a signal) must not close the
            # client twice.
            return
        shutdown_started = True
        logger.info("Shutting down")
        try:
            await client.close()
        finally:
            # Always release waiters, even if closing the client failed.
            stop_event.set()

    def request_shutdown() -> None:
        """Schedule ``shutdown()`` on the running loop and keep the task alive."""
        task = asyncio.create_task(shutdown())
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown)
        except NotImplementedError:
            # add_signal_handler is not available on Windows event loops.
            # A plain signal handler must not touch the loop directly, so the
            # request is handed over via call_soon_threadsafe.
            signal.signal(
                sig,
                lambda _sig, _frame: loop.call_soon_threadsafe(request_shutdown),
            )

    forwarder: TelegramForwarder | None = None
    store: ForwardedMessageStore | None = None
    if not list_chats:
        # Explicit check instead of ``assert`` so it also runs under ``-O``.
        if config.telegram_bot_token is None or config.telegram_chat_id is None:
            raise ValueError(
                "telegram_bot_token and telegram_chat_id are required "
                "to forward messages"
            )
        forwarder = TelegramForwarder(
            config.telegram_bot_token, config.telegram_chat_id, config.telegram_test_dc
        )
        await forwarder.start()
        store = ForwardedMessageStore(config.db_path)
        try:
            await store.initialize()
        except Exception:
            logger.exception("Failed to initialize forwarded message store")
            # Do not leave the already-started forwarder open.
            await forwarder.close()
            raise

    @client.on_start
    async def _on_start() -> None:
        logger.info(
            "Connected to Max as %s. Loaded %d chats, %d dialogs, %d channels.",
            config.max_phone,
            len(client.chats),
            len(client.dialogs),
            len(client.channels),
        )
        if list_chats:
            await list_chats_and_exit(client)
            await shutdown()

    @client.on_message()
    async def _on_message(message: Message) -> None:
        if list_chats:
            return

        async with process_lock:
            if message.chat_id is None:
                logger.debug("Skipping message without chat_id: %s", message)
                return

            target_chat_id = config.max_chat_id
            if target_chat_id is None:
                logger.debug(
                    "No target chat configured; skipping message %s", message.id
                )
                return

            # Only the configured chat is bridged.
            if message.chat_id != target_chat_id:
                return

            if not deduplicator.add(message.id):
                logger.debug("Skipping duplicate message id=%s", message.id)
                return

            # Sender lookup is best effort; the message is forwarded either way.
            sender_user: User | None = None
            if message.sender is not None:
                try:
                    sender_user = await client.get_user(message.sender)
                except Exception:
                    logger.exception("Failed to resolve sender %s", message.sender)

            attachments_to_forward = await _collect_forward_attachments(
                client, message, logger
            )
            # Attachments are described in the text only when none of them
            # could be forwarded as media.
            text = build_forward_text(
                message,
                sender_user,
                include_attachment_descriptions=not attachments_to_forward,
            )

            # If this message links to an earlier Max message that was already
            # forwarded, reply to its Telegram counterpart.
            reply_to_message_id: int | None = None
            if (
                store is not None
                and message.link is not None
                and getattr(message.link, "message", None) is not None
            ):
                linked_message = message.link.message
                if isinstance(linked_message, Message):
                    reply_to_message_id = await store.lookup_telegram_message_id(
                        target_chat_id, linked_message.id
                    )
                    if reply_to_message_id is None:
                        logger.debug(
                            "No Telegram mapping found for reply target %s",
                            linked_message.id,
                        )

            # Invariant: the forwarder always exists when not in list mode.
            assert forwarder is not None
            try:
                telegram_message_id = await forwarder.forward(
                    text,
                    attachments_to_forward,
                    reply_to_message_id=reply_to_message_id,
                )
            except Exception:
                logger.exception("Failed to forward message %s", message.id)
                return

            if store is not None:
                try:
                    await store.record_forwarded(
                        target_chat_id, message.id, telegram_message_id
                    )
                except Exception:
                    # The message was delivered; only the mapping is lost.
                    logger.exception(
                        "Failed to record forwarded message mapping for %s",
                        message.id,
                    )

    try:
        await client.start()
    except Exception:
        logger.exception("Max client stopped unexpectedly")
    finally:
        if forwarder is not None:
            await forwarder.close()
        stop_event.set()

    # stop_event is set in the ``finally`` above, so this returns immediately.
    await stop_event.wait()