2025-12-02 14:01:34 +03:00
"""
Scoreboard Injector for ForcAD
2025-12-03 11:03:37 +03:00
Monitors Socket.IO events for attacks and raises alerts in critical situations.
2025-12-02 14:01:34 +03:00
"""
import os
import json
import asyncio
from datetime import datetime , timedelta
from typing import Optional , Dict , Any
2025-12-03 10:56:33 +03:00
import socketio
2025-12-02 14:01:34 +03:00
from fastapi import FastAPI , HTTPException , Depends , Header
from pydantic import BaseModel
import asyncpg
from contextlib import asynccontextmanager
# Runtime configuration, read once at import time from the environment.
# Every setting has a fallback so the service can start with no env at all.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://adctrl:adctrl@postgres:5432/adctrl",
)
SECRET_TOKEN = os.environ.get("SECRET_TOKEN", "change-me-in-production")
SCOREBOARD_URL = os.environ.get("SCOREBOARD_URL", "http://10.60.0.1:8080")
OUR_TEAM_ID = int(os.environ.get("OUR_TEAM_ID", "1"))
ALERT_THRESHOLD_POINTS = float(os.environ.get("ALERT_THRESHOLD_POINTS", "100"))
ALERT_THRESHOLD_TIME = int(os.environ.get("ALERT_THRESHOLD_TIME", "300"))  # seconds
TELEGRAM_API_URL = os.environ.get("TELEGRAM_API_URL", "http://tg-bot:8003/send")

# Shared handles; both are assigned by the lifespan hook at startup.
db_pool = None
ws_task = None
class AttackStats(BaseModel):
    # Pydantic model holding five integer attack counters.
    # NOTE(review): nothing in the visible source references this model, and
    # the /stats endpoint returns the keys "recent_attacks_5min" and
    # "critical_alerts_5min" rather than the last two field names below --
    # confirm whether it was meant to be that endpoint's response_model.
    total_attacks: int
    attacks_by_us: int
    attacks_to_us: int
    recent_attacks: int
    critical_alerts: int
# Auth dependency
async def verify_token(authorization: str = Header(None)):
    """Validate the ``Authorization: Bearer <token>`` request header.

    Args:
        authorization: Raw value of the ``Authorization`` header, or ``None``
            when the header is absent.

    Returns:
        The bearer token, once it has been checked against ``SECRET_TOKEN``.

    Raises:
        HTTPException: 401 if the header is missing or does not start with
            ``"Bearer "``; 403 if the token does not match ``SECRET_TOKEN``.
    """
    from secrets import compare_digest

    scheme = "Bearer "
    if not authorization or not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

    # Strip only the leading scheme. str.replace() would also delete any later
    # "Bearer " inside the value, letting "Bearer Bearer <secret>" authenticate.
    token = authorization[len(scheme):]

    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Compare bytes: compare_digest() rejects non-ASCII str.
    if not compare_digest(token.encode("utf-8"), SECRET_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Invalid token")
    return token
# Database functions
async def get_db():
    """Borrow a connection from the module-level pool.

    The caller is responsible for handing it back with ``release_db``.
    """
    connection = await db_pool.acquire()
    return connection
async def release_db(conn):
    """Hand *conn* back to the module-level pool."""
    pool = db_pool
    await pool.release(conn)
def _parse_event_time(raw):
    """Return *raw* as a naive UTC datetime, or the current UTC time as fallback.

    Accepts ISO-8601 strings (a "Z" suffix is read as "+00:00") and Unix
    timestamps given as numbers or numeric strings.
    """
    from datetime import timezone

    if not raw:
        return datetime.utcnow()

    parsed = None
    if isinstance(raw, str):
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except ValueError:
            parsed = None
    if parsed is None:
        try:
            # Interpret epoch seconds as UTC, not as host-local time.
            parsed = datetime.fromtimestamp(float(raw), tz=timezone.utc)
        except (ValueError, TypeError, OverflowError, OSError):
            return datetime.utcnow()

    # The rest of this file works with naive UTC values (datetime.utcnow()),
    # so drop the offset after converting; mixing aware and naive datetimes
    # breaks comparisons against those thresholds.
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


async def process_attack_event(event: Dict[str, Any]):
    """Store one scoreboard attack event and evaluate alert conditions.

    The event may name attacker, victim, service, time and points under
    several alternative keys. Events that lack an attacker or victim, carry
    non-integer team ids, or do not involve ``OUR_TEAM_ID`` are dropped
    without touching the database. Inserts are deduplicated on ``attack_id``;
    when a new attack against our team is stored,
    ``check_and_create_alerts`` is invoked on the same connection.

    Processing is best-effort: any exception is printed and swallowed.

    Args:
        event: Decoded event payload.

    Returns:
        None.
    """
    try:
        event_type = event.get('type', 'unknown')

        # Try to extract attacker/victim IDs from the various possible fields.
        attacker_id = event.get('attacker_id') or event.get('team_id') or event.get('attacker')
        victim_id = (
            event.get('victim_id')
            or event.get('target_id')
            or event.get('victim')
            or event.get('target')
        )
        if attacker_id is None or victim_id is None:
            print(f"Skipping event with missing attacker/victim: {event}")
            return

        try:
            attacker_id = int(attacker_id)
            victim_id = int(victim_id)
        except (ValueError, TypeError):
            print(f"Invalid team IDs in event: attacker={attacker_id}, victim={victim_id}")
            return

        # Only events involving our team are recorded; decide that before
        # taking a pooled connection.
        is_our_attack = attacker_id == OUR_TEAM_ID
        is_attack_to_us = victim_id == OUR_TEAM_ID
        if not (is_our_attack or is_attack_to_us):
            return

        service_name = (
            event.get('service')
            or event.get('service_name')
            or event.get('task_name')
            or 'unknown'
        )
        flag = event.get('flag', '')
        timestamp = _parse_event_time(event.get('time') or event.get('timestamp'))

        # Points may arrive under different keys; a missing/zero value counts as 1.0.
        points = float(event.get('points', 0) or event.get('score', 0) or 1.0)

        # Prefer the event's own id; otherwise synthesise a deterministic key.
        round_num = event.get('round', event.get('round_id', 0))
        raw_id = event.get('id')
        if raw_id:
            # The fallback key is a string, so normalise numeric ids as well.
            attack_id = str(raw_id)
        else:
            attack_id = f"{round_num}_{attacker_id}_{victim_id}_{service_name}_{int(timestamp.timestamp())}"

        async with db_pool.acquire() as conn:
            inserted = await conn.fetchval("""
                INSERT INTO attacks (attack_id, attacker_team_id, victim_team_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (attack_id) DO NOTHING
                RETURNING id
            """, attack_id, attacker_id, victim_id, service_name, flag, timestamp, points, is_our_attack, is_attack_to_us)

            # fetchval() yields None when ON CONFLICT suppressed the insert.
            if inserted:
                print(f"[{event_type}] Logged attack: Team {attacker_id} -> Team {victim_id} | {service_name} | {points} pts")
                if is_attack_to_us:
                    await check_and_create_alerts(conn, attacker_id, service_name)
    except Exception as e:
        print(f"Error processing attack event: {e}")
        print(f"Event data: {event}")
def _escape_like(text: str) -> str:
    """Escape LIKE metacharacters using backslash (PostgreSQL's default escape)."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


async def check_and_create_alerts(conn, attacker_id: int, service_name: str):
    """Raise a critical alert when one attacker drains too many points from us.

    Sums the points of attacks against us by *attacker_id* on *service_name*
    within the last ``ALERT_THRESHOLD_TIME`` seconds. When the sum reaches
    ``ALERT_THRESHOLD_POINTS`` and no matching alert was created inside the
    same window, an ``attack_alerts`` row is inserted and the message is
    forwarded through ``send_telegram_alert``.

    Args:
        conn: Open database connection used for all queries.
        attacker_id: Team id of the attacker.
        service_name: Name of the attacked service.

    Returns:
        None.
    """
    threshold_time = datetime.utcnow() - timedelta(seconds=ALERT_THRESHOLD_TIME)

    result = await conn.fetchrow("""
        SELECT COUNT(*) as attack_count, COALESCE(SUM(points), 0) as total_points
        FROM attacks
        WHERE is_attack_to_us = true
          AND attacker_team_id = $1
          AND service_name = $2
          AND timestamp > $3
    """, attacker_id, service_name, threshold_time)

    if not result or result['total_points'] < ALERT_THRESHOLD_POINTS:
        return

    alert_message = f"CRITICAL: Team {attacker_id} has stolen {result['total_points']:.2f} points from service {service_name} in the last {ALERT_THRESHOLD_TIME}s ({result['attack_count']} attacks)"

    # Dedup against the exact message layout above. Anchoring on
    # "Team <id> has stolen" keeps team 1 from matching team 10, and escaping
    # stops '%' / '_' in a service name from acting as wildcards.
    dedup_pattern = (
        f"CRITICAL: Team {attacker_id} has stolen % points from service "
        f"{_escape_like(service_name)} in the last %"
    )
    recent_alert = await conn.fetchrow("""
        SELECT id FROM attack_alerts
        WHERE alert_type = 'high_point_loss'
          AND message LIKE $1
          AND created_at > $2
    """, dedup_pattern, threshold_time)
    if recent_alert:
        return

    # Link the alert to the latest attack *against us* from this attacker.
    alert_id = await conn.fetchval("""
        INSERT INTO attack_alerts (attack_id, alert_type, severity, message)
        VALUES (
            (SELECT id FROM attacks
             WHERE attacker_team_id = $1 AND service_name = $2 AND is_attack_to_us = true
             ORDER BY timestamp DESC LIMIT 1),
            'high_point_loss',
            'critical',
            $3
        )
        RETURNING id
    """, attacker_id, service_name, alert_message)

    delivered = await send_telegram_alert(alert_message)

    # Only an explicit False counts as a failed delivery; a None result
    # (sender reports nothing) keeps the previous always-mark behaviour.
    # Failed alerts stay notified = false so they remain queryable as such.
    if delivered is not False:
        await conn.execute("UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id)
async def send_telegram_alert(message: str):
    """POST *message* to the Telegram bot service at ``TELEGRAM_API_URL``.

    Delivery is best-effort: failures are printed, never raised.

    Args:
        message: Alert text to forward.

    Returns:
        True if the bot service answered with HTTP 200, False on any other
        status, on timeout, or on a connection error.
    """
    import aiohttp

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                TELEGRAM_API_URL,
                json={"message": message},
                headers={"Authorization": f"Bearer {SECRET_TOKEN}"},
                # Bound the request explicitly so a stalled bot cannot hold up
                # the caller (which owns a pooled DB connection) indefinitely.
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status != 200:
                    print(f"Failed to send telegram alert: {await resp.text()}")
                    return False
                return True
    except Exception as e:
        print(f"Error sending telegram alert: {e}")
        return False
2025-12-03 10:56:33 +03:00
async def socketio_listener():
    """Stay connected to the ForcAD scoreboard over Socket.IO and log updates.

    Registers handlers on the ``/game_events`` namespace and then loops
    forever: connect, wait until the connection ends, sleep 5 seconds, retry.
    The loop only ends when the task is cancelled, at which point the client
    is disconnected.

    NOTE(review): the handlers below only print; nothing in this function
    calls ``process_attack_event``, so no attacks are stored from live
    scoreboard data. Confirm whether that wiring is still pending.
    """
    sio = socketio.AsyncClient(logger=False, engineio_logger=False)

    @sio.event(namespace='/game_events')
    async def update_scoreboard(data):
        """Log per-task stolen/lost counters for our team."""
        try:
            event_data = data.get('data', {})
            round_num = event_data.get('round', 0)
            team_tasks = event_data.get('team_tasks', [])
            print(f"📊 Round {round_num} - Processing {len(team_tasks)} team updates")
            for team_task in team_tasks:
                team_id = team_task.get('team_id')
                task_id = team_task.get('task_id')
                stolen = team_task.get('stolen', 0)
                lost = team_task.get('lost', 0)
                if team_id == OUR_TEAM_ID and (stolen > 0 or lost > 0):
                    print(f"Team {team_id} Task {task_id}: stolen={stolen}, lost={lost}")
        except Exception as e:
            print(f"Error processing update_scoreboard: {e}")

    @sio.event(namespace='/game_events')
    async def init_scoreboard(data):
        """Log the teams and tasks announced in the initial snapshot."""
        try:
            print("📡 Received initial scoreboard data")
            event_data = data.get('data', {})
            teams = event_data.get('teams', [])
            tasks = event_data.get('tasks', [])
            team_names = ', '.join(f"{t.get('name')} (ID: {t.get('id')})" for t in teams)
            # str() so a task without a name cannot make join() raise.
            task_names = ', '.join(str(t.get('name')) for t in tasks)
            print(f"Teams: {team_names}")
            print(f"Tasks: {task_names}")
        except Exception as e:
            print(f"Error processing init_scoreboard: {e}")

    @sio.event
    async def connect():
        print("✅ Connected to ForcAD scoreboard Socket.IO")

    @sio.event
    async def disconnect():
        print("❌ Disconnected from scoreboard")

    try:
        while True:
            try:
                print(f"Connecting to {SCOREBOARD_URL}/socket.io ...")
                await sio.connect(
                    SCOREBOARD_URL,
                    namespaces=['/game_events'],
                    transports=['websocket'],
                )
                await sio.wait()
            except Exception as e:
                print(f"Socket.IO error: {e}")
            print("Reconnecting in 5 seconds...")
            await asyncio.sleep(5)
    finally:
        # Runs when the task is cancelled at shutdown: close the session
        # instead of abandoning an open connection.
        if sio.connected:
            try:
                await sio.disconnect()
            except Exception as e:
                print(f"Socket.IO error: {e}")
2025-12-02 14:01:34 +03:00
# Lifespan context
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the process-wide resources for the lifetime of the application.

    Startup creates the asyncpg pool (2-10 connections) and launches
    ``socketio_listener`` as a background task. Shutdown cancels that task
    and closes the pool, even when an error is thrown in at ``yield`` or the
    listener task itself ended with an exception.

    Args:
        app: The FastAPI application (unused; required by the lifespan protocol).
    """
    global db_pool, ws_task
    db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
    try:
        print("Starting Socket.IO listener")
        print(f"Scoreboard URL: {SCOREBOARD_URL}")
        print(f"Our team ID: {OUR_TEAM_ID}")
        ws_task = asyncio.create_task(socketio_listener())
        yield
    finally:
        try:
            if ws_task:
                ws_task.cancel()
                try:
                    await ws_task
                except asyncio.CancelledError:
                    pass
        finally:
            # Always release the pool, whatever happened to the listener.
            await db_pool.close()
# ASGI application; startup/shutdown are handled by the lifespan hook.
app = FastAPI(lifespan=lifespan, title="Scoreboard Injector")
# API Endpoints
@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe reporting the current configuration."""
    now_iso = datetime.utcnow().isoformat()
    payload = {
        "status": "ok",
        "timestamp": now_iso,
        "team_id": OUR_TEAM_ID,
        "mode": "socketio",
        "scoreboard_url": SCOREBOARD_URL,
    }
    return payload
2025-12-02 14:01:34 +03:00
@app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats():
    """Return overall attack counters plus counts for the last five minutes."""
    conn = await get_db()
    try:
        count = conn.fetchval
        stats = {}
        stats["total_attacks"] = await count("SELECT COUNT(*) FROM attacks")
        stats["attacks_by_us"] = await count(
            "SELECT COUNT(*) FROM attacks WHERE is_our_attack = true"
        )
        stats["attacks_to_us"] = await count(
            "SELECT COUNT(*) FROM attacks WHERE is_attack_to_us = true"
        )

        # Both "recent" figures share the same five-minute window start.
        window_start = datetime.utcnow() - timedelta(minutes=5)
        stats["recent_attacks_5min"] = await count(
            "SELECT COUNT(*) FROM attacks WHERE timestamp > $1", window_start
        )
        stats["critical_alerts_5min"] = await count(
            "SELECT COUNT(*) FROM attack_alerts WHERE severity = 'critical' AND created_at > $1",
            window_start,
        )
        return stats
    finally:
        await release_db(conn)
@app.get("/attacks", dependencies=[Depends(verify_token)])
async def get_attacks(limit: int = 100, our_attacks: Optional[bool] = None, attacks_to_us: Optional[bool] = None):
    """List stored attacks, newest first, optionally filtered by direction.

    A filter left as ``None`` is not applied; *limit* caps the row count.
    """
    conn = await get_db()
    try:
        optional_filters = (
            ("is_our_attack", our_attacks),
            ("is_attack_to_us", attacks_to_us),
        )
        sql_parts = ["SELECT * FROM attacks WHERE 1=1"]
        args = []
        for column, wanted in optional_filters:
            if wanted is None:
                continue
            args.append(wanted)
            # Placeholder index equals the argument's 1-based position.
            sql_parts.append(f" AND {column} = ${len(args)}")

        args.append(limit)
        sql_parts.append(f" ORDER BY timestamp DESC LIMIT ${len(args)}")

        records = await conn.fetch("".join(sql_parts), *args)
        return list(map(dict, records))
    finally:
        await release_db(conn)
@app.get("/alerts", dependencies=[Depends(verify_token)])
async def get_alerts(limit: int = 50, unnotified: bool = False):
    """List alerts, newest first; ``unnotified=true`` keeps only rows not yet notified."""
    conn = await get_db()
    try:
        where_clause = " WHERE notified = false" if unnotified else ""
        query = f"SELECT * FROM attack_alerts{where_clause} ORDER BY created_at DESC LIMIT $1"
        records = await conn.fetch(query, limit)
        return list(map(dict, records))
    finally:
        await release_db(conn)
@app.post("/alerts/{alert_id}/acknowledge", dependencies=[Depends(verify_token)])
async def acknowledge_alert(alert_id: int):
    """Mark one alert as acknowledged by setting ``notified = true``.

    Args:
        alert_id: Primary key of the ``attack_alerts`` row.

    Returns:
        ``{"status": "acknowledged", "alert_id": alert_id}``.

    Raises:
        HTTPException: 404 when no alert with that id exists.
    """
    conn = await get_db()
    try:
        status = await conn.execute(
            "UPDATE attack_alerts SET notified = true WHERE id = $1", alert_id
        )
    finally:
        await release_db(conn)

    # execute() returns the command tag, e.g. "UPDATE 1"; a trailing 0 means
    # no row matched, so do not claim success for an unknown id.
    if status.rsplit(" ", 1)[-1] == "0":
        raise HTTPException(status_code=404, detail="Alert not found")
    return {"status": "acknowledged", "alert_id": alert_id}
@app.get("/attacks/by-service", dependencies=[Depends(verify_token)])
async def get_attacks_by_service():
    """Aggregate attack counts and point totals per service, busiest first."""
    per_service_sql = """
        SELECT
            service_name,
            COUNT(*) as total_attacks,
            COUNT(*) FILTER (WHERE is_our_attack = true) as our_attacks,
            COUNT(*) FILTER (WHERE is_attack_to_us = true) as attacks_to_us,
            COALESCE(SUM(points) FILTER (WHERE is_our_attack = true), 0) as points_gained,
            COALESCE(SUM(points) FILTER (WHERE is_attack_to_us = true), 0) as points_lost
        FROM attacks
        GROUP BY service_name
        ORDER BY total_attacks DESC
    """
    conn = await get_db()
    try:
        records = await conn.fetch(per_service_sql)
        return list(map(dict, records))
    finally:
        await release_db(conn)
@app.post("/settings/team-id", dependencies=[Depends(verify_token)])
async def set_team_id(team_id: int):
    """Change the team id this service treats as "ours".

    The value is upserted into the ``settings`` table under ``our_team_id``
    and then applied to the in-memory ``OUR_TEAM_ID``.

    NOTE(review): nothing in the visible source reads the persisted value
    back at startup, so a restart falls back to the environment default --
    confirm whether that is intended.

    Args:
        team_id: New team id.

    Returns:
        ``{"team_id": team_id}``.
    """
    global OUR_TEAM_ID
    conn = await get_db()
    try:
        await conn.execute(
            "INSERT INTO settings (key, value) VALUES ('our_team_id', $1) ON CONFLICT (key) DO UPDATE SET value = $1",
            str(team_id),
        )
    finally:
        await release_db(conn)

    # Switch the live value only after the write succeeded, so a database
    # error cannot leave memory and storage disagreeing.
    OUR_TEAM_ID = team_id
    return {"team_id": team_id}
2025-12-03 10:22:19 +03:00
@app.get("/settings/team-id", dependencies=[Depends(verify_token)])
async def get_team_id():
    """Report the team id currently held in memory."""
    current = OUR_TEAM_ID
    return dict(team_id=current)
@app.post("/test/inject-attack", dependencies=[Depends(verify_token)])
async def inject_test_attack(attacker_id: int, victim_id: int, service: str = "test-service", points: float = 10.0):
    """Build a synthetic attack event and feed it to ``process_attack_event``.

    Debugging aid: the response echoes the event that was submitted.
    """
    # Two separate clock reads, flag first, mirroring the payload field order.
    flag_stamp = datetime.utcnow().isoformat()
    event_time = datetime.utcnow().isoformat()

    test_event = dict(
        type="attack",
        attacker_id=attacker_id,
        victim_id=victim_id,
        service=service,
        flag="TEST_FLAG_" + flag_stamp,
        points=points,
        time=event_time,
        round=1,
    )
    await process_attack_event(test_event)
    return {"status": "injected", "event": test_event}
2025-12-03 10:31:37 +03:00
@app.get("/debug/scoreboard", dependencies=[Depends(verify_token)])
async def debug_scoreboard():
    """Probe the scoreboard over HTTP and report what is reachable.

    Three GET requests (5 s timeout each) are made: the Socket.IO polling
    handshake, the base URL, and ``/api/client/attack_data``. Each probe
    yields either a status report or ``{"url", "reachable": False, "error"}``.
    """
    import aiohttp

    async def probe(session, url, summarize):
        """GET *url* and summarise the response; any failure becomes an error report."""
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                return await summarize(url, resp)
        except Exception as exc:
            return {"url": url, "reachable": False, "error": str(exc)}

    async def summarize_socketio(url, resp):
        ok = resp.status == 200
        preview = (await resp.text())[:200] if ok else None
        return {
            "url": url,
            "status": resp.status,
            "reachable": ok,
            "response_preview": preview,
        }

    async def summarize_base(url, resp):
        return {
            "url": url,
            "status": resp.status,
            "reachable": resp.status == 200,
        }

    async def summarize_attack_data(url, resp):
        content_type = resp.headers.get('Content-Type', '')
        report = {
            "url": url,
            "status": resp.status,
            "reachable": resp.status == 200,
            "content_type": content_type,
            "note": "Contains exploit credentials, not attack events",
        }
        # Only decode the body when it is advertised as JSON.
        if resp.status == 200 and 'application/json' in content_type:
            payload = await resp.json()
            report["services"] = list(payload.keys()) if isinstance(payload, dict) else None
        return report

    results = {
        "mode": "socketio",
        "config": {
            "scoreboard_url": SCOREBOARD_URL,
            "our_team_id": OUR_TEAM_ID,
        },
        "endpoints_tested": [],
    }

    try:
        async with aiohttp.ClientSession() as session:
            results["socketio_status"] = await probe(
                session,
                f"{SCOREBOARD_URL}/socket.io/?EIO=4&transport=polling",
                summarize_socketio,
            )
            results["base_url_status"] = await probe(session, SCOREBOARD_URL, summarize_base)
            # attack_data is probed for reference only.
            results["endpoints_tested"].append(
                await probe(
                    session,
                    f"{SCOREBOARD_URL}/api/client/attack_data",
                    summarize_attack_data,
                )
            )
    except Exception as exc:
        results["error"] = str(exc)

    return results
2025-12-02 14:01:34 +03:00
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8002.
    from uvicorn import run as serve

    serve(app, port=8002, host="0.0.0.0")