Skip to content
Commits on Source (3)
.venv
\ No newline at end of file
from flask import Flask, request as flask_request, jsonify
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import ApplicationBuilder, CallbackQueryHandler, ContextTypes
import asyncio
import os
import time
import threading
import asyncio
from typing import Dict, Any
BOT_TOKEN = "7755933545:AAFJrL3ueX4E1y1GxkYR73HegWmsVZD-eFc"
pending_requests = {}
event_loop = None
from flask import Flask, request as flask_request, jsonify
import requests
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
)
# ======================
# Настройки
# ======================
BOT_TOKEN = os.getenv("BOT_TOKEN", "7755933545:AAFJrL3ueX4E1y1GxkYR73HegWmsVZD-eFc")
BACKEND_BASE = os.getenv("BACKEND_BASE", "http://localhost:4000") # твой Phoenix
BIND_CONFIRM_PATH = "/api/telegram_link_confirm"
LOGIN_CONFIRM_PATH = "/api/telegram_login_confirm"
# TTL для висящих запросов (чтоб не копились навечно), секунд
PENDING_TTL = 10 * 60
# ======================
# Хранилища в памяти
# ======================
# pending_usernames: ключ — нормализованный tg username, значение — { app_display, expires_at }
pending_usernames: Dict[str, Dict[str, Any]] = {}
# pending_requests: ключ — telegram_id, значение — { request_id, expires_at }
pending_requests: Dict[int, Dict[str, Any]] = {}
# Текущий event loop бота (нужен, чтобы из Flask-треда запускать корутины)
event_loop: asyncio.AbstractEventLoop | None = None
# ======================
# Утилиты
# ======================
def normalize(name: str | None) -> str:
return (name or "").lstrip("@").strip().lower()
def now() -> float:
return time.time()
def with_ttl(payload: Dict[str, Any], ttl=PENDING_TTL) -> Dict[str, Any]:
payload["expires_at"] = now() + ttl
return payload
def sweep_expired():
t = now()
# чистим просроченные привязки
for k in list(pending_usernames.keys()):
if pending_usernames[k]["expires_at"] < t:
pending_usernames.pop(k, None)
# чистим просроченные логины
for k in list(pending_requests.keys()):
if pending_requests[k]["expires_at"] < t:
pending_requests.pop(k, None)
# Периодический таск на очистку (внутри бота)
async def janitor():
while True:
sweep_expired()
await asyncio.sleep(30)
# ======================
# Flask
# ======================
app = Flask(__name__)
@app.route("/notify_telegram", methods=["POST"])
def notify():
@app.route("/notify_bind", methods=["POST"])
def notify_bind():
"""
Триггер из приложения: пользователь нажал «Привязать» и ввёл свой @username.
body: { "tg_username": "...", "app_username": "..." }
"""
data = flask_request.json or {}
tg_username_raw = data.get("tg_username")
app_username = data.get("app_username")
if not tg_username_raw or not app_username:
return jsonify({"error": "tg_username and app_username are required"}), 400
key = normalize(tg_username_raw)
pending_usernames[key] = with_ttl({"app_display": app_username})
return jsonify({"status": "pending"})
@app.route("/notify_login", methods=["POST"])
def notify_login():
"""
Триггер из приложения: для уже привязанного пользователя пришёл запрос на вход.
body: { "telegram_id": 123, "username": "app_nick", "request_id": "uuid" }
"""
global event_loop
data = flask_request.json or {}
data = flask_request.json
telegram_id = data["telegram_id"]
try:
telegram_id = int(data["telegram_id"])
username = data["username"]
request_id = data["request_id"]
except Exception:
return jsonify({"error": "telegram_id, username, request_id are required"}), 400
pending_requests[telegram_id] = request_id
pending_requests[telegram_id] = with_ttl({"request_id": request_id})
if event_loop:
asyncio.run_coroutine_threadsafe(
send_confirm(telegram_id, username),
send_login_confirm(telegram_id, username),
event_loop
)
return jsonify({"status": "sent"})
else:
return jsonify({"error": "event loop not ready"}), 503
async def send_confirm(telegram_id, username):
keyboard = [
[
InlineKeyboardButton("✅ Да", callback_data="approve"),
InlineKeyboardButton("❌ Нет", callback_data="reject")
]
]
markup = InlineKeyboardMarkup(keyboard)
# ======================
# Telegram Bot
# ======================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
/start — проверяем, есть ли ожидание привязки для username юзера.
"""
tg_username_key = normalize(getattr(update.effective_user, "username", None))
entry = pending_usernames.get(tg_username_key)
if entry:
app_username_display = entry["app_display"]
keyboard = [[
InlineKeyboardButton("✅ Да", callback_data="bind_approve"),
InlineKeyboardButton("❌ Нет", callback_data="bind_reject"),
]]
await update.message.reply_text(
f"Это ты в Suggero с ником {app_username_display} пытаешься привязать Telegram?",
reply_markup=InlineKeyboardMarkup(keyboard)
)
else:
await update.message.reply_text(
"Чтобы привязать Telegram, вернись в приложение, нажми «Привязать» и укажи свой Telegram‑ник."
)
async def send_login_confirm(telegram_id: int, username: str):
"""
Отправляем запрос на подтверждение входа уже привязанному пользователю.
"""
keyboard = [[
InlineKeyboardButton("✅ Да", callback_data="approve"),
InlineKeyboardButton("❌ Нет", callback_data="reject"),
]]
await bot_app.bot.send_message(
chat_id=telegram_id,
text=f"Пользователь {username} пытается войти. Подтвердите вход:",
reply_markup=markup
text=f"Пользователь {username} пытается войти. Подтвердить вход?",
reply_markup=InlineKeyboardMarkup(keyboard)
)
async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
Единый обработчик всех inline-кнопок:
- bind_approve / bind_reject — подтверждение привязки учётки
- approve / reject — подтверждение входа
"""
query = update.callback_query
await query.answer()
telegram_id = query.from_user.id
request_id = pending_requests.get(telegram_id)
confirmed = query.data == "approve"
# --- ветка привязки (по username) ---
if query.data in ("bind_approve", "bind_reject"):
tg_username_key = normalize(getattr(update.effective_user, "username", None))
entry = pending_usernames.get(tg_username_key)
confirmed = (query.data == "bind_approve")
if entry:
app_username_display = entry["app_display"]
try:
requests.post(
f"{BACKEND_BASE}{BIND_CONFIRM_PATH}",
json={
"username": app_username_display, # ник из твоего приложения
"telegram_id": update.effective_user.id,
"confirmed": confirmed,
},
timeout=5,
)
except Exception:
# логирование по вкусу
pass
import requests
requests.post("http://localhost:4000/api/telegram_login_confirm", json={
"request_id": request_id,
"confirmed": confirmed
})
await query.edit_message_text("✅ Привязка подтверждена" if confirmed else "❌ Привязка отменена")
pending_usernames.pop(tg_username_key, None)
else:
await query.edit_message_text("Сессия привязки не найдена. Нажми «Привязать» в приложении.")
return
# --- ветка логина (по telegram_id) ---
if query.data in ("approve", "reject"):
telegram_id = query.from_user.id
slot = pending_requests.get(telegram_id)
confirmed = (query.data == "approve")
if slot:
request_id = slot["request_id"]
try:
requests.post(
f"{BACKEND_BASE}{LOGIN_CONFIRM_PATH}",
json={"request_id": request_id, "confirmed": confirmed},
timeout=5,
)
except Exception:
pass
await query.edit_message_text("✅ Вход подтверждён" if confirmed else "❌ Вход отменён")
pending_requests.pop(telegram_id, None)
else:
await query.edit_message_text("Запрос на вход не найден или устарел.")
return
# На случай неожиданных payload’ов
await query.edit_message_text("Неизвестное действие.")
# Инициализация PTB‑приложения
bot_app = ApplicationBuilder().token(BOT_TOKEN).build()
bot_app.add_handler(CommandHandler("start", start))
bot_app.add_handler(CallbackQueryHandler(handle_callback))
# ======================
# Bootstrap
# ======================
def run_flask():
app.run(host="0.0.0.0", port=8000)
# Можно включить threaded=True; у нас всё равно «огорожено» очередью в телеграм через event_loop
app.run(host="0.0.0.0", port=8000, threaded=True)
async def run_bot():
global event_loop
event_loop = asyncio.get_running_loop()
# запуск фонового чистильщика TTL без JobQueue
asyncio.create_task(janitor())
await bot_app.initialize()
await bot_app.start()
await bot_app.updater.start_polling()
await asyncio.Event().wait()
if __name__ == "__main__":
threading.Thread(target=run_flask, daemon=True).start()
asyncio.run(run_bot())