Files
ad-infr-control/tg-bot/main.py

425 lines
17 KiB
Python
Raw Normal View History

2025-12-02 14:01:34 +03:00
"""
Telegram Bot for A/D Infrastructure
Sends notifications to group chat
"""
import asyncio
import hmac
import os
import traceback
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
from urllib.parse import urlsplit, urlunsplit

import aiohttp
import asyncpg
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.error import TelegramError
# Configuration -- each value is read from the environment, with a fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl"
)
SECRET_TOKEN = os.environ.get("SECRET_TOKEN", "change-me-in-production")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
CONTROLLER_API = os.environ.get("CONTROLLER_API", "http://controller:8001")

# Module-level runtime state; db_pool, bot and polling_task are assigned in
# lifespan(), update_offset is advanced by poll_updates().
db_pool = None
bot = None
update_offset = 0
polling_task = None
# Actions the inline keyboard built by the /send endpoint can emit.
_SERVICE_ACTIONS = frozenset({"start", "stop", "restart"})


async def _lookup_service_id(service_name: str):
    """Resolve a service name or alias to the controller's service id.

    Fetches ``{CONTROLLER_API}/services`` and scans the returned list for the
    first entry whose ``name`` or ``alias`` equals *service_name*.

    Returns:
        ``(service_id, None)`` when a matching entry with an id is found,
        otherwise ``(None, error_text)`` where *error_text* is the message to
        show to the user.

    Raises:
        Any exception raised by aiohttp (connection, timeout, JSON decoding);
        the caller is responsible for reporting it.
    """
    print(f"[BUTTON] Contacting controller at {CONTROLLER_API}/services")
    headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
    # Same 10 s budget as the action request, so a hung controller cannot
    # stall the polling loop for aiohttp's much longer default timeout.
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{CONTROLLER_API}/services", headers=headers, timeout=timeout) as resp:
            print(f"[BUTTON] Controller response: HTTP {resp.status}")
            if resp.status != 200:
                return None, f"❌ Failed to fetch services (HTTP {resp.status})"
            services = await resp.json()

    print(f"[BUTTON] Found {len(services)} services")
    for svc in services:
        matched_name = svc.get('name') == service_name
        if matched_name or svc.get('alias') == service_name:
            service_id = svc.get('id')
            matched_by = "name" if matched_name else "alias"
            print(f"[BUTTON] Matched service '{service_name}' to ID {service_id} (by {matched_by})")
            # Compare against None so that a legitimate id of 0 is accepted.
            if service_id is not None:
                return service_id, None
            break

    return None, f"❌ Service '{service_name}' not registered in controller\nPlease add alias in web panel"


async def handle_button_click(callback_data: str, chat_id: int, message_id: int):
    """Handle an inline button click.

    *callback_data* has the form ``service_{action}_{identifier}`` where
    *identifier* is either a numeric service id or ``name_{service_name}``.
    Names are resolved to ids through the controller's ``/services`` list, and
    the action is then POSTed to ``/services/{id}/action``. The outcome is
    written into the clicked message and recorded via ``log_message``.

    Args:
        callback_data: Raw callback payload of the pressed button.
        chat_id: Chat that contains the clicked message.
        message_id: Id of the clicked message; its text is replaced with the
            result.

    Returns:
        None. Malformed payloads and unsupported actions are printed and
        ignored.
    """
    print(f"[BUTTON] Received callback: {callback_data} from chat {chat_id}")

    if not callback_data.startswith("service_"):
        print(f"[BUTTON] Invalid callback prefix: {callback_data}")
        return

    # Parse: service_{action}_{id_or_name}. maxsplit=2 keeps any underscores
    # that are part of the service name inside the third element.
    parts = callback_data.split("_", 2)
    if len(parts) != 3:
        print(f"[BUTTON] Invalid callback format: {callback_data} (got {len(parts)} parts)")
        return

    _, action, identifier = parts
    print(f"[BUTTON] Action: {action}, Identifier: {identifier}")

    # Never forward an action the keyboard could not have produced.
    if action not in _SERVICE_ACTIONS:
        print(f"[BUTTON] Unsupported action: {action}")
        return

    if identifier.startswith('name_'):
        service_name = identifier[len('name_'):]
        print(f"[BUTTON] Looking up service by name or alias: {service_name}")
        try:
            service_id, lookup_error = await _lookup_service_id(service_name)
            if lookup_error is not None:
                print(f"[BUTTON] {lookup_error}")
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id,
                    text=lookup_error
                )
                return
        except Exception as e:
            error_msg = f"❌ Error: {str(e)[:100]}"
            print(f"[BUTTON] Exception during service lookup: {str(e)}")
            await bot.edit_message_text(
                chat_id=chat_id,
                message_id=message_id,
                text=error_msg
            )
            await log_message(chat_id, "Service lookup error", False, str(e))
            return
    else:
        # Identifier is a numeric service_id.
        try:
            service_id = int(identifier)
        except ValueError:
            print(f"[BUTTON] Failed to parse service_id from: {identifier}")
            return
        print(f"[BUTTON] Using numeric service_id: {service_id}")

    # Set once the controller has answered; used below to decide whether the
    # user still needs to be told that the request itself failed.
    response_received = False
    try:
        api_url = f"{CONTROLLER_API}/services/{service_id}/action"
        print(f"[BUTTON] Executing {action} on service {service_id} at {api_url}")
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {SECRET_TOKEN}"}
            data = {"action": action}
            print(f"[BUTTON] Sending POST request with data: {data}")
            async with session.post(api_url, json=data, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                response_text = await resp.text()
                response_received = True
                print(f"[BUTTON] Controller response: HTTP {resp.status}")
                print(f"[BUTTON] Response body: {response_text[:200]}")

                if resp.status == 200:
                    success_msg = f"✅ Service action '{action}' executed successfully"
                    print(f"[BUTTON] Success: {success_msg}")
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=success_msg
                    )
                    await log_message(chat_id, f"Action: {action} on service {service_id}", True)
                else:
                    error_msg = f"❌ Failed: {response_text[:100]}"
                    print(f"[BUTTON] Failed: {error_msg}")
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id,
                        text=error_msg
                    )
                    await log_message(chat_id, f"Action failed: {action}", False, response_text)
    except Exception as e:
        error_msg = f"❌ Exception: {str(e)[:100]}"
        print(f"[BUTTON] Exception during action execution: {str(e)}")
        # print() has no exc_info parameter; use traceback for the stack.
        print("[BUTTON] Full traceback:")
        traceback.print_exc()
        if not response_received:
            # The controller could not be reached: tell the user, but do not
            # let a Telegram failure hide the original error from the log.
            try:
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id,
                    text=error_msg
                )
            except TelegramError as edit_error:
                print(f"[BUTTON] Could not report error to chat: {edit_error}")
        await log_message(chat_id, f"Action error: {action}", False, str(e))
async def poll_updates():
    """Poll Telegram for updates (button clicks).

    Long-polls ``bot.get_updates`` for ``callback_query`` updates, dispatches
    each one to ``handle_button_click`` and acknowledges it with
    ``answer_callback_query``. ``update_offset`` is advanced past every update
    that has been seen. Runs until the task is cancelled; any other error is
    printed and followed by a pause (10 s for getUpdates conflicts, 5 s
    otherwise) before polling resumes.
    """
    global update_offset

    if not bot:
        print("[POLLING] Bot not configured, skipping polling")
        return

    print("[POLLING] Starting update polling")
    retry_count = 0
    # Only used in the log line below; polling is never abandoned.
    max_retries = 5

    while True:
        try:
            print("[POLLING] Calling get_updates...")
            updates = await bot.get_updates(offset=update_offset, timeout=30, allowed_updates=["callback_query"])
            # Reset only after the request actually succeeded, so consecutive
            # failures are counted correctly.
            retry_count = 0

            if updates:
                print(f"[POLLING] Received {len(updates)} updates")

            for update in updates:
                update_offset = update.update_id + 1

                query = update.callback_query
                if not query:
                    print(f"[POLLING] Update ID {update.update_id} is not a callback_query")
                    continue

                print(f"[POLLING] Processing callback query: {query.data}")
                try:
                    if query.message is None:
                        # No originating message is attached, so there is
                        # nothing to edit.
                        print("[POLLING] Callback query has no message, skipping")
                    else:
                        await handle_button_click(
                            query.data,
                            query.message.chat_id,
                            query.message.message_id
                        )
                finally:
                    # Always acknowledge, even when the handler failed;
                    # otherwise the client keeps showing a loading indicator.
                    await bot.answer_callback_query(query.id)
                print("[POLLING] Callback processed successfully")

        except asyncio.CancelledError:
            print("[POLLING] Polling task cancelled")
            break
        except Exception as e:
            retry_count += 1
            print(f"[POLLING] Error in polling (attempt {retry_count}/{max_retries}): {type(e).__name__}: {e}")
            traceback.print_exc()

            if "Conflict" in str(e) or "terminated by other getUpdates" in str(e):
                print("[POLLING] CONFLICT: Another bot instance is polling the same token!")
                print("[POLLING] Check if another tg-bot container/process is running")
                print("[POLLING] Waiting 10 seconds before retry...")
                await asyncio.sleep(10)
            else:
                await asyncio.sleep(5)
class MessageRequest(BaseModel):
    """Request body of ``POST /send``."""

    # Text to deliver; the /send handler sends it with HTML parse mode.
    message: str
    # Target chat; the handler falls back to TELEGRAM_CHAT_ID when unset.
    # Optional[...] so that an explicit JSON null is accepted, matching the
    # None default.
    chat_id: Optional[str] = None
    # When either of these is set, the handler attaches Start/Stop/Restart
    # buttons that refer to the service by id or by name.
    service_id: Optional[int] = None
    service_name: Optional[str] = None
class BulkMessageRequest(BaseModel):
    """Request body of ``POST /send-bulk``."""

    # Messages to send, one Telegram message per list entry, in order.
    messages: list[str]
    # Target chat; the handler falls back to TELEGRAM_CHAT_ID when unset.
    # Optional[...] so that an explicit JSON null is accepted, matching the
    # None default.
    chat_id: Optional[str] = None
# Auth dependency
async def verify_token(authorization: str = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Raw value of the ``Authorization`` request header.

    Returns:
        The presented token when it matches ``SECRET_TOKEN``.

    Raises:
        HTTPException: 401 when the header is missing or is not a Bearer
            header, 403 when the token does not match.
    """
    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Strip the prefix only; str.replace would also delete any later
    # "Bearer " occurrence inside the token itself.
    token = authorization[len(scheme):]

    # Constant-time comparison; operands are encoded because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
# Database functions
async def get_db():
    """Check a connection out of the shared pool.

    The caller must hand it back with ``release_db``.
    """
    connection = await db_pool.acquire()
    return connection
async def release_db(conn):
    """Return *conn*, previously obtained from ``get_db``, to the shared pool."""
    pool = db_pool
    await pool.release(conn)
async def log_message(chat_id: int, message: str, success: bool, error_message: str = None):
    """Insert one row describing a send attempt into ``telegram_messages``.

    Args:
        chat_id: Chat the message was addressed to.
        message: Message text, or a short description of the action.
        success: Whether the attempt succeeded.
        error_message: Error details for failed attempts, else None.
    """
    row = (chat_id, message, success, error_message)
    # The pool's acquire() context manager releases the connection on exit,
    # including when execute() raises.
    async with db_pool.acquire() as connection:
        await connection.execute(
            "INSERT INTO telegram_messages (chat_id, message, success, error_message) VALUES ($1, $2, $3, $4)",
            *row
        )
# Lifespan context
def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with the password in its ``user:password@`` part masked.

    Only the userinfo section of the URL is inspected. A DSN without a
    password there is returned unchanged, and one that cannot be parsed is
    replaced by a placeholder.
    """
    try:
        parts = urlsplit(dsn)
    except ValueError:
        return "<unparseable DSN>"
    userinfo, at, hostport = parts.netloc.rpartition("@")
    user, colon, _password = userinfo.partition(":")
    if not (at and colon):
        return dsn
    return urlunsplit(parts._replace(netloc=f"{user}:***@{hostport}"))


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared resources on startup and release them on shutdown.

    Startup creates the asyncpg pool and, when ``TELEGRAM_BOT_TOKEN`` is set,
    the ``Bot`` instance plus the background ``poll_updates`` task. Shutdown
    cancels that task and closes the pool.
    """
    global db_pool, bot, polling_task

    print("[STARTUP] Initializing Telegram Bot API")
    # The DSN carries the database password; never write it to the log as-is.
    print(f"[STARTUP] DATABASE_URL: {_redact_dsn(DATABASE_URL)}")
    print(f"[STARTUP] CONTROLLER_API: {CONTROLLER_API}")
    print(f"[STARTUP] TELEGRAM_BOT_TOKEN configured: {bool(TELEGRAM_BOT_TOKEN)}")
    print(f"[STARTUP] TELEGRAM_CHAT_ID: {TELEGRAM_CHAT_ID}")

    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    print("[STARTUP] Database pool created")

    # try/finally so the poller and the pool are torn down even when the rest
    # of startup, or the application itself, raises.
    try:
        if TELEGRAM_BOT_TOKEN:
            bot = Bot(token=TELEGRAM_BOT_TOKEN)
            print("[STARTUP] Telegram Bot initialized")
            polling_task = asyncio.create_task(poll_updates())
            print("[STARTUP] Polling task created")
        else:
            print("[STARTUP] WARNING: TELEGRAM_BOT_TOKEN not configured!")

        yield
    finally:
        print("[SHUTDOWN] Shutting down Telegram Bot API")
        if polling_task:
            polling_task.cancel()
            try:
                await polling_task
            except asyncio.CancelledError:
                pass

        await db_pool.close()
        print("[SHUTDOWN] Database pool closed")
# ASGI application object; `lifespan` sets up and tears down the database
# pool, the bot and the polling task around the application's lifetime.
app = FastAPI(title="Telegram Bot API", lifespan=lifespan)
# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe.

    Reports a fixed ``"ok"`` status, whether a bot instance exists, and the
    current UTC time in ISO format.
    """
    bot_ready = bot is not None
    now_iso = datetime.utcnow().isoformat()
    return {"status": "ok", "bot_configured": bot_ready, "timestamp": now_iso}
@app.post("/send", dependencies=[Depends(verify_token)])
async def send_message(request: MessageRequest):
    """Send a message to telegram chat with optional service control buttons.

    The message goes to ``request.chat_id`` or, when that is unset, to
    ``TELEGRAM_CHAT_ID``. If ``service_id`` or ``service_name`` is given, an
    inline keyboard with Start/Stop/Restart buttons is attached; their
    callback data is ``service_{action}_{id}`` or
    ``service_{action}_name_{service_name}``.

    Returns:
        A dict with ``status``, ``message_id``, ``chat_id`` and
        ``has_buttons``.

    Raises:
        HTTPException: 503 when no bot is configured, 400 when no chat id is
            available or it is not an integer, 500 when Telegram rejects the
            message.
    """
    print(f"[SEND] Received message request: {request.message[:50]}...")
    print(f"[SEND] service_id={request.service_id}, service_name={request.service_name}")

    if not bot:
        print("[SEND] ERROR: Bot not configured")
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        print("[SEND] ERROR: No chat_id provided")
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")

    # Convert once, up front: a malformed id is a client error, not a crash.
    try:
        numeric_chat_id = int(chat_id)
    except ValueError:
        print(f"[SEND] ERROR: Invalid chat_id: {chat_id}")
        raise HTTPException(status_code=400, detail="chat_id must be an integer") from None

    has_buttons = bool(request.service_id or request.service_name)

    try:
        kwargs = {
            "chat_id": numeric_chat_id,
            "text": request.message,
            "parse_mode": "HTML"
        }

        # Add inline buttons for service control if service_id or service_name is provided
        if has_buttons:
            # Use service_id if available, otherwise use service_name prefixed with 'name_'
            identifier = str(request.service_id) if request.service_id else f"name_{request.service_name}"
            print(f"[SEND] Adding buttons with identifier: {identifier}")
            keyboard = [
                [
                    InlineKeyboardButton("▶️ Start", callback_data=f"service_start_{identifier}"),
                    InlineKeyboardButton("⏹️ Stop", callback_data=f"service_stop_{identifier}"),
                    InlineKeyboardButton("🔄 Restart", callback_data=f"service_restart_{identifier}")
                ]
            ]
            kwargs["reply_markup"] = InlineKeyboardMarkup(keyboard)
            print("[SEND] Buttons added to message")
        else:
            print("[SEND] No buttons - no service_id or service_name provided")

        print(f"[SEND] Sending message to chat {chat_id}")
        message = await bot.send_message(**kwargs)
        await log_message(numeric_chat_id, request.message, True)
        print(f"[SEND] Message sent successfully, message_id={message.message_id}")

        return {
            "status": "sent",
            "message_id": message.message_id,
            "chat_id": chat_id,
            "has_buttons": has_buttons
        }
    except TelegramError as e:
        print(f"[SEND] TelegramError: {str(e)}")
        await log_message(numeric_chat_id, request.message, False, str(e))
        raise HTTPException(status_code=500, detail=f"Failed to send message: {str(e)}")
@app.post("/send-bulk", dependencies=[Depends(verify_token)])
async def send_bulk_messages(request: BulkMessageRequest):
    """Send multiple messages to telegram chat.

    Messages are sent one after another to ``request.chat_id`` or, when that
    is unset, to ``TELEGRAM_CHAT_ID``. A Telegram failure on one message is
    recorded in the result list and does not stop the remaining messages.

    Returns:
        ``{"results": [...], "total": n}`` with one entry per message holding
        its status, a preview of at most 50 characters, and either the
        message id or the error text.

    Raises:
        HTTPException: 503 when no bot is configured, 400 when no chat id is
            available or it is not an integer.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    chat_id = request.chat_id or TELEGRAM_CHAT_ID
    if not chat_id:
        raise HTTPException(status_code=400, detail="No chat_id provided and no default configured")

    # Convert once instead of on every iteration, and reject a malformed id
    # as a client error before anything is sent.
    try:
        numeric_chat_id = int(chat_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="chat_id must be an integer") from None

    def preview(text: str) -> str:
        # Keep result entries short: first 50 characters plus an ellipsis.
        return text[:50] + "..." if len(text) > 50 else text

    results = []
    for msg in request.messages:
        try:
            message = await bot.send_message(
                chat_id=numeric_chat_id,
                text=msg,
                parse_mode='HTML'
            )
            await log_message(numeric_chat_id, msg, True)
            results.append({
                "status": "sent",
                "message_id": message.message_id,
                "message": preview(msg)
            })
        except TelegramError as e:
            await log_message(numeric_chat_id, msg, False, str(e))
            results.append({
                "status": "failed",
                "error": str(e),
                "message": preview(msg)
            })

    return {"results": results, "total": len(results)}
@app.get("/messages", dependencies=[Depends(verify_token)])
async def get_message_history(limit: int = 50):
    """Get message sending history.

    Args:
        limit: Maximum number of rows to return, newest first (by
            ``sent_at``). Must not be negative.

    Returns:
        A list of dicts, one per ``telegram_messages`` row.

    Raises:
        HTTPException: 400 when *limit* is negative.
    """
    # PostgreSQL rejects a negative LIMIT; report it as a client error
    # instead of letting the database error surface as a 500.
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must not be negative")

    conn = await get_db()
    try:
        rows = await conn.fetch(
            "SELECT * FROM telegram_messages ORDER BY sent_at DESC LIMIT $1",
            limit
        )
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Get message statistics.

    Returns:
        A dict with ``total_messages``, ``successful``, ``failed`` and
        ``success_rate`` (a percentage, 0 when there are no messages).
    """
    conn = await get_db()
    try:
        # One statement instead of three separate COUNT queries, so all
        # counters come from the same snapshot and cannot disagree when rows
        # are inserted in between.
        row = await conn.fetchrow(
            "SELECT COUNT(*) AS total, "
            "COUNT(*) FILTER (WHERE success = true) AS successful, "
            "COUNT(*) FILTER (WHERE success = false) AS failed "
            "FROM telegram_messages"
        )
    finally:
        await release_db(conn)

    total = row["total"]
    successful = row["successful"]
    failed = row["failed"]

    return {
        "total_messages": total,
        "successful": successful,
        "failed": failed,
        "success_rate": (successful / total * 100) if total > 0 else 0
    }
@app.post("/test", dependencies=[Depends(verify_token)])
async def test_connection():
    """Check that the bot can reach Telegram by calling ``get_me``.

    Returns the bot's username, first name and id on success. Responds with
    503 when no bot is configured and with 500 when Telegram reports an error.
    """
    if not bot:
        raise HTTPException(status_code=503, detail="Telegram bot not configured")

    try:
        me = await bot.get_me()
    except TelegramError as e:
        raise HTTPException(status_code=500, detail=f"Bot test failed: {str(e)}")

    identity = {"status": "ok"}
    identity["bot_username"] = me.username
    identity["bot_name"] = me.first_name
    identity["bot_id"] = me.id
    return identity
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8003.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8003}
    uvicorn.run(app, **server_options)