diff --git a/neuralink/build/service/security_service.py b/neuralink/build/service/security_service.py index d0dac47..8dbec78 100644 --- a/neuralink/build/service/security_service.py +++ b/neuralink/build/service/security_service.py @@ -5,13 +5,25 @@ logging in, restoring accounts, and managing credentials. """ import hashlib +import hmac import secrets import sqlite3 +import time from dataclasses import dataclass, field -from typing import List, Optional +from typing import Dict, List, Optional, Tuple DEFAULT_USERNAME = "default" +USERNAME_MAX_LENGTH = 64 +PASSWORD_MAX_LENGTH = 128 +IMPLANT_NAME_MAX_LENGTH = 64 +IMPLANT_INFO_MAX_LENGTH = 256 +SECURITY_CODE_LENGTH = 6 +PASSWORD_SALT_BYTES = 16 +PBKDF2_ITERATIONS = 200_000 +RESTORE_ATTEMPT_LIMIT = 5 +RESTORE_WINDOW_SECONDS = 60 +RESTORE_LOCK_SECONDS = 30 @dataclass @@ -44,6 +56,7 @@ class SecurityService: """ ) self._conn.commit() + self._restore_attempts: Dict[str, Dict[str, float]] = {} # Utility helpers ----------------------------------------------------- @staticmethod @@ -65,17 +78,56 @@ class SecurityService: @staticmethod def generate_code() -> str: - return "".join(SecurityService._generate_random_digit() for _ in range(3)) + return "".join( + SecurityService._generate_random_digit() for _ in range(SECURITY_CODE_LENGTH) + ) @staticmethod def hash_password(password: str) -> str: - digest = hashlib.sha256(password.encode("utf-8")).hexdigest() - return digest + salt = secrets.token_bytes(PASSWORD_SALT_BYTES) + digest = hashlib.pbkdf2_hmac( + "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS + ) + return f"{salt.hex()}${digest.hex()}" + + @staticmethod + def _legacy_sha256(password: str) -> str: + return hashlib.sha256(password.encode("utf-8")).hexdigest() + + @staticmethod + def is_modern_hash(value: str) -> bool: + return "$" in value and len(value.split("$", 1)[0]) == PASSWORD_SALT_BYTES * 2 + + @staticmethod + def verify_password(stored_value: str, password: str) -> bool: + if SecurityService.is_modern_hash(stored_value): + salt_hex, hash_hex = stored_value.split("$", 1) + salt = bytes.fromhex(salt_hex) + candidate = hashlib.pbkdf2_hmac( + "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS + ) + return hmac.compare_digest(candidate.hex(), hash_hex) + + legacy_sha = SecurityService._legacy_sha256(password) + if hmac.compare_digest(stored_value, legacy_sha): + return True + return hmac.compare_digest(stored_value, password) @staticmethod def print_system_data(message: str) -> None: print(message) + @staticmethod + def _validate_length(value: str, maximum: int, field_name: str) -> Optional[str]: + trimmed = value.strip() + if not trimmed: + print(f"{field_name} cannot be empty.") + return None + if len(trimmed) > maximum: + print(f"{field_name} is too long (max {maximum} characters).") + return None + return trimmed + # Database operations ------------------------------------------------- def _user_exists(self, username: str) -> bool: row = self._conn.execute( @@ -95,13 +147,24 @@ class SecurityService: return False return True + def _set_password_hash(self, username: str, password_hash: str) -> None: + self._conn.execute( + "UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username) + ) + self._conn.commit() + def _check_user(self, username: str, password: str) -> bool: - password_hash = self.hash_password(password) row = self._conn.execute( - "SELECT 1 FROM users WHERE username = ? AND password_hash = ?", - (username, password_hash), + "SELECT password_hash FROM users WHERE username = ?", (username,) ).fetchone() - return row is not None + if row is None: + return False + stored_hash = row[0] + if self.verify_password(stored_hash, password): + if not self.is_modern_hash(stored_hash): + self._set_password_hash(username, self.hash_password(password)) + return True + return False def _check_restore_user(self, username: str, code: str) -> bool: row = self._conn.execute( @@ -162,11 +225,19 @@ class SecurityService: # User flows ---------------------------------------------------------- def register_user(self) -> None: - raw_username = input("\nEnter username: ").strip() - password = input("Enter password: ").strip() + raw_username = input("\nEnter username: ") + username = self._validate_length(raw_username, USERNAME_MAX_LENGTH, "Username") + if username is None: + return + password_input = input("Enter password: ") + password = self._validate_length( + password_input, PASSWORD_MAX_LENGTH, "Password" + ) + if password is None: + return self.print_system_data("Creating new account. Please wait...") - candidate_username = self.make_unique_username(raw_username) + candidate_username = self.make_unique_username(username) security_code = self.generate_code() if self._user_exists(candidate_username): @@ -183,8 +254,18 @@ class SecurityService: self.print_system_data("Failed to create user.") def login_user(self) -> None: - username = input("\nEnter username: ").strip() - password = input("Enter password: ").strip() + username_input = input("\nEnter username: ") + username = self._validate_length( + username_input, USERNAME_MAX_LENGTH, "Username" + ) + if username is None: + return + password_input = input("Enter password: ") + password = self._validate_length( + password_input, PASSWORD_MAX_LENGTH, "Password" + ) + if password is None: + return self.print_system_data("Trying to log in...") if self._check_user(username, password): @@ -194,14 +275,28 @@ class SecurityService: self.print_system_data("Failed to log in.") def restore_user(self) -> None: - username = input("\nEnter username: ").strip() + username_input = input("\nEnter username: ") + username = self._validate_length( + username_input, USERNAME_MAX_LENGTH, "Username" + ) + if username is None: + return + allowed, wait_time = self._can_attempt_restore(username) + if not allowed: + self.print_system_data( + f"Too many attempts. Try again in {int(wait_time) + 1} seconds." + ) + return + code = input("Enter security code: ").strip() self.print_system_data("Trying to find user...") if not self._check_restore_user(username, code): + self._record_failed_restore(username) self.print_system_data("Failed to find user.") return + self._clear_restore_attempts(username) self.print_system_data("Successfully found user.") new_password = self.make_new_password() if self._change_password(username, new_password): @@ -220,9 +315,11 @@ class SecurityService: self.print_system_data("You need to log in first.") return - new_password = input("Enter a new password: ").strip() - if not new_password: - self.print_system_data("Password cannot be empty.") + new_pass_input = input("Enter a new password: ") + new_password = self._validate_length( + new_pass_input, PASSWORD_MAX_LENGTH, "Password" + ) + if new_password is None: return if self._change_password(self.username, new_password): @@ -246,9 +343,18 @@ class SecurityService: self.print_system_data("You need to log in first.") return - name = input("\nEnter implant name: ").strip() - info = input("\nEnter implant info: ").strip() - if not name or not info: + name_input = input("\nEnter implant name: ") + name = self._validate_length( + name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" + ) + if name is None: + self.print_system_data("Invalid option selected.") + return + info_input = input("\nEnter implant info: ") + info = self._validate_length( + info_input, IMPLANT_INFO_MAX_LENGTH, "Implant info" + ) + if info is None: self.print_system_data("Invalid option selected.") return @@ -271,8 +377,11 @@ class SecurityService: for idx, implant in enumerate(implants, start=1): print(f"{idx}. {implant}") - name = input("\nEnter implant name: ").strip() - if not name: + name_input = input("\nEnter implant name: ") + name = self._validate_length( + name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" + ) + if name is None: self.print_system_data("Invalid option selected.") return @@ -300,8 +409,11 @@ class SecurityService: for idx, implant in enumerate(implants, start=1): print(f"{idx}. {implant}") - name = input("\nEnter implant name: ").strip() - if not name: + name_input = input("\nEnter implant name: ") + name = self._validate_length( + name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name" + ) + if name is None: self.print_system_data("Invalid option selected.") return @@ -394,6 +506,37 @@ class SecurityService: else: self.print_system_data("Invalid option selected.") + # Rate limiting ------------------------------------------------------- + def _can_attempt_restore(self, username: str) -> Tuple[bool, float]: + now = time.time() + stats = self._restore_attempts.get(username) + if not stats: + return True, 0.0 + locked_until = stats.get("locked_until", 0.0) + if locked_until > now: + return False, locked_until - now + window_start = stats.get("window_start", now) + if now - window_start > RESTORE_WINDOW_SECONDS: + self._restore_attempts.pop(username, None) + return True, 0.0 + return True, 0.0 + + def _record_failed_restore(self, username: str) -> None: + now = time.time() + stats = self._restore_attempts.get(username) + if not stats or now - stats.get("window_start", now) > RESTORE_WINDOW_SECONDS: + stats = {"count": 0, "window_start": now, "locked_until": 0.0} + stats["count"] += 1 + stats["window_start"] = stats.get("window_start", now) + if stats["count"] >= RESTORE_ATTEMPT_LIMIT: + stats["locked_until"] = now + RESTORE_LOCK_SECONDS + stats["count"] = 0 + stats["window_start"] = stats["locked_until"] + self._restore_attempts[username] = stats + + def _clear_restore_attempts(self, username: str) -> None: + self._restore_attempts.pop(username, None) + def main() -> None: service = SecurityService()