From c12beb95e204f53540c3173ffe40a8da9e80be64 Mon Sep 17 00:00:00 2001 From: n08i40k Date: Sat, 1 Nov 2025 03:10:21 +0400 Subject: [PATCH] feat: initial commit --- .dockerignore | 15 ++ .github/workflows/docker-publish.yml | 45 +++++ .gitignore | 15 ++ Dockerfile | 23 +++ bridge.py | 29 +++ bridge_app/__init__.py | 12 ++ bridge_app/config.py | 159 +++++++++++++++ bridge_app/forwarding.py | 226 +++++++++++++++++++++ bridge_app/max_bridge.py | 284 +++++++++++++++++++++++++++ bridge_app/storage.py | 105 ++++++++++ requirements.txt | 2 + 11 files changed, 915 insertions(+) create mode 100644 .dockerignore create mode 100644 .github/workflows/docker-publish.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 bridge.py create mode 100644 bridge_app/__init__.py create mode 100644 bridge_app/config.py create mode 100644 bridge_app/forwarding.py create mode 100644 bridge_app/max_bridge.py create mode 100644 bridge_app/storage.py create mode 100644 requirements.txt diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..485e5d7 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,15 @@ +.git +.github +.venv +__pycache__ +*.pyc +*.pyo +*.pyd +*.sqlite3 +*.db +max_session +forwarded_messages.db +.pytest_cache +.mypy_cache +dist +build diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 0000000..c2486d8 --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,45 @@ +name: Build and Push Docker Image + +on: + push: + branches: + - master + workflow_dispatch: + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to container registry + uses: docker/login-action@v3 + with: + registry: registry.n08i40k.ru + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Extract metadata for image tags + id: meta + uses: docker/metadata-action@v5 + with: + images: registry.n08i40k.ru/n08i40k/max-messages-forwarder + + - name: Build and push image + uses: docker/build-push-action@v6 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + + - name: Deploy + run: curl ${{ secrets.DEPLOY_URL }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485e5d7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +.git +.github +.venv +__pycache__ +*.pyc +*.pyo +*.pyd +*.sqlite3 +*.db +max_session +forwarded_messages.db +.pytest_cache +.mypy_cache +dist +build diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0c8b6df --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +# syntax=docker/dockerfile:1 + +FROM python:3.11-slim AS runtime + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + VIRTUAL_ENV=/opt/venv + +RUN python -m venv "$VIRTUAL_ENV" +ENV PATH="$VIRTUAL_ENV/bin:$PATH" + +WORKDIR /app + +COPY requirements.txt ./ +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -r requirements.txt + +COPY bridge.py ./ +COPY bridge_app ./bridge_app + +VOLUME ["/app/max_session"] + +ENTRYPOINT ["python", "bridge.py"] diff --git a/bridge.py b/bridge.py new file mode 100644 index 0000000..d33b549 --- /dev/null +++ b/bridge.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +"""Bridge messages from a Max group chat to a Telegram group using bot tokens.""" +from __future__ import annotations + +import asyncio +import logging + +from bridge_app import parse_args, run_bridge + + +def configure_logging(debug: bool) -> None: + level = logging.DEBUG if debug else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + +def main() -> None: + config, list_chats, debug = parse_args() + configure_logging(debug) + try: + asyncio.run(run_bridge(config, list_chats)) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/bridge_app/__init__.py b/bridge_app/__init__.py new file mode 100644 index 0000000..608b5ed --- /dev/null +++ b/bridge_app/__init__.py @@ -0,0 +1,12 @@ +"""Reusable components for the Max ↔ Telegram bridge CLI.""" +from .config import Config, parse_args +from .max_bridge import list_chats_and_exit, run_bridge +from .storage import ForwardedMessageStore + +__all__ = [ + "Config", + "parse_args", + "list_chats_and_exit", + "run_bridge", + "ForwardedMessageStore", +] diff --git a/bridge_app/config.py b/bridge_app/config.py new file mode 100644 index 0000000..67e951b --- /dev/null +++ b/bridge_app/config.py @@ -0,0 +1,159 @@ +"""Configuration models and CLI argument parsing for the Max ↔ Telegram bridge.""" +from __future__ import annotations + +import argparse +import os +from dataclasses import dataclass +from pathlib import Path + + +def _parse_bool(value: str) -> bool: + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "y", "on"}: + return True + if normalized in {"0", "false", "no", "n", "off"}: + return False + raise ValueError(value) + + +@dataclass(slots=True) +class Config: + max_phone: str + max_chat_id: int | None + telegram_bot_token: str | None + telegram_chat_id: int | None + telegram_test_dc: bool + max_token: str | None = None + max_ws_uri: str | None = None + work_dir: str = "./max_session" + disable_fake_telemetry: bool = False + db_path: str = "" + + +def parse_args() -> tuple[Config, bool, bool]: + """Parse CLI arguments and environment variables into a Config.""" + parser = argparse.ArgumentParser( + description=( + "Forward messages from a Max group chat to a Telegram chat. " + "Configuration can come from CLI flags or environment variables." + ) + ) + parser.add_argument( + "--max-phone", + default=os.getenv("MAX_PHONE"), + help="Phone number used to authenticate in Max (env: MAX_PHONE)", + ) + parser.add_argument( + "--max-token", + default=os.getenv("MAX_TOKEN"), + help="Optional auth token for Max session (env: MAX_TOKEN)", + ) + parser.add_argument( + "--max-chat-id", + type=int, + default=int(os.getenv("MAX_CHAT_ID")) if os.getenv("MAX_CHAT_ID") else None, + help="ID of the Max group chat to mirror (env: MAX_CHAT_ID)", + ) + parser.add_argument( + "--max-ws-uri", + default=os.getenv("MAX_WS_URI"), + help="Override the default Max WebSocket URI (env: MAX_WS_URI)", + ) + parser.add_argument( + "--work-dir", + default=os.getenv("MAX_WORKDIR", "./max_session"), + help="Directory to store Max session database (env: MAX_WORKDIR)", + ) + parser.add_argument( + "--telegram-bot-token", + default=os.getenv("TELEGRAM_BOT_TOKEN"), + help="Telegram bot token (env: TELEGRAM_BOT_TOKEN)", + ) + parser.add_argument( + "--telegram-chat-id", + type=int, + default=( + int(os.getenv("TELEGRAM_CHAT_ID")) + if os.getenv("TELEGRAM_CHAT_ID") + else None + ), + help="Telegram chat ID that should receive forwarded messages (env: TELEGRAM_CHAT_ID)", + ) + telegram_test_dc_default = False + telegram_test_dc_env = os.getenv("TELEGRAM_TEST_DC") + if telegram_test_dc_env is not None: + try: + telegram_test_dc_default = _parse_bool(telegram_test_dc_env) + except ValueError: + parser.error( + "Invalid boolean value for TELEGRAM_TEST_DC: " + f"{telegram_test_dc_env}" + ) + parser.add_argument( + "--telegram-test-dc", + dest="telegram_test_dc", + action="store_true", + help="Use Telegram test data-center for bot (env: TELEGRAM_TEST_DC)", + ) + parser.add_argument( + "--no-telegram-test-dc", + dest="telegram_test_dc", + action="store_false", + help=argparse.SUPPRESS, + ) + parser.set_defaults(telegram_test_dc=telegram_test_dc_default) + parser.add_argument( + "--db-path", + default=os.getenv("MAX_BRIDGE_DB_PATH"), + help=( + "Path to SQLite database storing forwarded message mapping " + "(env: MAX_BRIDGE_DB_PATH). Defaults to /forwarded_messages.db" + ), + ) + parser.add_argument( + "--disable-fake-telemetry", + action="store_true", + help="Disable sending fake telemetry pings required by the Max client", + ) + parser.add_argument( + "--list-chats", + action="store_true", + help="List available Max chats and exit without forwarding", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + if not args.max_phone: + parser.error("--max-phone or MAX_PHONE environment variable is required") + + if not args.list_chats: + missing: list[str] = [] + if args.max_chat_id is None: + missing.append("--max-chat-id / MAX_CHAT_ID") + if not args.telegram_bot_token: + missing.append("--telegram-bot-token / TELEGRAM_BOT_TOKEN") + if args.telegram_chat_id is None: + missing.append("--telegram-chat-id / TELEGRAM_CHAT_ID") + if missing: + parser.error("Missing required options: " + ", ".join(missing)) + + db_path = args.db_path or str(Path(args.work_dir) / "forwarded_messages.db") + + config = Config( + max_phone=args.max_phone, + max_chat_id=args.max_chat_id, + telegram_bot_token=args.telegram_bot_token, + telegram_chat_id=args.telegram_chat_id, + telegram_test_dc=args.telegram_test_dc, + max_token=args.max_token, + max_ws_uri=args.max_ws_uri, + work_dir=args.work_dir, + disable_fake_telemetry=args.disable_fake_telemetry, + db_path=db_path, + ) + return config, bool(args.list_chats), bool(args.debug) diff --git a/bridge_app/forwarding.py b/bridge_app/forwarding.py new file mode 100644 index 0000000..6f63de4 --- /dev/null +++ b/bridge_app/forwarding.py @@ -0,0 +1,226 @@ +"""Tools for deduplicating and forwarding Max messages to Telegram.""" +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass +from typing import Iterable, Literal, Optional, Sequence + +from aiogram import Bot +from aiogram.client.session.aiohttp import AiohttpSession +from aiogram.client.telegram import PRODUCTION, TEST, TelegramAPIServer +from aiogram.types import BufferedInputFile, InputMediaPhoto, InputMediaVideo + +from pymax.static import AttachType +from pymax.types import Message, User + + +class MessageDeduplicator: + """Keep a bounded set of processed message IDs.""" + + def __init__(self, limit: int = 1024) -> None: + self._limit = limit + self._order: deque[int] = deque() + self._seen: set[int] = set() + + def add(self, message_id: int) -> bool: + if message_id in self._seen: + return False + self._seen.add(message_id) + self._order.append(message_id) + if len(self._order) > self._limit: + old = self._order.popleft() + self._seen.discard(old) + return True + + +class TelegramForwarder: + def __init__(self, bot_token: str, chat_id: int, test_dc: bool) -> None: + self._bot_token = bot_token + self._chat_id = chat_id + self._test_dc = test_dc + self._bot: Bot | None = None + + async def start(self) -> None: + if self._bot is not None: + return + server: TelegramAPIServer = TEST if self._test_dc else PRODUCTION + self._bot = Bot(self._bot_token, session=AiohttpSession(api=server)) + + async def close(self) -> None: + if self._bot is not None: + await self._bot.session.close() + self._bot = None + + async def send_message( + self, text: str, *, reply_to_message_id: int | None = None + ) -> int: + if self._bot is None: + raise RuntimeError("TelegramForwarder is not started") + message = await self._bot.send_message( + chat_id=self._chat_id, + text=text, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=reply_to_message_id is not None, + parse_mode="MARKDOWN", + ) + return int(message.message_id) + + async def forward( + self, + text: str, + attachments: Sequence["ForwardAttachment"], + *, + reply_to_message_id: int | None = None, + ) -> int: + """Send text and optional media, returning the primary Telegram message id.""" + + if not attachments: + return await self.send_message( + text, reply_to_message_id=reply_to_message_id + ) + + caption = text or None + if caption and len(caption) > 1024: + primary_id = await self.send_message( + text, reply_to_message_id=reply_to_message_id + ) + await self.send_attachments( + attachments, + reply_to_message_id=primary_id, + ) + return primary_id + + return await self.send_attachments( + attachments, + reply_to_message_id=reply_to_message_id, + caption=caption, + ) + + async def send_attachments( + self, + attachments: Sequence["ForwardAttachment"], + *, + reply_to_message_id: int | None = None, + caption: str | None = None, + ) -> int: + """Send one or more media attachments, returning the lead message id.""" + + if self._bot is None: + raise RuntimeError("TelegramForwarder is not started") + if not attachments: + raise ValueError("attachments must not be empty") + + allow_without_reply = reply_to_message_id is not None + + if len(attachments) == 1: + attachment = attachments[0] + if attachment.kind == "photo": + message = await self._bot.send_photo( + chat_id=self._chat_id, + photo=attachment.media, + caption=caption, + parse_mode="MARKDOWN" if caption else None, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_without_reply, + ) + elif attachment.kind == "video": + message = await self._bot.send_video( + chat_id=self._chat_id, + video=attachment.media, + caption=caption, + parse_mode="MARKDOWN" if caption else None, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_without_reply, + ) + else: + raise ValueError(f"Unsupported attachment kind: {attachment.kind}") + return int(message.message_id) + + media_group = [] + for index, attachment in enumerate(attachments): + item_caption = caption if index == 0 else None + if attachment.kind == "photo": + media_group.append( + InputMediaPhoto( + media=attachment.media, + caption=item_caption, + parse_mode="MARKDOWN" if item_caption else None, + ) + ) + elif attachment.kind == "video": + media_group.append( + InputMediaVideo( + media=attachment.media, + caption=item_caption, + parse_mode="MARKDOWN" if item_caption else None, + ) + ) + else: + raise ValueError(f"Unsupported attachment kind: {attachment.kind}") + + messages = await self._bot.send_media_group( + chat_id=self._chat_id, + media=media_group, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_without_reply, + ) + return int(messages[0].message_id) + + +@dataclass(slots=True) +class ForwardAttachment: + kind: Literal["photo", "video"] + media: str | BufferedInputFile + + +def format_sender(user: Optional[User], fallback_sender_id: Optional[int]) -> str: + if user and user.names: + return user.names[0].name + if fallback_sender_id is not None: + return f"user:{fallback_sender_id}" + return "system" + + +def iter_attachment_descriptions(message: Message) -> Iterable[str]: + if not message.attaches: + return [] + descriptions: list[str] = [] + for attach in message.attaches: + attach_type = getattr(attach, "type", None) + if isinstance(attach_type, AttachType): + type_name = attach_type.value.lower() + else: + type_name = str(attach_type) + + parts = [type_name] + base_url = getattr(attach, "base_url", None) + if base_url: + parts.append(base_url) + token = getattr(attach, "photo_token", None) or getattr(attach, "token", None) + if token: + parts.append(f"token={token}") + descriptions.append(" ".join(parts)) + return descriptions + + +def build_forward_text( + message: Message, + sender: Optional[User], + *, + include_attachment_descriptions: bool = True, +) -> str: + sender_name = format_sender(sender, message.sender) + + parts: list[str] = [f"От {sender_name}:"] + text = (message.text or "").strip() + if text: + parts.append(text) + attachments: list[str] = [] + if include_attachment_descriptions: + attachments = list(iter_attachment_descriptions(message)) + if attachments: + parts.append("Attachments:") + parts.extend(f"- {item}" for item in attachments) + if not text and not attachments: + parts.append("(empty message)") + return "\n".join(parts) diff --git a/bridge_app/max_bridge.py b/bridge_app/max_bridge.py new file mode 100644 index 0000000..da2e4b7 --- /dev/null +++ b/bridge_app/max_bridge.py @@ -0,0 +1,284 @@ +"""Core bridge runtime for syncing messages from Max to Telegram.""" +from __future__ import annotations + +import asyncio +import base64 +import logging +import signal +from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse + +from aiogram.types import BufferedInputFile + +from pymax import MaxClient +from pymax.types import Message, PhotoAttach, User, VideoAttach + +from .config import Config +from .forwarding import ( + ForwardAttachment, + MessageDeduplicator, + TelegramForwarder, + build_forward_text, +) +from .storage import ForwardedMessageStore + + +def _decode_preview_image(preview_data: str | None) -> bytes | None: + if not preview_data: + return None + payload = preview_data + if preview_data.startswith("data:"): + _, _, payload = preview_data.partition(",") + try: + return base64.b64decode(payload) + except (ValueError, TypeError): + return None + + +def _build_photo_url(attach: PhotoAttach) -> str | None: + base_url = getattr(attach, "base_url", None) + token = getattr(attach, "photo_token", None) + if not base_url or not token: + return None + + if token in base_url: + return base_url + if "{token}" in base_url: + return base_url.replace("{token}", token) + if "{photoToken}" in base_url: + return base_url.replace("{photoToken}", token) + + encoded_token = quote(token, safe="") + if base_url.endswith(("/", "=", "?", ":")): + return base_url + encoded_token + + parsed = urlparse(base_url) + if parsed.query: + if parsed.query.endswith("="): + return base_url + encoded_token + query_params = parse_qsl(parsed.query, keep_blank_values=True) + if not any(key == "token" for key, _ in query_params): + query_params.append(("token", token)) + new_query = urlencode(query_params, doseq=True) + return urlunparse(parsed._replace(query=new_query)) + return base_url + + return f"{base_url.rstrip('/')}/{encoded_token}" + + +async def _collect_forward_attachments( + client: MaxClient, + message: Message, + logger: logging.Logger, +) -> list[ForwardAttachment]: + attachments: list[ForwardAttachment] = [] + for index, attach in enumerate(message.attaches or []): + if isinstance(attach, PhotoAttach): + url = _build_photo_url(attach) + if url: + attachments.append(ForwardAttachment(kind="photo", media=url)) + continue + + preview_bytes = _decode_preview_image(getattr(attach, "preview_data", None)) + if preview_bytes: + filename = f"max_photo_{attach.photo_id or message.id}_{index}.jpg" + attachments.append( + ForwardAttachment( + kind="photo", + media=BufferedInputFile(preview_bytes, filename=filename), + ) + ) + continue + + logger.debug("Skipping photo attachment without usable data: %s", attach) + continue + + if isinstance(attach, VideoAttach): + video_id = getattr(attach, "video_id", None) + if video_id is None: + logger.debug("Video attachment missing video_id: %s", attach) + continue + try: + video_request = await client.get_video_by_id( + message.chat_id, message.id, video_id + ) + except Exception: + logger.exception( + "Failed to resolve video attachment %s for message %s", + video_id, + message.id, + ) + continue + if video_request and getattr(video_request, "url", None): + attachments.append( + ForwardAttachment(kind="video", media=video_request.url) + ) + else: + logger.debug("Skipping video attachment without URL: %s", attach) + continue + + logger.debug("Unsupported attachment type: %s", getattr(attach, "type", None)) + + return attachments + + +async def list_chats_and_exit(client: MaxClient) -> None: + if not client.chats and not client.dialogs and not client.channels: + logging.info("No chats available yet; waiting for initial sync") + + for chat in client.chats: + logging.info("Group chat %s | id=%s", chat.title or "", chat.id) + for chat in client.channels: + logging.info("Channel %s | id=%s", chat.title or "", chat.id) + for dialog in client.dialogs: + if dialog.contact: + logging.info( + "Dialog with %s | id=%s", + dialog.contact.names[0].name if dialog.contact.names else dialog.id, + dialog.id, + ) + else: + logging.info("Dialog id=%s", dialog.id) + + +async def run_bridge(config: Config, list_chats: bool) -> None: + logger = logging.getLogger("bridge") + + client_kwargs: dict[str, object] = { + "phone": config.max_phone, + "work_dir": config.work_dir, + "send_fake_telemetry": not config.disable_fake_telemetry, + } + if config.max_token: + client_kwargs["token"] = config.max_token + if config.max_ws_uri: + client_kwargs["uri"] = config.max_ws_uri + + client = MaxClient(**client_kwargs) # type: ignore[arg-type] + deduplicator = MessageDeduplicator() + process_lock = asyncio.Lock() + stop_event = asyncio.Event() + + async def shutdown() -> None: + logger.info("Shutting down") + await client.close() + stop_event.set() + + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown())) + except NotImplementedError: + # add_signal_handler is not available on Windows event loop policy + signal.signal(sig, lambda _sig, _frame: asyncio.create_task(shutdown())) + + forwarder: TelegramForwarder | None = None + store: ForwardedMessageStore | None = None + if not list_chats: + assert config.telegram_bot_token is not None + assert config.telegram_chat_id is not None + forwarder = TelegramForwarder( + config.telegram_bot_token, config.telegram_chat_id, config.telegram_test_dc + ) + await forwarder.start() + store = ForwardedMessageStore(config.db_path) + try: + await store.initialize() + except Exception: + logger.exception("Failed to initialize forwarded message store") + await forwarder.close() + raise + + @client.on_start + async def _on_start() -> None: + logger.info( + "Connected to Max as %s. Loaded %d chats, %d dialogs, %d channels.", + config.max_phone, + len(client.chats), + len(client.dialogs), + len(client.channels), + ) + if list_chats: + await list_chats_and_exit(client) + await shutdown() + + @client.on_message() + async def _on_message(message: Message) -> None: + if list_chats: + return + async with process_lock: + if message.chat_id is None: + logger.debug("Skipping message without chat_id: %s", message) + return + target_chat_id = config.max_chat_id + if target_chat_id is None: + logger.debug("No target chat configured; skipping message %s", message.id) + return + if message.chat_id != target_chat_id: + return + if not deduplicator.add(message.id): + logger.debug("Skipping duplicate message id=%s", message.id) + return + + sender_user: User | None = None + if message.sender is not None: + try: + sender_user = await client.get_user(message.sender) + except Exception: + logger.exception("Failed to resolve sender %s", message.sender) + + attachments_to_forward = await _collect_forward_attachments( + client, message, logger + ) + text = build_forward_text( + message, + sender_user, + include_attachment_descriptions=not attachments_to_forward, + ) + + reply_to_message_id: int | None = None + if ( + store is not None + and message.link is not None + and getattr(message.link, "message", None) is not None + ): + linked_message = message.link.message + if isinstance(linked_message, Message): + reply_to_message_id = await store.lookup_telegram_message_id( + target_chat_id, linked_message.id + ) + if reply_to_message_id is None: + logger.debug( + "No Telegram mapping found for reply target %s", linked_message.id + ) + + assert forwarder is not None + try: + telegram_message_id = await forwarder.forward( + text, + attachments_to_forward, + reply_to_message_id=reply_to_message_id, + ) + except Exception: + logger.exception("Failed to forward message %s", message.id) + return + + if store is not None: + try: + await store.record_forwarded( + target_chat_id, message.id, telegram_message_id + ) + except Exception: + logger.exception( + "Failed to record forwarded message mapping for %s", message.id + ) + + try: + await client.start() + except Exception: + logger.exception("Max client stopped unexpectedly") + finally: + if forwarder is not None: + await forwarder.close() + stop_event.set() + + await stop_event.wait() diff --git a/bridge_app/storage.py b/bridge_app/storage.py new file mode 100644 index 0000000..5d012f3 --- /dev/null +++ b/bridge_app/storage.py @@ -0,0 +1,105 @@ +"""SQLite-backed storage for forwarded message metadata.""" +from __future__ import annotations + +import asyncio +import sqlite3 +from pathlib import Path +from typing import Optional + + +class ForwardedMessageStore: + """Persist mappings of Max message IDs to Telegram message IDs.""" + + def __init__(self, db_path: str) -> None: + self._path = Path(db_path) + self._initialized = False + self._init_lock = asyncio.Lock() + + async def initialize(self) -> None: + """Ensure the database and table exist.""" + if self._initialized: + return + async with self._init_lock: + if self._initialized: + return + + def setup() -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(self._path) + try: + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS forwarded_messages ( + max_chat_id INTEGER NOT NULL, + max_message_id INTEGER NOT NULL, + telegram_message_id INTEGER NOT NULL, + forwarded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (max_chat_id, max_message_id) + ) + """ + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_forwarded_telegram + ON forwarded_messages (telegram_message_id) + """ + ) + conn.commit() + finally: + conn.close() + + await asyncio.to_thread(setup) + self._initialized = True + + async def record_forwarded( + self, max_chat_id: int, max_message_id: int, telegram_message_id: int + ) -> None: + """Store a mapping for a successfully forwarded message.""" + await self.initialize() + + def insert() -> None: + conn = sqlite3.connect(self._path) + try: + conn.execute( + """ + INSERT OR REPLACE INTO forwarded_messages ( + max_chat_id, + max_message_id, + telegram_message_id + ) VALUES (?, ?, ?) + """, + (max_chat_id, max_message_id, telegram_message_id), + ) + conn.commit() + finally: + conn.close() + + await asyncio.to_thread(insert) + + async def lookup_telegram_message_id( + self, max_chat_id: int, max_message_id: int + ) -> Optional[int]: + """Return the Telegram message ID for a given Max message, if known.""" + await self.initialize() + + def query() -> Optional[int]: + conn = sqlite3.connect(self._path) + try: + cur = conn.execute( + """ + SELECT telegram_message_id + FROM forwarded_messages + WHERE max_chat_id = ? AND max_message_id = ? + """, + (max_chat_id, max_message_id), + ) + row = cur.fetchone() + return int(row[0]) if row else None + finally: + conn.close() + + return await asyncio.to_thread(query) + + +__all__ = ["ForwardedMessageStore"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e10c5c3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +aiogram==3.22.0 +maxapi-python==1.1.13