This commit is contained in:
root
2025-12-14 10:39:18 +03:00
commit 639f4e2b4e
179 changed files with 21065 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
# utils package

144
rodchenko/app/utils/art.py Executable file
View File

@@ -0,0 +1,144 @@
import json
import random
def generate_suprematist_art():
palettes = [
['#E41E26', '#000000', '#1A1A1A', '#FFD100'],
['#E41E26', '#000000', '#FFFFFF', '#003366'],
['#E41E26', '#FF6B35', '#000000', '#FFD100', '#1A1A1A'],
['#003366', '#000000', '#4A90D9', '#1A1A1A', '#708090'],
['#E41E26', '#FF6B35', '#FFD100', '#000000'],
]
palette = random.choice(palettes)
composition = []
comp_type = random.choice(['diagonal', 'centered', 'scattered', 'layered', 'cross'])
if comp_type == 'diagonal':
composition.append({
'type': 'rotated_rect',
'color': palette[0],
'x': random.randint(5, 20),
'y': random.randint(30, 50),
'width': random.randint(60, 80),
'height': random.randint(8, 15),
'angle': random.randint(-45, -25)
})
for _ in range(random.randint(2, 4)):
composition.append({
'type': random.choice(['rotated_rect', 'rectangle']),
'color': random.choice(palette),
'x': random.randint(10, 70),
'y': random.randint(10, 70),
'width': random.randint(15, 40),
'height': random.randint(5, 15),
'angle': random.randint(-60, 60)
})
elif comp_type == 'centered':
main_shape = random.choice(['rectangle', 'circle'])
composition.append({
'type': main_shape,
'color': palette[0],
'x': random.randint(25, 35),
'y': random.randint(20, 35),
'width': random.randint(30, 45),
'height': random.randint(25, 40),
'angle': 0
})
for _ in range(random.randint(3, 6)):
composition.append({
'type': random.choice(['rectangle', 'circle', 'triangle']),
'color': random.choice(palette[1:]),
'x': random.randint(5, 85),
'y': random.randint(5, 80),
'width': random.randint(8, 20),
'height': random.randint(8, 20),
'angle': random.randint(-30, 30)
})
elif comp_type == 'scattered':
for _ in range(random.randint(5, 9)):
shape_type = random.choice(['rectangle', 'circle', 'triangle', 'rotated_rect'])
size = random.randint(10, 30)
composition.append({
'type': shape_type,
'color': random.choice(palette),
'x': random.randint(5, 75),
'y': random.randint(5, 70),
'width': size,
'height': size if shape_type == 'circle' else random.randint(8, 30),
'angle': random.randint(-45, 45) if 'rect' in shape_type else 0
})
elif comp_type == 'layered':
base_x, base_y = random.randint(15, 30), random.randint(15, 30)
for i in range(random.randint(3, 5)):
offset = i * random.randint(8, 15)
composition.append({
'type': 'rectangle',
'color': palette[i % len(palette)],
'x': base_x + offset,
'y': base_y + offset // 2,
'width': random.randint(25, 45),
'height': random.randint(20, 35),
'angle': 0
})
for _ in range(random.randint(1, 3)):
composition.append({
'type': 'circle',
'color': palette[0],
'x': random.randint(50, 80),
'y': random.randint(10, 60),
'width': random.randint(10, 20),
'height': random.randint(10, 20),
'angle': 0
})
else:
center_x, center_y = random.randint(35, 50), random.randint(30, 45)
composition.append({
'type': 'rectangle',
'color': palette[0],
'x': 5,
'y': center_y,
'width': 90,
'height': random.randint(10, 18),
'angle': 0
})
composition.append({
'type': 'rectangle',
'color': random.choice(palette[1:3]),
'x': center_x,
'y': 5,
'width': random.randint(12, 20),
'height': 85,
'angle': 0
})
for _ in range(random.randint(2, 4)):
composition.append({
'type': random.choice(['circle', 'rectangle', 'triangle']),
'color': random.choice(palette),
'x': random.randint(5, 80),
'y': random.randint(5, 75),
'width': random.randint(8, 18),
'height': random.randint(8, 18),
'angle': random.randint(-20, 20)
})
return json.dumps(composition)
def generate_artwork_title():
prefixes = [
'Супрематическая композиция',
'Динамические формы',
'Геометрическая абстракция',
'Цветовой контраст',
'Пространственная структура'
]
return f"{random.choice(prefixes)}{random.randint(1, 1000)}"

475
rodchenko/app/utils/db.py Executable file
View File

@@ -0,0 +1,475 @@
import sqlite3
import hashlib
import ipaddress
from typing import Dict, List, Optional, Tuple
DATABASE = 'data/auction.db'
def get_db():
return sqlite3.connect(DATABASE)
def _table_has_column(cursor, table, column):
cursor.execute(f"PRAGMA table_info({table})")
return any(row[1] == column for row in cursor.fetchall())
def _ensure_created_at(conn, table, backfill_age_minutes=None):
c = conn.cursor()
if not _table_has_column(c, table, "created_at"):
c.execute(
f"ALTER TABLE {table} ADD COLUMN created_at TIMESTAMP"
)
if backfill_age_minutes is None:
c.execute(
f"UPDATE {table} SET created_at = CURRENT_TIMESTAMP WHERE created_at IS NULL"
)
else:
c.execute(
f"UPDATE {table} SET created_at = datetime('now', ?) WHERE created_at IS NULL",
(f"-{backfill_age_minutes} minutes",),
)
conn.commit()
return True
return False
def _delete_by_age(cursor, table, column, max_age_minutes, extra_where=None):
cutoff_delta = f"-{max_age_minutes} minutes"
extra = f" AND {extra_where}" if extra_where else ""
cursor.execute(
f"DELETE FROM {table} WHERE datetime({column}) < datetime('now', ?){extra}",
(cutoff_delta,),
)
return cursor.rowcount
def cleanup_expired_records(max_age_minutes=7):
conn = get_db()
c = conn.cursor()
try:
for table in ("users", "artworks", "transactions", "artwork_settings"):
# Backfill to "old enough" so legacy rows get cleaned on the next cycle.
_ensure_created_at(conn, table, backfill_age_minutes=max_age_minutes + 1)
deletions = {}
for table in ("artworks", "transactions", "artwork_settings", "users"):
if _table_has_column(c, table, "created_at"):
extra_where = "username != 'admin'" if table == "users" else None
deletions[table] = _delete_by_age(
c,
table=table,
column="created_at",
max_age_minutes=max_age_minutes,
extra_where=extra_where,
)
else:
deletions[table] = 0
c.execute(
"DELETE FROM artwork_settings WHERE artwork_id NOT IN (SELECT id FROM artworks)"
)
deleted_settings = c.rowcount
conn.commit()
return {
"artworks": deletions.get("artworks", 0),
"transactions": deletions.get("transactions", 0),
"artwork_settings": deletions.get("artwork_settings", 0)
+ deleted_settings,
"users": deletions.get("users", 0),
}
finally:
conn.close()
def init_db():
conn = get_db()
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS users
(id INTEGER PRIMARY KEY, username TEXT UNIQUE, password TEXT, balance INTEGER DEFAULT 1000,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS artworks
(id INTEGER PRIMARY KEY, title TEXT, data TEXT, price INTEGER,
owner_id INTEGER, is_private INTEGER DEFAULT 0, signature TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS transactions
(id INTEGER PRIMARY KEY, artwork_id INTEGER, buyer_id INTEGER,
seller_id INTEGER, amount INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS artwork_settings
(id INTEGER PRIMARY KEY, artwork_id INTEGER, settings_data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
# Schema guards for older databases
for table in ("users", "artworks", "transactions", "artwork_settings"):
_ensure_created_at(conn, table)
c.execute("INSERT OR IGNORE INTO users (username, password, balance) VALUES (?, ?, ?)",
('admin', hashlib.md5('hJAED8FUUoj6tYbyQkRkAqni'.encode()).hexdigest(), 999999))
conn.commit()
conn.close()
def get_user_balance(user_id):
conn = get_db()
c = conn.cursor()
c.execute("SELECT balance FROM users WHERE id = ?", (user_id,))
result = c.fetchone()
conn.close()
return result[0] if result else 0
def fetch_recent_artworks_for_user(user_id: int, limit: int = 20) -> List[Dict]:
conn = get_db()
c = conn.cursor()
try:
c.execute(
"""
SELECT a.id, a.title, a.data, a.price, a.owner_id, a.is_private,
a.signature, a.created_at, u.username
FROM artworks a
LEFT JOIN users u ON a.owner_id = u.id
WHERE a.is_private = 0 OR a.owner_id = ?
ORDER BY a.created_at DESC
LIMIT ?
""",
(user_id, limit),
)
artworks_data = c.fetchall()
return [
{
"id": art[0],
"title": art[1],
"data": art[2],
"price": art[3],
"owner_id": art[4],
"is_private": art[5],
"signature": art[6] or "",
"created_at": art[7],
"owner_name": art[8],
}
for art in artworks_data
]
finally:
conn.close()
def authenticate_user(username: str, password: str) -> Optional[Dict]:
conn = get_db()
c = conn.cursor()
try:
c.execute(
"SELECT id, username, balance FROM users WHERE username = ? AND password = ?",
(username, hashlib.md5(password.encode()).hexdigest()),
)
user = c.fetchone()
if user:
return {"id": user[0], "username": user[1], "balance": user[2]}
return None
finally:
conn.close()
def session_user(username: str) -> Optional[Dict]:
conn = get_db()
c = conn.cursor()
try:
c.execute(
"SELECT id, username, balance FROM users WHERE username = ?",
[username],
)
user = c.fetchone()
if user:
return None
return True
finally:
conn.close()
def create_user(username: str, password: str) -> Tuple[bool, Optional[int]]:
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT id FROM users WHERE username = ?", (username,))
if c.fetchone():
return False, None
c.execute(
"INSERT INTO users (username, password) VALUES (?, ?)",
(username, hashlib.md5(password.encode()).hexdigest()),
)
conn.commit()
return True, c.lastrowid
finally:
conn.close()
def create_artwork_record(
owner_id: int,
title: str,
data: str,
price: int,
is_private: int,
signature: str,
settings_data: Optional[str] = None,
created_at: Optional[str] = None,
) -> int:
conn = get_db()
c = conn.cursor()
try:
if created_at:
c.execute(
"""
INSERT INTO artworks (title, data, price, owner_id, is_private, signature, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(title, data, price, owner_id, is_private, signature, created_at),
)
else:
c.execute(
"""
INSERT INTO artworks (title, data, price, owner_id, is_private, signature)
VALUES (?, ?, ?, ?, ?, ?)
""",
(title, data, price, owner_id, is_private, signature),
)
artwork_id = c.lastrowid
if settings_data:
c.execute(
"INSERT INTO artwork_settings (artwork_id, settings_data) VALUES (?, ?)",
(artwork_id, settings_data),
)
conn.commit()
return artwork_id
finally:
conn.close()
def get_user_profile(user_id: int) -> Tuple[Optional[Dict], List[Dict]]:
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT id, username, balance FROM users WHERE id = ?", (user_id,))
user_data = c.fetchone()
if not user_data:
return None, []
c.execute(
"""
SELECT id, title, data, price, owner_id, is_private, signature, created_at
FROM artworks
WHERE owner_id = ?
ORDER BY created_at DESC
""",
(user_id,),
)
artworks_data = c.fetchall()
artworks = [
{
"id": art[0],
"title": art[1],
"data": art[2],
"price": art[3],
"owner_id": art[4],
"is_private": art[5],
"signature": art[6] or "",
"created_at": art[7],
}
for art in artworks_data
]
user = {"id": user_data[0], "username": user_data[1], "balance": user_data[2]}
return user, artworks
finally:
conn.close()
def get_artwork_with_settings(artwork_id: int) -> Optional[Tuple[Dict, Optional[str]]]:
conn = get_db()
c = conn.cursor()
try:
c.execute(
"""
SELECT a.id, a.title, a.data, a.price, a.owner_id, a.is_private,
a.signature, a.created_at, s.settings_data
FROM artworks a
LEFT JOIN artwork_settings s ON a.id = s.artwork_id
WHERE a.id = ?
""",
(artwork_id,),
)
row = c.fetchone()
if not row:
return None
artwork = {
"id": row[0],
"title": row[1],
"data": row[2],
"price": row[3],
"owner_id": row[4],
"is_private": row[5],
"signature": row[6] or "",
"created_at": row[7],
}
return artwork, row[8]
finally:
conn.close()
def save_artwork_settings(artwork_id: int, settings_data: str) -> None:
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT id FROM artwork_settings WHERE artwork_id = ?", (artwork_id,))
existing = c.fetchone()
if existing:
c.execute(
"UPDATE artwork_settings SET settings_data = ? WHERE artwork_id = ?",
(settings_data, artwork_id),
)
else:
c.execute(
"INSERT INTO artwork_settings (artwork_id, settings_data) VALUES (?, ?)",
(artwork_id, settings_data),
)
conn.commit()
finally:
conn.close()
def get_artwork_owner_id(artwork_id: int) -> Optional[int]:
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT owner_id FROM artworks WHERE id = ?", (artwork_id,))
row = c.fetchone()
return row[0] if row else None
finally:
conn.close()
def purchase_artwork(buyer_id: int, artwork_id: int) -> Tuple[bool, str]:
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT * FROM artworks WHERE id = ?", (artwork_id,))
artwork = c.fetchone()
if not artwork:
return False, "not_found"
price = artwork[3]
owner_id = artwork[4]
if owner_id == buyer_id:
return False, "same_owner"
c.execute("SELECT balance FROM users WHERE id = ?", (buyer_id,))
buyer = c.fetchone()
if not buyer:
return False, "buyer_missing"
buyer_balance = buyer[0]
if buyer_balance < price:
return False, "insufficient"
c.execute(
"UPDATE users SET balance = balance - ? WHERE id = ?",
(price, buyer_id),
)
c.execute(
"UPDATE users SET balance = balance + ? WHERE id = ?",
(price, owner_id),
)
c.execute(
"UPDATE artworks SET owner_id = ? WHERE id = ?",
(buyer_id, artwork_id),
)
c.execute(
"INSERT INTO transactions (artwork_id, buyer_id, seller_id, amount) VALUES (?, ?, ?, ?)",
(artwork_id, buyer_id, owner_id, price),
)
conn.commit()
return True, "ok"
finally:
conn.close()
def search_artworks(query: str) -> List[Dict]:
conn = get_db()
c = conn.cursor()
try:
search_query = f"SELECT * FROM artworks WHERE title LIKE '%{query}%' OR data LIKE '%{query}%'"
c.execute(search_query)
results_data = c.fetchall()
return [
{
"id": art[0],
"title": art[1],
"data": art[2],
"price": art[3],
"owner_id": art[4],
"created_at": art[5],
}
for art in results_data
]
finally:
conn.close()
def import_artwork_record(
owner_id: int, title: str, shapes_json: str, price: int, signature: str = ""
) -> int:
return create_artwork_record(
owner_id=owner_id,
title=title,
data=shapes_json,
price=price,
is_private=0,
signature=signature,
)
def check_connect(ip):
conn = get_db()
c = conn.cursor()
try:
c.execute("SELECT COUNT(*) FROM users")
users_count = c.fetchone()[0]
c.execute("SELECT COUNT(*) FROM artworks")
artworks_count = c.fetchone()[0]
c.execute(
"""
SELECT a.id, a.title, a.signature, a.is_private, u.username, a.created_at
FROM artworks a
JOIN users u ON a.owner_id = u.id
ORDER BY a.created_at DESC
""",
)
recent_data = c.fetchall()
recent = [
{
"id": art[0],
"title": art[1],
"signature": art[2] or "",
"is_private": art[3],
"owner": art[4],
"created_at": art[5],
}
for art in recent_data
]
parts = ["OK"]
if ipaddress.ip_address(ip).is_loopback:
parts.append(recent)
return parts
finally:
conn.close()

105
rodchenko/app/utils/security.py Executable file
View File

@@ -0,0 +1,105 @@
import socket
import pickle
import base64
import ipaddress
from urllib.parse import urlparse
def load_artwork_settings(settings_data):
try:
if not settings_data:
return None
try:
padding = len(settings_data) % 4
if padding:
settings_data_padded = settings_data + '=' * (4 - padding)
else:
settings_data_padded = settings_data
raw = base64.b64decode(settings_data_padded)
if raw[:2] in (b'\x80\x03', b'\x80\x04', b'\x80\x05', b'\x80\x02'):
settings = pickle.loads(raw)
if hasattr(settings, '__dict__'):
return settings.__dict__
elif isinstance(settings, dict):
return settings
else:
return {'data': str(settings)}
except:
pass
return {'description': settings_data}
except Exception as e:
return {'error': str(e)}
def save_artwork_description(description):
if not description:
return None
try:
padding = len(description) % 4
if padding:
padded = description + '=' * (4 - padding)
else:
padded = description
raw = base64.b64decode(padded)
if raw[:2] in (b'\x80\x03', b'\x80\x04', b'\x80\x05', b'\x80\x02'):
return description
except:
pass
return description
class ArtworkConfig:
def __init__(self, colors=None, animation=False, public=True):
self.colors = colors or ["#FF0000", "#00FF00", "#0000FF"]
self.animation = animation
self.public = public
def __repr__(self):
return f"ArtworkConfig(colors={self.colors}, animation={self.animation}, public={self.public})"
def __str__(self):
return self.__repr__()
def __reduce__(self):
return (self.__class__, (self.colors, self.animation, self.public))
def is_safe_url(url: str):
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return False, "403"
hostname = parsed.hostname
if not hostname:
return False, "403"
try:
ip_str = socket.gethostbyname(hostname)
except socket.gaierror:
return False, "403"
try:
ip = ipaddress.ip_address(ip_str)
except ValueError:
return False, "403"
if (
ip.is_loopback
or ip.is_private
or ip.is_link_local
or ip.is_unspecified
):
return False, "403"
return True, ip_str
except Exception:
return False, "403"