Files
M-CTF-2025/rodchenko/app/utils/db.py
2025-12-14 11:09:33 +03:00

478 lines
14 KiB
Python
Executable File

import sqlite3
import hashlib
import ipaddress
from typing import Dict, List, Optional, Tuple
# Path of the SQLite database file opened by get_db(); being a relative path,
# it is resolved against the process's current working directory.
DATABASE = 'data/auction.db'
def get_db():
    """Open a new SQLite connection to the file named by ``DATABASE``.

    A fresh connection is created on every call; the caller is responsible
    for closing it.
    """
    connection = sqlite3.connect(DATABASE)
    return connection
def _table_has_column(cursor, table, column):
    """Tell whether ``table`` currently has a column called ``column``.

    ``table`` is interpolated directly into the PRAGMA statement, so it must
    be a trusted identifier (never user input).

    Returns:
        True if a column with that exact name exists, otherwise False
        (including when the PRAGMA yields no rows at all).
    """
    cursor.execute(f"PRAGMA table_info({table})")
    for column_info in cursor.fetchall():
        # Field 1 of every table_info row holds the column name.
        if column_info[1] == column:
            return True
    return False
def _ensure_created_at(conn, table, backfill_age_minutes=None):
    """Add a ``created_at`` column to ``table`` if it does not have one yet.

    When the column is added, rows whose ``created_at`` is NULL are
    backfilled: with the current timestamp when ``backfill_age_minutes`` is
    None, otherwise with a timestamp that many minutes in the past. The
    change is committed on ``conn``.

    ``table`` is interpolated into the SQL text and must be a trusted
    identifier.

    Returns:
        True if the column was added by this call, False if it already
        existed (in which case nothing is modified).
    """
    cur = conn.cursor()
    if _table_has_column(cur, table, "created_at"):
        return False

    # The column is added without a default and then filled by an UPDATE.
    cur.execute(f"ALTER TABLE {table} ADD COLUMN created_at TIMESTAMP")
    if backfill_age_minutes is not None:
        cur.execute(
            f"UPDATE {table} SET created_at = datetime('now', ?) WHERE created_at IS NULL",
            (f"-{backfill_age_minutes} minutes",),
        )
    else:
        cur.execute(
            f"UPDATE {table} SET created_at = CURRENT_TIMESTAMP WHERE created_at IS NULL"
        )
    conn.commit()
    return True
def _delete_by_age(cursor, table, column, max_age_minutes, extra_where=None):
    """Delete rows of ``table`` whose ``column`` timestamp is too old.

    A row is removed when ``datetime(column)`` is earlier than "now minus
    ``max_age_minutes`` minutes". If ``extra_where`` is a non-empty string it
    is AND-ed onto the condition verbatim. ``table``, ``column`` and
    ``extra_where`` are spliced into the SQL text and must be trusted.

    Returns:
        The cursor's ``rowcount`` after the DELETE.
    """
    statement = f"DELETE FROM {table} WHERE datetime({column}) < datetime('now', ?)"
    if extra_where:
        statement += f" AND {extra_where}"
    cursor.execute(statement, (f"-{max_age_minutes} minutes",))
    return cursor.rowcount
def cleanup_expired_records(max_age_minutes=7):
    """Purge rows older than ``max_age_minutes`` from the application tables.

    Steps, all on one connection:
      1. Make sure every table has a ``created_at`` column, backfilling rows
         of freshly migrated tables as ``max_age_minutes + 1`` minutes old.
      2. Delete aged rows from artworks, transactions, artwork_settings and
         users (the ``admin`` user is always kept).
      3. Delete artwork_settings rows whose artwork no longer exists.

    Returns:
        A dict with the number of rows removed per table, under the keys
        "artworks", "transactions", "artwork_settings" (aged plus orphaned)
        and "users".
    """
    conn = get_db()
    cur = conn.cursor()
    try:
        # Legacy rows are stamped as already past the cutoff so the age-based
        # delete below picks them up.
        legacy_age = max_age_minutes + 1
        for name in ("users", "artworks", "transactions", "artwork_settings"):
            _ensure_created_at(conn, name, backfill_age_minutes=legacy_age)

        removed = {}
        for name in ("artworks", "transactions", "artwork_settings", "users"):
            if not _table_has_column(cur, name, "created_at"):
                removed[name] = 0
                continue
            removed[name] = _delete_by_age(
                cur,
                table=name,
                column="created_at",
                max_age_minutes=max_age_minutes,
                extra_where="username != 'admin'" if name == "users" else None,
            )

        cur.execute(
            "DELETE FROM artwork_settings WHERE artwork_id NOT IN (SELECT id FROM artworks)"
        )
        orphaned_settings = cur.rowcount
        conn.commit()

        settings_total = removed.get("artwork_settings", 0) + orphaned_settings
        return {
            "artworks": removed.get("artworks", 0),
            "transactions": removed.get("transactions", 0),
            "artwork_settings": settings_total,
            "users": removed.get("users", 0),
        }
    finally:
        conn.close()
def init_db():
    """Create the schema if needed, migrate older databases and seed ``admin``.

    Creates the ``users``, ``artworks``, ``transactions`` and
    ``artwork_settings`` tables when they do not exist, adds a ``created_at``
    column to any of them that lacks one, and inserts the ``admin`` user
    unless a row with that username is already present.

    The connection is always closed, even if one of the statements raises.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users
                     (id INTEGER PRIMARY KEY, username TEXT UNIQUE, password TEXT, balance INTEGER DEFAULT 1000,
                      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
        c.execute('''CREATE TABLE IF NOT EXISTS artworks
                     (id INTEGER PRIMARY KEY, title TEXT, data TEXT, price INTEGER,
                      owner_id INTEGER, is_private INTEGER DEFAULT 0, signature TEXT,
                      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
        c.execute('''CREATE TABLE IF NOT EXISTS transactions
                     (id INTEGER PRIMARY KEY, artwork_id INTEGER, buyer_id INTEGER,
                      seller_id INTEGER, amount INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
        c.execute('''CREATE TABLE IF NOT EXISTS artwork_settings
                     (id INTEGER PRIMARY KEY, artwork_id INTEGER, settings_data TEXT,
                      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
        # Schema guards for older databases
        for table in ("users", "artworks", "transactions", "artwork_settings"):
            _ensure_created_at(conn, table)
        # NOTE(review): the admin password is hard-coded in source and stored
        # as an unsalted MD5 digest. It is left unchanged here because
        # authenticate_user() compares MD5 digests; move the secret to
        # configuration and switch to a real password hash when possible.
        c.execute("INSERT OR IGNORE INTO users (username, password, balance) VALUES (?, ?, ?)",
                  ('admin', hashlib.md5('MFKJGOIH3249738547IJTIRTJHOIJJ'.encode()).hexdigest(), 999999))
        conn.commit()
    finally:
        conn.close()
def get_user_balance(user_id):
    """Return the balance of the user with id ``user_id``.

    Returns:
        The stored balance, or 0 when no such user exists.

    The connection is closed even if the query raises.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT balance FROM users WHERE id = ?", (user_id,))
        result = c.fetchone()
        return result[0] if result else 0
    finally:
        conn.close()
def fetch_recent_artworks_for_user(user_id: int, limit: int = 20) -> List[Dict]:
    """List the newest artworks that ``user_id`` is allowed to see.

    An artwork is included when it is public (``is_private = 0``) or owned by
    ``user_id``. Results are ordered by ``created_at`` descending and capped
    at ``limit`` rows.

    Returns:
        A list of dicts with keys id, title, data, price, owner_id,
        is_private, signature (empty string when unset), created_at and
        owner_name (None when the owner row is missing, since the join is a
        LEFT JOIN).
    """
    fields = (
        "id",
        "title",
        "data",
        "price",
        "owner_id",
        "is_private",
        "signature",
        "created_at",
        "owner_name",
    )
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute(
            """
            SELECT a.id, a.title, a.data, a.price, a.owner_id, a.is_private,
            a.signature, a.created_at, u.username
            FROM artworks a
            LEFT JOIN users u ON a.owner_id = u.id
            WHERE a.is_private = 0 OR a.owner_id = ?
            ORDER BY a.created_at DESC
            LIMIT ?
            """,
            (user_id, limit),
        )
        listing = []
        for row in cur.fetchall():
            entry = dict(zip(fields, row))
            # Normalise a NULL/empty signature to "".
            entry["signature"] = entry["signature"] or ""
            listing.append(entry)
        return listing
    finally:
        conn.close()
def authenticate_user(username: str, password: str) -> Optional[Dict]:
    """Look up a user by username and password.

    The password is hashed with MD5 and compared against the stored digest.

    Returns:
        A dict with keys id, username and balance when a matching row exists,
        otherwise None.
    """
    conn = get_db()
    cur = conn.cursor()
    try:
        digest = hashlib.md5(password.encode()).hexdigest()
        cur.execute(
            "SELECT id, username, balance FROM users WHERE username = ? AND password = ?",
            (username, digest),
        )
        match = cur.fetchone()
        if match is None:
            return None
        uid, name, balance = match
        return {"id": uid, "username": name, "balance": balance}
    finally:
        conn.close()
def session_user(username: str) -> Optional[bool]:
    """Report whether ``username`` is absent from the ``users`` table.

    Returns:
        True when no user with that username exists, None when one does.

    NOTE(review): the original annotation was ``Optional[Dict]`` and the
    query selected id/username/balance, yet the function only ever returned
    ``None`` or ``True``. The runtime behaviour is kept exactly as it was
    because callers are not visible from this file; the annotation now
    matches what is actually returned. Confirm with the callers whether
    returning the user record (as authenticate_user does) was intended.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # Only existence matters, so no columns need to be fetched.
        c.execute("SELECT 1 FROM users WHERE username = ? LIMIT 1", (username,))
        if c.fetchone():
            return None
        return True
    finally:
        conn.close()
def create_user(username: str, password: str) -> Tuple[bool, Optional[int]]:
    """Register a new user with the default balance.

    The password is stored as an MD5 hex digest, which is the format
    authenticate_user() compares against.

    Returns:
        ``(True, new_user_id)`` on success, ``(False, None)`` when the
        username is already taken.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM users WHERE username = ?", (username,))
        if c.fetchone():
            return False, None
        try:
            c.execute(
                "INSERT INTO users (username, password) VALUES (?, ?)",
                (username, hashlib.md5(password.encode()).hexdigest()),
            )
        except sqlite3.IntegrityError:
            # The username was inserted by someone else between the check
            # above and this INSERT (username is UNIQUE). Report it the same
            # way as an ordinary "already taken" instead of raising.
            conn.rollback()
            return False, None
        conn.commit()
        return True, c.lastrowid
    finally:
        conn.close()
def create_artwork_record(
    owner_id: int,
    title: str,
    data: str,
    price: int,
    is_private: int,
    signature: str,
    settings_data: Optional[str] = None,
    created_at: Optional[str] = None,
) -> int:
    """Insert an artwork and, optionally, its settings row.

    Args:
        owner_id: id of the owning user.
        title, data, price, is_private, signature: stored as given.
        settings_data: when truthy, also inserted into ``artwork_settings``
            for the new artwork.
        created_at: when truthy, stored as the artwork's ``created_at``;
            otherwise the column is omitted from the INSERT.

    Returns:
        The row id of the newly inserted artwork.
    """
    conn = get_db()
    cur = conn.cursor()
    try:
        common_values = (title, data, price, owner_id, is_private, signature)
        if not created_at:
            cur.execute(
                """
                INSERT INTO artworks (title, data, price, owner_id, is_private, signature)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                common_values,
            )
        else:
            cur.execute(
                """
                INSERT INTO artworks (title, data, price, owner_id, is_private, signature, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                common_values + (created_at,),
            )
        # Capture the id before the cursor is reused for the settings row.
        new_artwork_id = cur.lastrowid

        if settings_data:
            cur.execute(
                "INSERT INTO artwork_settings (artwork_id, settings_data) VALUES (?, ?)",
                (new_artwork_id, settings_data),
            )
        conn.commit()
        return new_artwork_id
    finally:
        conn.close()
def get_user_profile(user_id: int) -> Tuple[Optional[Dict], List[Dict]]:
    """Load a user together with every artwork they own.

    Returns:
        ``(user, artworks)`` where ``user`` is a dict with keys id, username
        and balance, and ``artworks`` is a list of dicts (id, title, data,
        price, owner_id, is_private, signature, created_at) ordered by
        ``created_at`` descending. When the user does not exist the result
        is ``(None, [])``.
    """
    artwork_fields = (
        "id",
        "title",
        "data",
        "price",
        "owner_id",
        "is_private",
        "signature",
        "created_at",
    )
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute("SELECT id, username, balance FROM users WHERE id = ?", (user_id,))
        account = cur.fetchone()
        if account is None:
            return None, []

        cur.execute(
            """
            SELECT id, title, data, price, owner_id, is_private, signature, created_at
            FROM artworks
            WHERE owner_id = ?
            ORDER BY created_at DESC
            """,
            (user_id,),
        )
        owned = []
        for row in cur.fetchall():
            item = dict(zip(artwork_fields, row))
            # Normalise a NULL/empty signature to "".
            item["signature"] = item["signature"] or ""
            owned.append(item)

        profile = dict(zip(("id", "username", "balance"), account))
        return profile, owned
    finally:
        conn.close()
def get_artwork_with_settings(artwork_id: int) -> Optional[Tuple[Dict, Optional[str]]]:
    """Fetch one artwork plus its settings payload, if any.

    Returns:
        None when the artwork does not exist. Otherwise a pair
        ``(artwork, settings_data)`` where ``artwork`` is a dict with keys
        id, title, data, price, owner_id, is_private, signature and
        created_at, and ``settings_data`` is None when no settings row is
        joined. Only the first joined row is used.
    """
    fields = (
        "id",
        "title",
        "data",
        "price",
        "owner_id",
        "is_private",
        "signature",
        "created_at",
    )
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute(
            """
            SELECT a.id, a.title, a.data, a.price, a.owner_id, a.is_private,
            a.signature, a.created_at, s.settings_data
            FROM artworks a
            LEFT JOIN artwork_settings s ON a.id = s.artwork_id
            WHERE a.id = ?
            """,
            (artwork_id,),
        )
        found = cur.fetchone()
        if found is None:
            return None

        # The last selected column is the settings payload; the rest describe
        # the artwork itself.
        *artwork_values, settings_payload = found
        artwork = dict(zip(fields, artwork_values))
        artwork["signature"] = artwork["signature"] or ""
        return artwork, settings_payload
    finally:
        conn.close()
def save_artwork_settings(artwork_id: int, settings_data: str) -> None:
    """Store ``settings_data`` for an artwork, replacing any existing value.

    If a settings row already exists for ``artwork_id`` it is updated in
    place; otherwise a new row is inserted. The change is committed before
    returning.
    """
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute("SELECT id FROM artwork_settings WHERE artwork_id = ?", (artwork_id,))
        if cur.fetchone() is None:
            cur.execute(
                "INSERT INTO artwork_settings (artwork_id, settings_data) VALUES (?, ?)",
                (artwork_id, settings_data),
            )
        else:
            cur.execute(
                "UPDATE artwork_settings SET settings_data = ? WHERE artwork_id = ?",
                (settings_data, artwork_id),
            )
        conn.commit()
    finally:
        conn.close()
def get_artwork_owner_id(artwork_id: int) -> Optional[int]:
    """Return the ``owner_id`` stored for an artwork, or None if it is unknown.

    None is returned when no artwork with ``artwork_id`` exists.
    """
    conn = get_db()
    cur = conn.cursor()
    try:
        cur.execute("SELECT owner_id FROM artworks WHERE id = ?", (artwork_id,))
        found = cur.fetchone()
        if found is None:
            return None
        return found[0]
    finally:
        conn.close()
def purchase_artwork(buyer_id: int, artwork_id: int) -> Tuple[bool, str]:
    """Transfer an artwork to ``buyer_id`` and move the price between users.

    On success the buyer's balance is reduced by the artwork's price, the
    current owner's balance is increased by it, ownership is switched to the
    buyer and a row is added to ``transactions``.

    The whole check-and-update sequence runs inside one ``BEGIN IMMEDIATE``
    transaction, so the balance and owner that are checked are the same ones
    that get updated even when several purchases run at once.

    Returns:
        ``(True, "ok")`` on success, otherwise ``(False, reason)`` with
        reason one of "not_found", "same_owner", "buyer_missing" or
        "insufficient". Nothing is modified on failure.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # Take the write lock before reading. Without it the SELECTs below
        # run outside any transaction and two concurrent purchases can both
        # pass the balance/owner checks.
        c.execute("BEGIN IMMEDIATE")

        # Explicit columns instead of SELECT * so the code does not depend on
        # the physical column order of the table.
        c.execute("SELECT price, owner_id FROM artworks WHERE id = ?", (artwork_id,))
        artwork = c.fetchone()
        if not artwork:
            return False, "not_found"
        price, owner_id = artwork
        if owner_id == buyer_id:
            return False, "same_owner"

        c.execute("SELECT balance FROM users WHERE id = ?", (buyer_id,))
        buyer = c.fetchone()
        if not buyer:
            return False, "buyer_missing"
        buyer_balance = buyer[0]
        # NOTE(review): price is not validated here; a negative stored price
        # would credit the buyer. Confirm prices are checked where artworks
        # are created.
        if buyer_balance < price:
            return False, "insufficient"

        c.execute(
            "UPDATE users SET balance = balance - ? WHERE id = ?",
            (price, buyer_id),
        )
        c.execute(
            "UPDATE users SET balance = balance + ? WHERE id = ?",
            (price, owner_id),
        )
        c.execute(
            "UPDATE artworks SET owner_id = ? WHERE id = ?",
            (buyer_id, artwork_id),
        )
        c.execute(
            "INSERT INTO transactions (artwork_id, buyer_id, seller_id, amount) VALUES (?, ?, ?, ?)",
            (artwork_id, buyer_id, owner_id, price),
        )
        conn.commit()
        return True, "ok"
    finally:
        # Closing without a commit discards the open transaction, which is
        # what the early-return and error paths rely on.
        conn.close()
def search_artworks(query: str) -> List[Dict]:
    """Find artworks whose title or data contains ``query``.

    Matching uses SQL ``LIKE`` with ``%query%``, so ``%`` and ``_`` inside
    ``query`` act as wildcards.

    Returns:
        A list of dicts with keys id, title, data, price, owner_id and
        created_at.

    NOTE(review): private artworks are not filtered out here, unlike
    fetch_recent_artworks_for_user(); confirm callers apply their own
    visibility check.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        pattern = f"%{query}%"
        # Columns are named explicitly: with SELECT * the sixth column is
        # is_private, which used to be returned under the "created_at" key.
        c.execute(
            "SELECT id, title, data, price, owner_id, created_at "
            "FROM artworks WHERE title LIKE ? OR data LIKE ?",
            (pattern, pattern),
        )
        return [
            {
                "id": art[0],
                "title": art[1],
                "data": art[2],
                "price": art[3],
                "owner_id": art[4],
                "created_at": art[5],
            }
            for art in c.fetchall()
        ]
    finally:
        conn.close()
def import_artwork_record(
    owner_id: int, title: str, shapes_json: str, price: int, signature: str = ""
) -> int:
    """Create a public artwork from imported shape data.

    Delegates to create_artwork_record() with ``shapes_json`` as the artwork
    data and ``is_private`` fixed to 0; no settings row and no explicit
    ``created_at`` are supplied.

    Returns:
        The id of the newly created artwork.
    """
    public_flag = 0
    new_id = create_artwork_record(
        owner_id, title, shapes_json, price, public_flag, signature
    )
    return new_id
def check_connect(ip):
    """Probe the database and, for loopback callers, list all artworks.

    Args:
        ip: textual IPv4/IPv6 address of the caller.

    Returns:
        ``["OK"]`` for non-loopback addresses. For loopback addresses,
        ``["OK", recent]`` where ``recent`` is a list of dicts (id, title,
        signature, is_private, owner, created_at) for every artwork that has
        an owner row, newest first.

    Raises:
        ValueError: if ``ip`` is not a valid IP address.
    """
    # Validate the address first so bad input fails before any DB work.
    is_local = ipaddress.ip_address(ip).is_loopback

    conn = get_db()
    try:
        c = conn.cursor()
        # The counts themselves are not used; the queries are kept so the
        # call still fails loudly when either table cannot be read.
        c.execute("SELECT COUNT(*) FROM users")
        c.fetchone()
        c.execute("SELECT COUNT(*) FROM artworks")
        c.fetchone()

        parts = ["OK"]
        if is_local:
            # Only loopback callers receive the listing, so only build it
            # for them.
            c.execute(
                """
                SELECT a.id, a.title, a.signature, a.is_private, u.username, a.created_at
                FROM artworks a
                JOIN users u ON a.owner_id = u.id
                ORDER BY a.created_at DESC
                """,
            )
            recent = [
                {
                    "id": art[0],
                    "title": art[1],
                    "signature": art[2] or "",
                    "is_private": art[3],
                    "owner": art[4],
                    "created_at": art[5],
                }
                for art in c.fetchall()
            ]
            parts.append(recent)
        return parts
    finally:
        conn.close()