All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 4m50s
106 lines
3.5 KiB
Python
106 lines
3.5 KiB
Python
"""SQLite-backed storage for forwarded message metadata."""
|
|
from __future__ import annotations

import asyncio
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Optional
|
|
|
|
|
|
class ForwardedMessageStore:
    """Persist mappings of Max message IDs to Telegram message IDs.

    Rows live in one SQLite table, ``forwarded_messages``, keyed by
    ``(max_chat_id, max_message_id)``.  Every operation opens its own
    short-lived connection and runs in a worker thread via
    ``asyncio.to_thread``, so no connection is shared between threads and
    the SQLite calls do not run on the event-loop thread.

    Because each call reconnects, ``db_path`` must name an on-disk file: a
    per-connection database such as ``":memory:"`` would not keep the table
    from one call to the next.
    """

    def __init__(self, db_path: str) -> None:
        """Remember the database location; no I/O is performed here.

        Args:
            db_path: Filesystem path of the SQLite database file.  Missing
                parent directories are created by ``initialize``.
        """
        self._path = Path(db_path)
        # Flipped to True once ``initialize`` has created the schema.
        self._initialized = False
        # Serialises concurrent first-time initialisation attempts.
        self._init_lock = asyncio.Lock()

    def _connect(self) -> "closing[sqlite3.Connection]":
        """Return a context manager yielding a fresh connection.

        The connection is closed when the ``with`` block exits, whether or
        not the body raised.  Closing does not commit; writers must call
        ``commit()`` themselves.
        """
        return closing(sqlite3.connect(self._path))

    def _create_schema(self) -> None:
        """Create the database directory, table and index if missing (blocking)."""
        self._path.parent.mkdir(parents=True, exist_ok=True)
        with self._connect() as conn:
            # WAL lets readers proceed while a write is in progress.
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS forwarded_messages (
                    max_chat_id INTEGER NOT NULL,
                    max_message_id INTEGER NOT NULL,
                    telegram_message_id INTEGER NOT NULL,
                    forwarded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (max_chat_id, max_message_id)
                )
                """
            )
            conn.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_forwarded_telegram
                ON forwarded_messages (telegram_message_id)
                """
            )
            conn.commit()

    def _insert_mapping(
        self, max_chat_id: int, max_message_id: int, telegram_message_id: int
    ) -> None:
        """Insert or overwrite one mapping row and commit it (blocking)."""
        with self._connect() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO forwarded_messages (
                    max_chat_id,
                    max_message_id,
                    telegram_message_id
                ) VALUES (?, ?, ?)
                """,
                (max_chat_id, max_message_id, telegram_message_id),
            )
            conn.commit()

    def _select_telegram_id(
        self, max_chat_id: int, max_message_id: int
    ) -> Optional[int]:
        """Fetch the stored Telegram ID for one Max message (blocking)."""
        with self._connect() as conn:
            row = conn.execute(
                """
                SELECT telegram_message_id
                FROM forwarded_messages
                WHERE max_chat_id = ? AND max_message_id = ?
                """,
                (max_chat_id, max_message_id),
            ).fetchone()
        # ``fetchone`` yields None when no row matched the key.
        if row is None:
            return None
        return int(row[0])

    async def initialize(self) -> None:
        """Ensure the database and table exist.

        The schema is created at most once per instance.  The other public
        methods call this themselves, so calling it explicitly is optional.
        """
        if self._initialized:
            return
        async with self._init_lock:
            # Re-check under the lock: another task may have completed the
            # setup while this one was waiting to acquire it.
            if self._initialized:
                return
            await asyncio.to_thread(self._create_schema)
            # Set only after success, so a failed attempt is retried by the
            # next caller instead of being remembered as done.
            self._initialized = True

    async def record_forwarded(
        self, max_chat_id: int, max_message_id: int, telegram_message_id: int
    ) -> None:
        """Store a mapping for a successfully forwarded message.

        An existing row for the same ``(max_chat_id, max_message_id)`` is
        replaced.

        Args:
            max_chat_id: ID of the Max chat the message came from.
            max_message_id: ID of the message within that chat.
            telegram_message_id: ID of the corresponding Telegram message.
        """
        await self.initialize()
        await asyncio.to_thread(
            self._insert_mapping, max_chat_id, max_message_id, telegram_message_id
        )

    async def lookup_telegram_message_id(
        self, max_chat_id: int, max_message_id: int
    ) -> Optional[int]:
        """Return the Telegram message ID for a given Max message, if known.

        Args:
            max_chat_id: ID of the Max chat the message came from.
            max_message_id: ID of the message within that chat.

        Returns:
            The stored Telegram message ID, or ``None`` when no mapping has
            been recorded for that key.
        """
        await self.initialize()
        return await asyncio.to_thread(
            self._select_telegram_id, max_chat_id, max_message_id
        )
|
|
|
|
|
|
# Public API of this module: a star import exposes only the store class.
__all__ = ["ForwardedMessageStore"]
|