"""
Scoreboard Injector for ForcAD.

Monitors the scoreboard WebSocket for attack events and raises alerts on
critical situations.
"""
import asyncio
import hmac
import json
import os
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import aiohttp
import asyncpg
from fastapi import Depends, FastAPI, Header, HTTPException
from pydantic import BaseModel
# Configuration -- every value can be overridden through the environment.
# The variable names and defaults must not carry padding whitespace: a padded
# name never matches a real environment variable and a padded URL is invalid.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://adctrl:adctrl@postgres:5432/adctrl")
SECRET_TOKEN = os.getenv("SECRET_TOKEN", "change-me-in-production")
SCOREBOARD_WS_URL = os.getenv("SCOREBOARD_WS_URL", "ws://scoreboard:8080/api/events")
OUR_TEAM_ID = int(os.getenv("OUR_TEAM_ID", "1"))
ALERT_THRESHOLD_POINTS = float(os.getenv("ALERT_THRESHOLD_POINTS", "100"))
ALERT_THRESHOLD_TIME = int(os.getenv("ALERT_THRESHOLD_TIME", "300"))  # seconds
TELEGRAM_API_URL = os.getenv("TELEGRAM_API_URL", "http://tg-bot:8003/send")

# Shared runtime state, assigned by the application lifespan handler.
db_pool = None  # asyncpg connection pool
ws_task = None  # background task running websocket_listener()
class AttackStats(BaseModel):
    """Integer counters summarising recorded attacks.

    NOTE(review): nothing in the visible part of this file instantiates this
    model, and the /stats endpoint returns keys named ``recent_attacks_5min``
    and ``critical_alerts_5min`` rather than the last two field names below --
    confirm whether the model is still needed.
    """

    total_attacks: int
    attacks_by_us: int
    attacks_to_us: int
    recent_attacks: int
    critical_alerts: int
# Auth dependency
async def verify_token(authorization: str = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` header of a request.

    Returns:
        The bearer token extracted from the header.

    Raises:
        HTTPException: 401 when the header is missing or does not start with
            the ``Bearer `` scheme; 403 when the token differs from
            ``SECRET_TOKEN``.
    """
    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Strip only the leading scheme; str.replace() would also remove any
    # later "Bearer " occurrence inside the token itself.
    token = authorization[len(scheme):]

    # Constant-time comparison so response timing does not leak the secret.
    # Compare bytes because compare_digest() rejects non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
# Database functions
async def get_db():
    """Acquire a connection from the module-level pool and return it.

    The caller is responsible for handing it back via ``release_db``.
    """
    connection = await db_pool.acquire()
    return connection
async def release_db(conn):
    """Return *conn* (previously obtained from ``get_db``) to the module-level pool."""
    await db_pool.release(conn)
def _first_present(event: Dict[str, Any], *keys: str) -> Any:
    """Return the first value under *keys* that is neither None nor "" (0 counts as present)."""
    for key in keys:
        value = event.get(key)
        if value is not None and value != "":
            return value
    return None


def _parse_event_time(raw: Any) -> datetime:
    """Convert an ISO-8601 string or Unix timestamp to an aware UTC datetime.

    Values without an explicit offset are taken to be UTC. A missing or
    unparseable value falls back to the current time.
    """
    if raw:
        parsed = None
        if isinstance(raw, str):
            try:
                parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            except ValueError:
                parsed = None
        if parsed is None:
            try:
                parsed = datetime.fromtimestamp(float(raw), tz=timezone.utc)
            except (ValueError, TypeError, OverflowError, OSError):
                parsed = None
        if parsed is not None:
            if parsed.tzinfo is None:
                return parsed.replace(tzinfo=timezone.utc)
            return parsed.astimezone(timezone.utc)
    return datetime.now(timezone.utc)


async def process_attack_event(event: Dict[str, Any]):
    """Process one attack event from the scoreboard.

    The event is parsed first; a database connection is only taken from the
    pool once the event is known to involve our team. Rows are inserted with
    ``ON CONFLICT (attack_id) DO NOTHING``; when a new row was inserted and
    the attack targets us, ``check_and_create_alerts`` is invoked.

    Args:
        event: Decoded event payload. Several alternative field names are
            accepted for attacker, victim, service, time and points.

    Returns:
        None. Errors are printed and swallowed so one bad event cannot stop
        the caller's processing loop.
    """
    try:
        event_type = event.get("type", "unknown")

        # Handle multiple possible event formats from ForcAD.
        attacker_raw = _first_present(event, "attacker_id", "team_id", "attacker")
        victim_raw = _first_present(event, "victim_id", "target_id", "victim", "target")
        if attacker_raw is None or victim_raw is None:
            print(f"Skipping event with missing attacker/victim: {event}")
            return

        try:
            attacker_id = int(attacker_raw)
            victim_id = int(victim_raw)
        except (ValueError, TypeError):
            print(f"Invalid team IDs in event: attacker={attacker_raw}, victim={victim_raw}")
            return

        is_our_attack = attacker_id == OUR_TEAM_ID
        is_attack_to_us = victim_id == OUR_TEAM_ID
        # Only events involving our team are stored.
        if not (is_our_attack or is_attack_to_us):
            return

        service_name = (
            event.get("service")
            or event.get("service_name")
            or event.get("task_name")
            or "unknown"
        )
        flag = event.get("flag", "")
        occurred_at = _parse_event_time(event.get("time") or event.get("timestamp"))
        # Points might be in different fields; default to 1.0 when absent.
        points = float(event.get("points", 0) or event.get("score", 0) or 1.0)

        round_num = event.get("round", event.get("round_id", 0))
        fallback_id = (
            f"{round_num}_{attacker_id}_{victim_id}_{service_name}_{int(occurred_at.timestamp())}"
        )
        # str(): the id is bound to the same column as the string fallback.
        attack_id = str(event.get("id") or fallback_id)

        # The rest of this module queries the table with naive UTC values
        # (datetime.utcnow()), so store naive UTC here as well instead of a
        # mix of aware, local and UTC datetimes.
        # NOTE(review): assumes a TIMESTAMP (without time zone) column -- confirm.
        stored_at = occurred_at.replace(tzinfo=None)

        async with db_pool.acquire() as conn:
            inserted = await conn.fetchval(
                """
                INSERT INTO attacks (attack_id, attacker_team_id, victim_team_id, service_name,
                                     flag, timestamp, points, is_our_attack, is_attack_to_us)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (attack_id) DO NOTHING
                RETURNING id
                """,
                attack_id, attacker_id, victim_id, service_name,
                flag, stored_at, points, is_our_attack, is_attack_to_us,
            )
            # None means the row already existed (duplicate event).
            if inserted is not None:
                print(
                    f"[{event_type}] Logged attack: Team {attacker_id} -> Team {victim_id}"
                    f" | {service_name} | {points} pts"
                )
                if is_attack_to_us:
                    await check_and_create_alerts(conn, attacker_id, service_name)
    except Exception as e:
        print(f"Error processing attack event: {e}")
        print(f"Event data: {event}")
def _escape_like(text: str) -> str:
    """Escape LIKE wildcards (backslash is PostgreSQL's default escape) so *text* matches literally."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


async def check_and_create_alerts(conn, attacker_id: int, service_name: str):
    """Create and send a critical alert when one attacker drains too many points.

    Sums the points of attacks against us by *attacker_id* on *service_name*
    within the last ``ALERT_THRESHOLD_TIME`` seconds. When the sum reaches
    ``ALERT_THRESHOLD_POINTS`` and no matching alert was created in the same
    window, a ``high_point_loss`` alert row is inserted and forwarded through
    ``send_telegram_alert``.

    Args:
        conn: Open database connection used for all queries.
        attacker_id: Team id of the attacker.
        service_name: Name of the attacked service.
    """
    window_start = datetime.utcnow() - timedelta(seconds=ALERT_THRESHOLD_TIME)

    totals = await conn.fetchrow(
        """
        SELECT COUNT(*) AS attack_count, COALESCE(SUM(points), 0) AS total_points
        FROM attacks
        WHERE is_attack_to_us = true
          AND attacker_team_id = $1
          AND service_name = $2
          AND timestamp > $3
        """,
        attacker_id, service_name, window_start,
    )
    if not totals or totals["total_points"] < ALERT_THRESHOLD_POINTS:
        return

    alert_message = (
        f"CRITICAL: Team {attacker_id} has stolen {totals['total_points']:.2f} points"
        f" from service {service_name} in the last {ALERT_THRESHOLD_TIME}s"
        f" ({totals['attack_count']} attacks)"
    )

    # De-duplicate against alerts already raised in this window. The pattern
    # anchors on the fixed words around the id and the service name so that
    # "Team 1" does not match "Team 10" and wildcard characters inside the
    # service name are matched literally.
    dedup_pattern = (
        f"%Team {attacker_id} has stolen % from service"
        f" {_escape_like(service_name)} in the last %"
    )
    recent_alert = await conn.fetchrow(
        """
        SELECT id FROM attack_alerts
        WHERE alert_type = 'high_point_loss'
          AND message LIKE $1
          AND created_at > $2
        """,
        dedup_pattern, window_start,
    )
    if recent_alert:
        return

    # Link the alert to the most recent attack against us by this attacker on
    # this service.
    alert_id = await conn.fetchval(
        """
        INSERT INTO attack_alerts (attack_id, alert_type, severity, message)
        VALUES (
            (SELECT id FROM attacks
             WHERE attacker_team_id = $1 AND service_name = $2 AND is_attack_to_us = true
             ORDER BY timestamp DESC LIMIT 1),
            'high_point_loss',
            'critical',
            $3
        )
        RETURNING id
        """,
        attacker_id, service_name, alert_message,
    )

    delivered = await send_telegram_alert(alert_message)
    # Only an explicit False signals a failed delivery; any other result
    # (including None) is treated as delivered. A failed alert stays
    # notified = false so it remains visible as unnotified.
    if delivered is not False:
        await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id)
async def send_telegram_alert(message: str) -> bool:
    """Send an alert message to the telegram bot endpoint.

    POSTs ``{"message": message}`` to ``TELEGRAM_API_URL`` with the shared
    bearer token. Failures are printed, never raised.

    Args:
        message: Alert text to deliver.

    Returns:
        True when the bot answered with HTTP 200, False on any other status
        or on any exception (including a timeout).
    """
    # Explicit bound so a hung bot cannot stall the caller for long.
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            async with session.post(
                TELEGRAM_API_URL,
                json={"message": message},
                headers={"Authorization": f"Bearer {SECRET_TOKEN}"},
            ) as resp:
                if resp.status != 200:
                    print(f"Failed to send telegram alert: {await resp.text()}")
                    return False
                return True
    except Exception as e:
        print(f"Error sending telegram alert: {e}")
        return False
# Event types treated as attacks.
# ForcAD can send: 'attack', 'flag_stolen', 'stolen_flag', 'flag_submit'
_ATTACK_EVENT_TYPES = frozenset({"attack", "flag_stolen", "stolen_flag", "flag_submit", "flag"})


async def _handle_ws_text(payload: str) -> None:
    """Decode one text frame and forward attack-type events to process_attack_event."""
    try:
        event = json.loads(payload)
        if event.get("type", "unknown") in _ATTACK_EVENT_TYPES:
            await process_attack_event(event)
    except json.JSONDecodeError:
        print(f"Failed to decode WebSocket message: {payload[:200]}")
    except Exception as e:
        print(f"Error processing event: {e}")


async def websocket_listener():
    """Listen to the scoreboard WebSocket for events, reconnecting forever.

    Connects to ``SCOREBOARD_WS_URL`` and hands every text frame to
    ``_handle_ws_text``. When the connection fails or ends, waits
    ``reconnect_delay`` seconds and tries again; the delay starts at 5 s,
    grows by a factor of 1.5 per attempt up to 60 s, and is reset after a
    successful connection. The loop only ends when the task is cancelled.
    """
    initial_delay = 5
    max_reconnect_delay = 60
    connect_timeout = 10  # seconds allowed for the WebSocket handshake
    reconnect_delay = initial_delay

    while True:
        try:
            print(f"Attempting to connect to scoreboard WebSocket: {SCOREBOARD_WS_URL}")
            async with aiohttp.ClientSession() as session:
                # ws_connect()'s own ``timeout`` argument is not a connect
                # timeout and does not take a ClientTimeout, so bound the
                # handshake with wait_for instead.
                ws = await asyncio.wait_for(
                    session.ws_connect(SCOREBOARD_WS_URL),
                    timeout=connect_timeout,
                )
                try:
                    print("✓ Connected to scoreboard WebSocket")
                    reconnect_delay = initial_delay  # Reset delay on successful connection
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            await _handle_ws_text(msg.data)
                        elif msg.type == aiohttp.WSMsgType.CLOSED:
                            print("WebSocket connection closed by server")
                            break
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print(f"WebSocket error: {ws.exception()}")
                            break
                finally:
                    await ws.close()
        except aiohttp.ClientError as e:
            print(f"WebSocket client error: {e}")
        except asyncio.TimeoutError:
            print(f"Connection timeout to {SCOREBOARD_WS_URL}")
        except Exception as e:
            print(f"WebSocket connection error: {e}")

        print(f"Reconnecting in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)
        # Exponential backoff
        reconnect_delay = min(reconnect_delay * 1.5, max_reconnect_delay)
# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: open the DB pool and run the WebSocket listener.

    On startup creates the asyncpg pool (2-10 connections) and starts
    ``websocket_listener`` as a background task. On shutdown cancels the task
    and closes the pool; the cleanup sits in ``finally`` so it also runs when
    an exception or cancellation is delivered at the ``yield``.
    """
    global db_pool, ws_task
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    # Start WebSocket listener
    ws_task = asyncio.create_task(websocket_listener())
    try:
        yield
    finally:
        try:
            if ws_task:
                ws_task.cancel()
                try:
                    await ws_task
                except asyncio.CancelledError:
                    pass
        finally:
            # Close the pool even if awaiting the listener raised.
            await db_pool.close()


app = FastAPI(title="Scoreboard Injector", lifespan=lifespan)
# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe returning a status and the current ``datetime.utcnow()`` time."""
    return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Get attack statistics.

    Returns:
        A dict with the keys ``total_attacks``, ``attacks_by_us``,
        ``attacks_to_us``, ``recent_attacks_5min`` and
        ``critical_alerts_5min``; the two ``*_5min`` values count rows newer
        than five minutes before ``datetime.utcnow()``.
    """
    conn = await get_db()
    try:
        threshold_time = datetime.utcnow() - timedelta(minutes=5)
        # One pass over the table instead of four separate COUNT queries; the
        # four attack counters therefore come from the same snapshot.
        counts = await conn.fetchrow(
            """
            SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE is_our_attack = true) AS by_us,
                COUNT(*) FILTER (WHERE is_attack_to_us = true) AS to_us,
                COUNT(*) FILTER (WHERE timestamp > $1) AS recent
            FROM attacks
            """,
            threshold_time,
        )
        critical_alerts = await conn.fetchval(
            "SELECT COUNT(*) FROM attack_alerts WHERE severity = 'critical' AND created_at > $1",
            threshold_time,
        )
        return {
            "total_attacks": counts["total"],
            "attacks_by_us": counts["by_us"],
            "attacks_to_us": counts["to_us"],
            "recent_attacks_5min": counts["recent"],
            "critical_alerts_5min": critical_alerts,
        }
    finally:
        await release_db(conn)
@app.get("/attacks", dependencies=[Depends(verify_token)])
async def get_attacks(limit: int = 100, our_attacks: Optional[bool] = None, attacks_to_us: Optional[bool] = None):
    """Get recent attacks, newest first.

    Args:
        limit: Maximum number of rows to return; must not be negative.
        our_attacks: When given, keep only rows whose ``is_our_attack``
            equals this value.
        attacks_to_us: When given, keep only rows whose ``is_attack_to_us``
            equals this value.

    Returns:
        A list of row dicts from the ``attacks`` table.

    Raises:
        HTTPException: 400 when *limit* is negative (PostgreSQL would reject
            the query and the client would otherwise see a 500).
    """
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    # Column names come from this fixed tuple, never from the request; the
    # values are always bound as parameters.
    conditions = []
    params = []
    for column, wanted in (("is_our_attack", our_attacks), ("is_attack_to_us", attacks_to_us)):
        if wanted is not None:
            params.append(wanted)
            conditions.append(f"{column} = ${len(params)}")
    params.append(limit)

    where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""
    query = f"SELECT * FROM attacks{where_clause} ORDER BY timestamp DESC LIMIT ${len(params)}"

    conn = await get_db()
    try:
        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.get("/alerts", dependencies=[Depends(verify_token)])
async def get_alerts(limit: int = 50, unnotified: bool = False):
    """Get alerts, newest first.

    Args:
        limit: Maximum number of rows to return; must not be negative.
        unnotified: When true, return only alerts with ``notified = false``.

    Returns:
        A list of row dicts from the ``attack_alerts`` table.

    Raises:
        HTTPException: 400 when *limit* is negative.
    """
    if limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    if unnotified:
        query = "SELECT * FROM attack_alerts WHERE notified = false ORDER BY created_at DESC LIMIT $1"
    else:
        query = "SELECT * FROM attack_alerts ORDER BY created_at DESC LIMIT $1"

    conn = await get_db()
    try:
        rows = await conn.fetch(query, limit)
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.post("/alerts/{alert_id}/acknowledge", dependencies=[Depends(verify_token)])
async def acknowledge_alert(alert_id: int):
    """Mark an alert as acknowledged by setting ``notified = true``.

    Args:
        alert_id: Primary key of the alert row.

    Returns:
        ``{"status": "acknowledged", "alert_id": alert_id}``.

    Raises:
        HTTPException: 404 when no alert with this id exists.
    """
    conn = await get_db()
    try:
        # asyncpg returns the command status tag, e.g. "UPDATE 1".
        status = await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id)
        if status == "UPDATE 0":
            raise HTTPException(status_code=404, detail="Alert not found")
        return {"status": "acknowledged", "alert_id": alert_id}
    finally:
        await release_db(conn)
@app.get("/attacks/by-service", dependencies=[Depends(verify_token)])
async def get_attacks_by_service():
    """Get attack statistics grouped by service.

    Returns:
        One dict per service, ordered by attack count descending, with the
        keys ``service_name``, ``total_attacks``, ``our_attacks``,
        ``attacks_to_us``, ``points_gained`` (sum of points where
        ``is_our_attack``) and ``points_lost`` (sum where
        ``is_attack_to_us``).
    """
    conn = await get_db()
    try:
        rows = await conn.fetch(
            """
            SELECT
                service_name,
                COUNT(*) AS total_attacks,
                COUNT(*) FILTER (WHERE is_our_attack = true) AS our_attacks,
                COUNT(*) FILTER (WHERE is_attack_to_us = true) AS attacks_to_us,
                COALESCE(SUM(points) FILTER (WHERE is_our_attack = true), 0) AS points_gained,
                COALESCE(SUM(points) FILTER (WHERE is_attack_to_us = true), 0) AS points_lost
            FROM attacks
            GROUP BY service_name
            ORDER BY total_attacks DESC
            """
        )
        return [dict(row) for row in rows]
    finally:
        await release_db(conn)
@app.post("/settings/team-id", dependencies=[Depends(verify_token)])
async def set_team_id(team_id: int):
    """Update our team ID.

    Upserts the value under the ``our_team_id`` key of the ``settings`` table
    and then updates the in-memory ``OUR_TEAM_ID`` used when classifying
    events.

    Args:
        team_id: New id of our team.

    Returns:
        ``{"team_id": team_id}``.
    """
    global OUR_TEAM_ID
    conn = await get_db()
    try:
        await conn.execute(
            "INSERT INTO settings (key, value) VALUES ('our_team_id', $1) "
            "ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
            str(team_id),
        )
    finally:
        await release_db(conn)
    # Switch the in-memory value only after the write succeeded, so a failed
    # request does not leave the process and the database disagreeing.
    OUR_TEAM_ID = team_id
    return {"team_id": team_id}
@app.get("/settings/team-id", dependencies=[Depends(verify_token)])
async def get_team_id():
    """Get the current in-memory team ID setting (``OUR_TEAM_ID``)."""
    return {"team_id": OUR_TEAM_ID}
@app.post("/test/inject-attack", dependencies=[Depends(verify_token)])
async def inject_test_attack(attacker_id: int, victim_id: int, service: str = "test-service", points: float = 10.0):
    """Manually inject a test attack event for debugging.

    Builds a synthetic ``attack`` event and passes it to
    ``process_attack_event``, the same function the WebSocket listener uses.

    Args:
        attacker_id: Team id to use as attacker.
        victim_id: Team id to use as victim.
        service: Service name to record.
        points: Points value to record.

    Returns:
        ``{"status": "injected", "event": <the synthetic event>}``.
    """
    # Read the clock once so the flag suffix and the event time agree.
    now_iso = datetime.utcnow().isoformat()
    test_event = {
        "type": "attack",
        "attacker_id": attacker_id,
        "victim_id": victim_id,
        "service": service,
        "flag": "TEST_FLAG_" + now_iso,
        "points": points,
        "time": now_iso,
        "round": 1,
    }
    await process_attack_event(test_event)
    return {"status": "injected", "event": test_event}
# Script entry point: serve the app with uvicorn on all interfaces, port 8002.
# The comparison must be against the exact string "__main__"; a padded
# literal never matches and the server would never start.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8002)