Files
M-CTF-2025/rodchenko/app/app.py

353 lines
12 KiB
Python
Raw Normal View History

2025-12-14 10:39:18 +03:00
"""
Rodchenko
Suprematist art auction
"""
from flask import Flask, render_template, request, redirect, url_for, session, flash
from dotenv import load_dotenv
import requests
import json
import os
import threading
import time
import logging
from utils.db import (
authenticate_user,
cleanup_expired_records,
create_artwork_record,
create_user,
session_user,
fetch_recent_artworks_for_user,
get_artwork_owner_id,
get_artwork_with_settings,
check_connect,
get_user_balance,
get_user_profile,
import_artwork_record,
init_db,
purchase_artwork,
save_artwork_settings,
search_artworks,
)
from utils.art import generate_suprematist_art, generate_artwork_title
from utils.security import load_artwork_settings, save_artwork_description, is_safe_url
app = Flask(__name__)

# Pull variables from a local .env file into the process environment before
# any configuration value is read below.
load_dotenv()

# The session-signing key must come from the environment. A literal fallback
# committed to source control would let anyone who has read the code forge
# valid session cookies, so when SECRET_KEY is missing we generate a random
# key for this process instead and say so loudly.
_secret_key = os.environ.get("SECRET_KEY")
if not _secret_key:
    logging.getLogger(__name__).warning(
        "SECRET_KEY is not set; falling back to a random per-process key. "
        "Sessions will not survive a restart and will not be shared "
        "between worker processes."
    )
    _secret_key = os.urandom(32).hex()
app.secret_key = _secret_key
2025-12-14 10:39:18 +03:00
# Set to True by start_db_cleaner() once the background thread is launched.
_cleaner_started = False

# How often the cleanup pass runs, and the max_age_minutes value handed to it.
CLEANER_INTERVAL_SECONDS = 7 * 60
CLEANER_MAX_AGE_MINUTES = 7

# Dedicated logger for the cleanup thread.
logger = logging.getLogger("db-cleaner")
logger.setLevel(logging.INFO)

# Attach a stream handler only when the logger has none yet, so the handler
# is never added twice to the same logger object.
if not logger.handlers:
    formatter = logging.Formatter(
        fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
def start_db_cleaner():
    """Start the periodic DB cleanup daemon thread, at most once per process.

    The module-level ``_cleaner_started`` flag records that the thread has
    been launched; later calls return immediately without doing anything.
    """
    global _cleaner_started

    def _cleanup_loop():
        # Runs until the process exits: one cleanup pass, then sleep.
        # Any exception from a pass is logged and the loop keeps going.
        while True:
            try:
                outcome = cleanup_expired_records(
                    max_age_minutes=CLEANER_MAX_AGE_MINUTES
                )
                logger.info("DB cleanup run: %s", outcome)
            except Exception as exc:
                logger.exception("DB cleanup failed: %s", exc)
            time.sleep(CLEANER_INTERVAL_SECONDS)

    if _cleaner_started:
        return
    _cleaner_started = True

    logger.info(
        "Starting DB cleaner thread with interval=%ss window=%smin",
        CLEANER_INTERVAL_SECONDS,
        CLEANER_MAX_AGE_MINUTES,
    )
    # daemon=True so the thread never keeps the interpreter alive on shutdown.
    threading.Thread(
        target=_cleanup_loop,
        name="db-cleaner",
        daemon=True,
    ).start()
# Import-time startup: run the database initialisation helper from utils.db
# (presumably creates the schema if it is missing; confirm in utils.db) ...
init_db()
# ... then launch the background cleanup thread (guarded so it starts once).
start_db_cleaner()
@app.route('/')
def index():
    """Render the home page with recent artworks and the user's balance.

    Requests without a ``username`` in the session, or for which
    ``session_user`` returns a truthy value, are redirected to the login
    page. (Presumably a truthy result marks the session's user as no longer
    valid; confirm in utils.db.)
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    current_user_id = session['user_id']
    return render_template(
        'index.html',
        artworks=fetch_recent_artworks_for_user(current_user_id),
        balance=get_user_balance(current_user_id),
    )
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the user's id and username are stored in the session and the
    client is redirected to the index page. On failure an error is flashed
    and the login form is rendered again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    account = authenticate_user(
        request.form['username'],
        request.form['password'],
    )
    if not account:
        flash('Неверные учетные данные!', 'error')
        return render_template('login.html')

    session['user_id'] = account["id"]
    session['username'] = account["username"]
    flash('Успешный вход!', 'success')
    return redirect(url_for('index'))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    The POST path rejects empty fields and usernames shorter than three
    characters, then calls ``create_user``. Every rejection flashes an error
    and renders the form again; success redirects to the login page.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form['username']
    password = request.form['password']

    # Work out the first failing check (if any); create_user is only called
    # once the cheap input checks have passed.
    problem = None
    if not (username and password):
        problem = 'Заполните все поля!'
    elif len(username) < 3:
        problem = 'Имя пользователя должно быть не менее 3 символов!'
    else:
        created, _unused = create_user(username, password)
        if not created:
            problem = 'Пользователь с таким именем уже существует!'

    if problem is not None:
        flash(problem, 'error')
        return render_template('register.html')

    flash('Регистрация успешна!', 'success')
    return redirect(url_for('login'))
@app.route('/logout')
def logout():
    """Clear the session, flash a confirmation and redirect to the login page."""
    session.clear()
    # Flash after clearing so the message lands in the fresh session.
    flash('Вы вышли из системы', 'success')
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/create_artwork', methods=['POST'])
def create_artwork():
    """Generate a new artwork for the logged-in user and store it.

    Form fields:
        price: optional; must parse as a non-negative integer. A missing or
            blank value falls back to 100.
        description: optional free text; when present it is passed through
            ``save_artwork_description`` and stored as the settings payload.
        is_private: any non-empty value marks the artwork as private.
        signature: optional string stored alongside the artwork.

    Redirects to the login page for unauthenticated sessions, otherwise back
    to the index page with a flashed status message.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    # The price arrives as a raw form string. Validate it here instead of
    # handing arbitrary text (or a negative amount) to the database layer.
    raw_price = request.form.get('price', '').strip() or '100'
    try:
        price = int(raw_price)
    except ValueError:
        price = None
    if price is None or price < 0:
        flash('Некорректная цена!', 'error')
        return redirect(url_for('index'))

    description = request.form.get('description', '')
    is_private = 1 if request.form.get('is_private') else 0
    signature = request.form.get('signature', '')

    title = generate_artwork_title()
    art_data = generate_suprematist_art()
    settings_data = save_artwork_description(description) if description else None

    create_artwork_record(
        owner_id=session['user_id'],
        title=title,
        data=art_data,
        price=price,
        is_private=is_private,
        signature=signature,
        settings_data=settings_data,
    )

    if settings_data:
        # The result is discarded, as before. NOTE(review): presumably kept
        # as a load round-trip check of the freshly saved payload; confirm
        # what load_artwork_settings does in utils.security before relying
        # on or removing this call.
        _ = load_artwork_settings(settings_data)

    flash('Картина создана!', 'success')
    return redirect(url_for('index'))
@app.route('/buy/<int:artwork_id>')
def buy_artwork(artwork_id):
    """Attempt to buy ``artwork_id`` for the logged-in user and report the result.

    ``purchase_artwork`` returns a ``(success, status)`` pair. On failure the
    status selects the flashed error message, and unknown statuses get a
    generic one. Always redirects to the index page afterwards, or to the
    login page when the session is not authenticated.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    # Error text for each failure status reported by purchase_artwork.
    failure_messages = {
        "not_found": 'Картина не найдена!',
        "same_owner": 'Нельзя купить свою картину!',
        "insufficient": 'Недостаточно средств!',
    }

    success, status = purchase_artwork(session['user_id'], artwork_id)
    if success:
        flash('Покупка успешна!', 'success')
    else:
        flash(
            failure_messages.get(status, 'Не удалось завершить покупку.'),
            'error',
        )
    return redirect(url_for('index'))
@app.route('/profile')
def profile():
    """Render the logged-in user's profile page with their artworks and balance.

    If ``get_user_profile`` yields no user record, an error is flashed and
    the client is sent through the logout route.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    account, owned_artworks = get_user_profile(session['user_id'])
    if account:
        return render_template(
            'profile.html',
            user=account,
            artworks=owned_artworks,
            balance=account['balance'],
        )

    flash('Пользователь не найден', 'error')
    return redirect(url_for('logout'))
@app.route('/artwork_settings/<int:artwork_id>')
def artwork_settings(artwork_id):
    """Render the settings page for an artwork owned by the logged-in user.

    Redirects to the login page for unauthenticated sessions, and to the
    index page (with a flashed error) when the artwork does not exist or
    belongs to another user.
    """
    # Check that the session is logged in before reading session['username'].
    # Without this guard an anonymous request raised KeyError and surfaced as
    # a 500 instead of the login redirect every other route performs.
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    artwork_data = get_artwork_with_settings(artwork_id)
    if not artwork_data:
        flash('Артворк не найден!', 'error')
        return redirect(url_for('index'))

    artwork, settings_data = artwork_data
    if artwork['owner_id'] != session['user_id']:
        flash('Нет доступа!', 'error')
        return redirect(url_for('index'))

    # Only load the stored settings payload when there is one.
    settings = load_artwork_settings(settings_data) if settings_data else None

    balance = get_user_balance(session['user_id'])
    return render_template(
        'artwork_settings.html',
        artwork=artwork,
        settings=settings,
        balance=balance,
    )
@app.route('/update_settings', methods=['POST'])
def update_settings():
    """Replace the description of an artwork owned by the logged-in user.

    Form fields: ``artwork_id`` (must parse as an integer) and
    ``description``. A non-integer id flashes an error and redirects to the
    index page. Every other outcome flashes a status message and redirects
    to that artwork's settings page.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    raw_artwork_id = request.form.get('artwork_id')
    description = request.form.get('description', '')

    try:
        artwork_id_int = int(raw_artwork_id)
    except (TypeError, ValueError):
        flash('Недостаточно прав!', 'error')
        return redirect(url_for('index'))

    owner_id = get_artwork_owner_id(artwork_id_int)
    is_owner = bool(owner_id) and owner_id == session['user_id']

    if not is_owner:
        flash('Недостаточно прав!', 'error')
    elif not description:
        flash('Введите описание!', 'error')
    else:
        settings_data = save_artwork_description(description)
        save_artwork_settings(artwork_id_int, settings_data)
        # Result intentionally discarded, exactly as in the save path of
        # artwork creation.
        _ = load_artwork_settings(settings_data)
        flash('Описание обновлено!', 'success')

    return redirect(url_for('artwork_settings', artwork_id=artwork_id_int))
@app.route('/search')
def search():
    """Render search results for the ``q`` query-string parameter.

    An empty or missing query renders the page with an empty result list
    without calling ``search_artworks``.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    term = request.args.get('q', '')
    wallet = get_user_balance(session['user_id'])

    if term:
        matches = search_artworks(term)
    else:
        matches = []

    return render_template(
        'search.html',
        results=matches,
        query=term,
        balance=wallet,
    )
# Upper bound on redirect hops followed while importing a remote artwork.
_MAX_IMPORT_REDIRECTS = 5


def _fetch_import_url(url):
    """Fetch *url*, validating every redirect hop with ``is_safe_url``.

    Redirects are followed manually so that a harmless-looking public URL
    cannot bounce the server to an address the safety check would have
    rejected.

    Returns:
        ``(response, None)`` on a completed fetch, or ``(None, message)``
        when a hop fails the safety check.

    Raises:
        requests.exceptions.RequestException: on network errors, timeouts,
            or when the redirect chain exceeds ``_MAX_IMPORT_REDIRECTS``.
    """
    # Local import keeps this block self-contained; urljoin resolves relative
    # Location headers against the URL that produced them.
    from urllib.parse import urljoin

    current_url = url
    for _hop in range(_MAX_IMPORT_REDIRECTS + 1):
        is_safe, reason = is_safe_url(current_url)
        if not is_safe:
            return None, f"Небезопасный URL: {reason}"
        resp = requests.get(
            current_url,
            timeout=5,
            allow_redirects=False,  # each hop is re-validated above
            headers={'User-Agent': 'Rodchenko-Gallery/1.0'},
        )
        if not resp.is_redirect:
            return resp, None
        current_url = urljoin(resp.url, resp.headers['Location'])

    raise requests.exceptions.TooManyRedirects(
        f"Exceeded {_MAX_IMPORT_REDIRECTS} redirects."
    )


def _parse_import_price(raw):
    """Return *raw* as a non-negative int, or ``None`` if it is not one."""
    try:
        price = int(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    return price if price >= 0 else None


@app.route('/import_artwork', methods=['GET', 'POST'])
def import_artwork():
    """Show the import form (GET) or import an artwork from a remote URL (POST).

    The submitted ``artwork_url`` is fetched; the first 10000 characters of
    the response are shown as a preview. If the body is a JSON object with a
    ``shapes`` key, an artwork record is created for the logged-in user from
    its ``title``, ``shapes``, ``price`` and ``signature`` fields.

    The template always receives ``preview_content``, ``error``, ``success``,
    ``fetched_url`` and ``balance``.
    """
    if 'username' not in session or session_user(session['username']):
        return redirect(url_for('login'))

    preview_content = None
    error = None
    success = None
    fetched_url = None
    balance = get_user_balance(session['user_id'])

    artwork_url = ''
    if request.method == 'POST':
        artwork_url = request.form.get('artwork_url', '').strip()

    if artwork_url:
        resp = None
        try:
            resp, error = _fetch_import_url(artwork_url)
        except requests.exceptions.Timeout:
            error = "Таймаут при загрузке"
        except requests.exceptions.RequestException as e:
            error = f"Ошибка загрузки: {str(e)}"

        if resp is not None and resp.status_code != 200:
            error = f"Сервер вернул статус {resp.status_code}"
        elif resp is not None:
            preview_content = resp.text[:10000]
            fetched_url = resp.url

            # A body that is not JSON is fine: the preview is still shown,
            # there is just nothing to import. ValueError covers the JSON
            # decode errors raised by the different requests versions.
            try:
                data = resp.json()
            except ValueError:
                data = None

            if isinstance(data, dict) and 'shapes' in data:
                # Everything below comes from an untrusted remote document,
                # so normalise the types before they reach the DB layer.
                title = data.get('title')
                if not isinstance(title, str):
                    title = 'Импортированная композиция'
                signature = data.get('signature', '')
                if not isinstance(signature, str):
                    signature = ''
                price = _parse_import_price(data.get('price', 100))

                if price is None:
                    error = "Некорректная цена в импортируемой картине"
                else:
                    import_artwork_record(
                        owner_id=session['user_id'],
                        title=title,
                        shapes_json=json.dumps(data['shapes']),
                        price=price,
                        signature=signature,
                    )
                    success = f"Картина '{title}' успешно импортирована!"

    return render_template(
        'import_artwork.html',
        preview_content=preview_content,
        error=error,
        success=success,
        fetched_url=fetched_url,
        balance=balance,
    )
@app.route('/healthcheck')
def healthcheck():
    """Return the result of ``check_connect`` for the caller's address with HTTP 200."""
    caller_address = request.remote_addr
    report = check_connect(caller_address)
    return report, 200