All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 4m50s
285 lines
9.9 KiB
Python
285 lines
9.9 KiB
Python
"""Core bridge runtime for syncing messages from Max to Telegram."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import logging
|
|
import signal
|
|
from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse
|
|
|
|
from aiogram.types import BufferedInputFile
|
|
|
|
from pymax import MaxClient
|
|
from pymax.types import Message, PhotoAttach, User, VideoAttach
|
|
|
|
from .config import Config
|
|
from .forwarding import (
|
|
ForwardAttachment,
|
|
MessageDeduplicator,
|
|
TelegramForwarder,
|
|
build_forward_text,
|
|
)
|
|
from .storage import ForwardedMessageStore
|
|
|
|
|
|
def _decode_preview_image(preview_data: str | None) -> bytes | None:
|
|
if not preview_data:
|
|
return None
|
|
payload = preview_data
|
|
if preview_data.startswith("data:"):
|
|
_, _, payload = preview_data.partition(",")
|
|
try:
|
|
return base64.b64decode(payload)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _build_photo_url(attach: PhotoAttach) -> str | None:
|
|
base_url = getattr(attach, "base_url", None)
|
|
token = getattr(attach, "photo_token", None)
|
|
if not base_url or not token:
|
|
return None
|
|
|
|
if token in base_url:
|
|
return base_url
|
|
if "{token}" in base_url:
|
|
return base_url.replace("{token}", token)
|
|
if "{photoToken}" in base_url:
|
|
return base_url.replace("{photoToken}", token)
|
|
|
|
encoded_token = quote(token, safe="")
|
|
if base_url.endswith(("/", "=", "?", ":")):
|
|
return base_url + encoded_token
|
|
|
|
parsed = urlparse(base_url)
|
|
if parsed.query:
|
|
if parsed.query.endswith("="):
|
|
return base_url + encoded_token
|
|
query_params = parse_qsl(parsed.query, keep_blank_values=True)
|
|
if not any(key == "token" for key, _ in query_params):
|
|
query_params.append(("token", token))
|
|
new_query = urlencode(query_params, doseq=True)
|
|
return urlunparse(parsed._replace(query=new_query))
|
|
return base_url
|
|
|
|
return f"{base_url.rstrip('/')}/{encoded_token}"
|
|
|
|
|
|
async def _collect_forward_attachments(
|
|
client: MaxClient,
|
|
message: Message,
|
|
logger: logging.Logger,
|
|
) -> list[ForwardAttachment]:
|
|
attachments: list[ForwardAttachment] = []
|
|
for index, attach in enumerate(message.attaches or []):
|
|
if isinstance(attach, PhotoAttach):
|
|
url = _build_photo_url(attach)
|
|
if url:
|
|
attachments.append(ForwardAttachment(kind="photo", media=url))
|
|
continue
|
|
|
|
preview_bytes = _decode_preview_image(getattr(attach, "preview_data", None))
|
|
if preview_bytes:
|
|
filename = f"max_photo_{attach.photo_id or message.id}_{index}.jpg"
|
|
attachments.append(
|
|
ForwardAttachment(
|
|
kind="photo",
|
|
media=BufferedInputFile(preview_bytes, filename=filename),
|
|
)
|
|
)
|
|
continue
|
|
|
|
logger.debug("Skipping photo attachment without usable data: %s", attach)
|
|
continue
|
|
|
|
if isinstance(attach, VideoAttach):
|
|
video_id = getattr(attach, "video_id", None)
|
|
if video_id is None:
|
|
logger.debug("Video attachment missing video_id: %s", attach)
|
|
continue
|
|
try:
|
|
video_request = await client.get_video_by_id(
|
|
message.chat_id, message.id, video_id
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"Failed to resolve video attachment %s for message %s",
|
|
video_id,
|
|
message.id,
|
|
)
|
|
continue
|
|
if video_request and getattr(video_request, "url", None):
|
|
attachments.append(
|
|
ForwardAttachment(kind="video", media=video_request.url)
|
|
)
|
|
else:
|
|
logger.debug("Skipping video attachment without URL: %s", attach)
|
|
continue
|
|
|
|
logger.debug("Unsupported attachment type: %s", getattr(attach, "type", None))
|
|
|
|
return attachments
|
|
|
|
|
|
async def list_chats_and_exit(client: MaxClient) -> None:
|
|
if not client.chats and not client.dialogs and not client.channels:
|
|
logging.info("No chats available yet; waiting for initial sync")
|
|
|
|
for chat in client.chats:
|
|
logging.info("Group chat %s | id=%s", chat.title or "<no title>", chat.id)
|
|
for chat in client.channels:
|
|
logging.info("Channel %s | id=%s", chat.title or "<no title>", chat.id)
|
|
for dialog in client.dialogs:
|
|
if dialog.contact:
|
|
logging.info(
|
|
"Dialog with %s | id=%s",
|
|
dialog.contact.names[0].name if dialog.contact.names else dialog.id,
|
|
dialog.id,
|
|
)
|
|
else:
|
|
logging.info("Dialog id=%s", dialog.id)
|
|
|
|
|
|
async def run_bridge(config: Config, list_chats: bool) -> None:
|
|
logger = logging.getLogger("bridge")
|
|
|
|
client_kwargs: dict[str, object] = {
|
|
"phone": config.max_phone,
|
|
"work_dir": config.work_dir,
|
|
"send_fake_telemetry": not config.disable_fake_telemetry,
|
|
}
|
|
if config.max_token:
|
|
client_kwargs["token"] = config.max_token
|
|
if config.max_ws_uri:
|
|
client_kwargs["uri"] = config.max_ws_uri
|
|
|
|
client = MaxClient(**client_kwargs) # type: ignore[arg-type]
|
|
deduplicator = MessageDeduplicator()
|
|
process_lock = asyncio.Lock()
|
|
stop_event = asyncio.Event()
|
|
|
|
async def shutdown() -> None:
|
|
logger.info("Shutting down")
|
|
await client.close()
|
|
stop_event.set()
|
|
|
|
loop = asyncio.get_running_loop()
|
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
try:
|
|
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
|
|
except NotImplementedError:
|
|
# add_signal_handler is not available on Windows event loop policy
|
|
signal.signal(sig, lambda _sig, _frame: asyncio.create_task(shutdown()))
|
|
|
|
forwarder: TelegramForwarder | None = None
|
|
store: ForwardedMessageStore | None = None
|
|
if not list_chats:
|
|
assert config.telegram_bot_token is not None
|
|
assert config.telegram_chat_id is not None
|
|
forwarder = TelegramForwarder(
|
|
config.telegram_bot_token, config.telegram_chat_id, config.telegram_test_dc
|
|
)
|
|
await forwarder.start()
|
|
store = ForwardedMessageStore(config.db_path)
|
|
try:
|
|
await store.initialize()
|
|
except Exception:
|
|
logger.exception("Failed to initialize forwarded message store")
|
|
await forwarder.close()
|
|
raise
|
|
|
|
@client.on_start
|
|
async def _on_start() -> None:
|
|
logger.info(
|
|
"Connected to Max as %s. Loaded %d chats, %d dialogs, %d channels.",
|
|
config.max_phone,
|
|
len(client.chats),
|
|
len(client.dialogs),
|
|
len(client.channels),
|
|
)
|
|
if list_chats:
|
|
await list_chats_and_exit(client)
|
|
await shutdown()
|
|
|
|
@client.on_message()
|
|
async def _on_message(message: Message) -> None:
|
|
if list_chats:
|
|
return
|
|
async with process_lock:
|
|
if message.chat_id is None:
|
|
logger.debug("Skipping message without chat_id: %s", message)
|
|
return
|
|
target_chat_id = config.max_chat_id
|
|
if target_chat_id is None:
|
|
logger.debug("No target chat configured; skipping message %s", message.id)
|
|
return
|
|
if message.chat_id != target_chat_id:
|
|
return
|
|
if not deduplicator.add(message.id):
|
|
logger.debug("Skipping duplicate message id=%s", message.id)
|
|
return
|
|
|
|
sender_user: User | None = None
|
|
if message.sender is not None:
|
|
try:
|
|
sender_user = await client.get_user(message.sender)
|
|
except Exception:
|
|
logger.exception("Failed to resolve sender %s", message.sender)
|
|
|
|
attachments_to_forward = await _collect_forward_attachments(
|
|
client, message, logger
|
|
)
|
|
text = build_forward_text(
|
|
message,
|
|
sender_user,
|
|
include_attachment_descriptions=not attachments_to_forward,
|
|
)
|
|
|
|
reply_to_message_id: int | None = None
|
|
if (
|
|
store is not None
|
|
and message.link is not None
|
|
and getattr(message.link, "message", None) is not None
|
|
):
|
|
linked_message = message.link.message
|
|
if isinstance(linked_message, Message):
|
|
reply_to_message_id = await store.lookup_telegram_message_id(
|
|
target_chat_id, linked_message.id
|
|
)
|
|
if reply_to_message_id is None:
|
|
logger.debug(
|
|
"No Telegram mapping found for reply target %s", linked_message.id
|
|
)
|
|
|
|
assert forwarder is not None
|
|
try:
|
|
telegram_message_id = await forwarder.forward(
|
|
text,
|
|
attachments_to_forward,
|
|
reply_to_message_id=reply_to_message_id,
|
|
)
|
|
except Exception:
|
|
logger.exception("Failed to forward message %s", message.id)
|
|
return
|
|
|
|
if store is not None:
|
|
try:
|
|
await store.record_forwarded(
|
|
target_chat_id, message.id, telegram_message_id
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"Failed to record forwarded message mapping for %s", message.id
|
|
)
|
|
|
|
try:
|
|
await client.start()
|
|
except Exception:
|
|
logger.exception("Max client stopped unexpectedly")
|
|
finally:
|
|
if forwarder is not None:
|
|
await forwarder.close()
|
|
stop_event.set()
|
|
|
|
await stop_event.wait()
|