This commit is contained in:
pwn
2025-12-14 13:09:18 +03:00
parent 1c6cd17e63
commit 0520f7182f

View File

@@ -5,13 +5,25 @@ logging in, restoring accounts, and managing credentials.
"""
import hashlib
import hmac
import secrets
import sqlite3
import time
from dataclasses import dataclass, field
from typing import List, Optional
from typing import Dict, List, Optional, Tuple
DEFAULT_USERNAME = "default"
USERNAME_MAX_LENGTH = 64
PASSWORD_MAX_LENGTH = 128
IMPLANT_NAME_MAX_LENGTH = 64
IMPLANT_INFO_MAX_LENGTH = 256
SECURITY_CODE_LENGTH = 6
PASSWORD_SALT_BYTES = 16
PBKDF2_ITERATIONS = 200_000
RESTORE_ATTEMPT_LIMIT = 5
RESTORE_WINDOW_SECONDS = 60
RESTORE_LOCK_SECONDS = 30
@dataclass
@@ -44,6 +56,7 @@ class SecurityService:
"""
)
self._conn.commit()
self._restore_attempts: Dict[str, Dict[str, float]] = {}
# Utility helpers -----------------------------------------------------
@staticmethod
@@ -65,17 +78,56 @@ class SecurityService:
@staticmethod
def generate_code() -> str:
return "".join(SecurityService._generate_random_digit() for _ in range(3))
return "".join(
SecurityService._generate_random_digit() for _ in range(SECURITY_CODE_LENGTH)
)
@staticmethod
def hash_password(password: str) -> str:
digest = hashlib.sha256(password.encode("utf-8")).hexdigest()
return digest
salt = secrets.token_bytes(PASSWORD_SALT_BYTES)
digest = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
)
return f"{salt.hex()}${digest.hex()}"
@staticmethod
def _legacy_sha256(password: str) -> str:
return hashlib.sha256(password.encode("utf-8")).hexdigest()
@staticmethod
def is_modern_hash(value: str) -> bool:
return "$" in value and len(value.split("$", 1)[0]) == PASSWORD_SALT_BYTES * 2
@staticmethod
def verify_password(stored_value: str, password: str) -> bool:
if SecurityService.is_modern_hash(stored_value):
salt_hex, hash_hex = stored_value.split("$", 1)
salt = bytes.fromhex(salt_hex)
candidate = hashlib.pbkdf2_hmac(
"sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
)
return hmac.compare_digest(candidate.hex(), hash_hex)
legacy_sha = SecurityService._legacy_sha256(password)
if hmac.compare_digest(stored_value, legacy_sha):
return True
return hmac.compare_digest(stored_value, password)
@staticmethod
def print_system_data(message: str) -> None:
print(message)
@staticmethod
def _validate_length(value: str, maximum: int, field_name: str) -> Optional[str]:
trimmed = value.strip()
if not trimmed:
print(f"{field_name} cannot be empty.")
return None
if len(trimmed) > maximum:
print(f"{field_name} is too long (max {maximum} characters).")
return None
return trimmed
# Database operations -------------------------------------------------
def _user_exists(self, username: str) -> bool:
row = self._conn.execute(
@@ -95,13 +147,24 @@ class SecurityService:
return False
return True
def _set_password_hash(self, username: str, password_hash: str) -> None:
self._conn.execute(
"UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username)
)
self._conn.commit()
def _check_user(self, username: str, password: str) -> bool:
password_hash = self.hash_password(password)
row = self._conn.execute(
"SELECT 1 FROM users WHERE username = ? AND password_hash = ?",
(username, password_hash),
"SELECT password_hash FROM users WHERE username = ?", (username,)
).fetchone()
return row is not None
if row is None:
return False
stored_hash = row[0]
if self.verify_password(stored_hash, password):
if not self.is_modern_hash(stored_hash):
self._set_password_hash(username, self.hash_password(password))
return True
return False
def _check_restore_user(self, username: str, code: str) -> bool:
row = self._conn.execute(
@@ -162,11 +225,19 @@ class SecurityService:
# User flows ----------------------------------------------------------
def register_user(self) -> None:
raw_username = input("\nEnter username: ").strip()
password = input("Enter password: ").strip()
raw_username = input("\nEnter username: ")
username = self._validate_length(raw_username, USERNAME_MAX_LENGTH, "Username")
if username is None:
return
password_input = input("Enter password: ")
password = self._validate_length(
password_input, PASSWORD_MAX_LENGTH, "Password"
)
if password is None:
return
self.print_system_data("Creating new account. Please wait...")
candidate_username = self.make_unique_username(raw_username)
candidate_username = self.make_unique_username(username)
security_code = self.generate_code()
if self._user_exists(candidate_username):
@@ -183,8 +254,18 @@ class SecurityService:
self.print_system_data("Failed to create user.")
def login_user(self) -> None:
username = input("\nEnter username: ").strip()
password = input("Enter password: ").strip()
username_input = input("\nEnter username: ")
username = self._validate_length(
username_input, USERNAME_MAX_LENGTH, "Username"
)
if username is None:
return
password_input = input("Enter password: ")
password = self._validate_length(
password_input, PASSWORD_MAX_LENGTH, "Password"
)
if password is None:
return
self.print_system_data("Trying to log in...")
if self._check_user(username, password):
@@ -194,14 +275,28 @@ class SecurityService:
self.print_system_data("Failed to log in.")
def restore_user(self) -> None:
username = input("\nEnter username: ").strip()
username_input = input("\nEnter username: ")
username = self._validate_length(
username_input, USERNAME_MAX_LENGTH, "Username"
)
if username is None:
return
allowed, wait_time = self._can_attempt_restore(username)
if not allowed:
self.print_system_data(
f"Too many attempts. Try again in {int(wait_time) + 1} seconds."
)
return
code = input("Enter security code: ").strip()
self.print_system_data("Trying to find user...")
if not self._check_restore_user(username, code):
self._record_failed_restore(username)
self.print_system_data("Failed to find user.")
return
self._clear_restore_attempts(username)
self.print_system_data("Successfully found user.")
new_password = self.make_new_password()
if self._change_password(username, new_password):
@@ -220,9 +315,11 @@ class SecurityService:
self.print_system_data("You need to log in first.")
return
new_password = input("Enter a new password: ").strip()
if not new_password:
self.print_system_data("Password cannot be empty.")
new_pass_input = input("Enter a new password: ")
new_password = self._validate_length(
new_pass_input, PASSWORD_MAX_LENGTH, "Password"
)
if new_password is None:
return
if self._change_password(self.username, new_password):
@@ -246,9 +343,18 @@ class SecurityService:
self.print_system_data("You need to log in first.")
return
name = input("\nEnter implant name: ").strip()
info = input("\nEnter implant info: ").strip()
if not name or not info:
name_input = input("\nEnter implant name: ")
name = self._validate_length(
name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name"
)
if name is None:
self.print_system_data("Invalid option selected.")
return
info_input = input("\nEnter implant info: ")
info = self._validate_length(
info_input, IMPLANT_INFO_MAX_LENGTH, "Implant info"
)
if info is None:
self.print_system_data("Invalid option selected.")
return
@@ -271,8 +377,11 @@ class SecurityService:
for idx, implant in enumerate(implants, start=1):
print(f"{idx}. {implant}")
name = input("\nEnter implant name: ").strip()
if not name:
name_input = input("\nEnter implant name: ")
name = self._validate_length(
name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name"
)
if name is None:
self.print_system_data("Invalid option selected.")
return
@@ -300,8 +409,11 @@ class SecurityService:
for idx, implant in enumerate(implants, start=1):
print(f"{idx}. {implant}")
name = input("\nEnter implant name: ").strip()
if not name:
name_input = input("\nEnter implant name: ")
name = self._validate_length(
name_input, IMPLANT_NAME_MAX_LENGTH, "Implant name"
)
if name is None:
self.print_system_data("Invalid option selected.")
return
@@ -394,6 +506,37 @@ class SecurityService:
else:
self.print_system_data("Invalid option selected.")
# Rate limiting -------------------------------------------------------
def _can_attempt_restore(self, username: str) -> Tuple[bool, float]:
now = time.time()
stats = self._restore_attempts.get(username)
if not stats:
return True, 0.0
locked_until = stats.get("locked_until", 0.0)
if locked_until > now:
return False, locked_until - now
window_start = stats.get("window_start", now)
if now - window_start > RESTORE_WINDOW_SECONDS:
self._restore_attempts.pop(username, None)
return True, 0.0
return True, 0.0
def _record_failed_restore(self, username: str) -> None:
now = time.time()
stats = self._restore_attempts.get(username)
if not stats or now - stats.get("window_start", now) > RESTORE_WINDOW_SECONDS:
stats = {"count": 0, "window_start": now, "locked_until": 0.0}
stats["count"] += 1
stats["window_start"] = stats.get("window_start", now)
if stats["count"] >= RESTORE_ATTEMPT_LIMIT:
stats["locked_until"] = now + RESTORE_LOCK_SECONDS
stats["count"] = 0
stats["window_start"] = stats["locked_until"]
self._restore_attempts[username] = stats
def _clear_restore_attempts(self, username: str) -> None:
self._restore_attempts.pop(username, None)
def main() -> None:
service = SecurityService()