Files
2025-12-14 10:44:48 +03:00

353 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Rodchenko
Suprematist art auction
"""
from flask import Flask, render_template, request, redirect, url_for, session, flash
from dotenv import load_dotenv
import requests
import json
import os
import threading
import time
import logging
from utils.db import (
authenticate_user,
cleanup_expired_records,
create_artwork_record,
create_user,
session_user,
fetch_recent_artworks_for_user,
get_artwork_owner_id,
get_artwork_with_settings,
check_connect,
get_user_balance,
get_user_profile,
import_artwork_record,
init_db,
purchase_artwork,
save_artwork_settings,
search_artworks,
)
from utils.art import generate_suprematist_art, generate_artwork_title
from utils.security import load_artwork_settings, save_artwork_description, is_safe_url
app = Flask(__name__)
load_dotenv()
app.secret_key = os.environ.get("SECRET_KEY", "super_secret_key_123ZZZZZZZZZPWNEDBYKIDZZZ")
_cleaner_started = False
CLEANER_INTERVAL_SECONDS = 7 * 60
CLEANER_MAX_AGE_MINUTES = 7
logger = logging.getLogger("db-cleaner")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def start_db_cleaner():
global _cleaner_started
if _cleaner_started:
return
_cleaner_started = True
logger.info("Starting DB cleaner thread with interval=%ss window=%smin",
CLEANER_INTERVAL_SECONDS, CLEANER_MAX_AGE_MINUTES)
def _worker():
while True:
try:
result = cleanup_expired_records(max_age_minutes=CLEANER_MAX_AGE_MINUTES)
logger.info("DB cleanup run: %s", result)
except Exception as exc:
logger.exception("DB cleanup failed: %s", exc)
time.sleep(CLEANER_INTERVAL_SECONDS)
thread = threading.Thread(target=_worker, name="db-cleaner", daemon=True)
thread.start()
init_db()
start_db_cleaner()
@app.route('/')
def index():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
artworks = fetch_recent_artworks_for_user(session['user_id'])
user_balance = get_user_balance(session['user_id'])
return render_template('index.html', artworks=artworks, balance=user_balance)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = authenticate_user(username, password)
if user:
session['user_id'] = user["id"]
session['username'] = user["username"]
flash('Успешный вход!', 'success')
return redirect(url_for('index'))
else:
flash('Неверные учетные данные!', 'error')
return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if not username or not password:
flash('Заполните все поля!', 'error')
return render_template('register.html')
if len(username) < 3:
flash('Имя пользователя должно быть не менее 3 символов!', 'error')
return render_template('register.html')
created, _ = create_user(username, password)
if not created:
flash('Пользователь с таким именем уже существует!', 'error')
return render_template('register.html')
flash('Регистрация успешна!', 'success')
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/logout')
def logout():
session.clear()
flash('Вы вышли из системы', 'success')
return redirect(url_for('login'))
@app.route('/create_artwork', methods=['POST'])
def create_artwork():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
price = request.form.get('price', 100)
description = request.form.get('description', '')
is_private = 1 if request.form.get('is_private') else 0
signature = request.form.get('signature', '')
title = generate_artwork_title()
art_data = generate_suprematist_art()
settings_data = save_artwork_description(description) if description else None
create_artwork_record(
owner_id=session['user_id'],
title=title,
data=art_data,
price=price,
is_private=is_private,
signature=signature,
settings_data=settings_data,
)
if settings_data:
_ = load_artwork_settings(settings_data)
flash('Картина создана!', 'success')
return redirect(url_for('index'))
@app.route('/buy/<int:artwork_id>')
def buy_artwork(artwork_id):
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
success, status = purchase_artwork(session['user_id'], artwork_id)
if success:
flash('Покупка успешна!', 'success')
else:
if status == "not_found":
flash('Картина не найдена!', 'error')
elif status == "same_owner":
flash('Нельзя купить свою картину!', 'error')
elif status == "insufficient":
flash('Недостаточно средств!', 'error')
else:
flash('Не удалось завершить покупку.', 'error')
return redirect(url_for('index'))
@app.route('/profile')
def profile():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
user_id = session['user_id']
user, artworks = get_user_profile(user_id)
if not user:
flash('Пользователь не найден', 'error')
return redirect(url_for('logout'))
return render_template('profile.html', user=user, artworks=artworks, balance=user['balance'])
@app.route('/artwork_settings/<int:artwork_id>')
def artwork_settings(artwork_id):
if session_user(session['username']):
return redirect(url_for('login'))
artwork_data = get_artwork_with_settings(artwork_id)
if not artwork_data:
flash('Артворк не найден!', 'error')
return redirect(url_for('index'))
artwork, settings_data = artwork_data
if artwork['owner_id'] != session['user_id']:
flash('Нет доступа!', 'error')
return redirect(url_for('index'))
settings = None
if settings_data:
settings = load_artwork_settings(settings_data)
balance = get_user_balance(session['user_id'])
return render_template('artwork_settings.html', artwork=artwork, settings=settings, balance=balance)
@app.route('/update_settings', methods=['POST'])
def update_settings():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
artwork_id = request.form.get('artwork_id')
description = request.form.get('description', '')
try:
artwork_id_int = int(artwork_id)
except (TypeError, ValueError):
flash('Недостаточно прав!', 'error')
return redirect(url_for('index'))
owner_id = get_artwork_owner_id(artwork_id_int)
if owner_id and owner_id == session['user_id']:
if description:
settings_data = save_artwork_description(description)
save_artwork_settings(artwork_id_int, settings_data)
_ = load_artwork_settings(settings_data)
flash('Описание обновлено!', 'success')
else:
flash('Введите описание!', 'error')
else:
flash('Недостаточно прав!', 'error')
return redirect(url_for('artwork_settings', artwork_id=artwork_id_int))
@app.route('/search')
def search():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
query = request.args.get('q', '')
balance = get_user_balance(session['user_id'])
results = search_artworks(query) if query else []
return render_template('search.html', results=results, query=query, balance=balance)
@app.route('/import_artwork', methods=['GET', 'POST'])
def import_artwork():
if 'username' not in session:
return redirect(url_for('login'))
if session_user(session['username']):
return redirect(url_for('login'))
preview_content = None
error = None
success = None
fetched_url = None
balance = get_user_balance(session['user_id'])
if request.method == 'POST':
artwork_url = request.form.get('artwork_url', '').strip()
if artwork_url:
is_safe, result = is_safe_url(artwork_url)
if not is_safe:
error = f"Небезопасный URL: {result}"
else:
try:
resp = requests.get(
artwork_url,
timeout=5,
allow_redirects=True,
headers={'User-Agent': 'Rodchenko-Gallery/1.0'}
)
if resp.status_code == 200:
preview_content = resp.text[:10000]
fetched_url = resp.url
try:
data = resp.json()
if isinstance(data, dict) and 'shapes' in data:
title = data.get('title', 'Импортированная композиция')
shapes_json = json.dumps(data['shapes'])
price = data.get('price', 100)
import_artwork_record(
owner_id=session['user_id'],
title=title,
shapes_json=shapes_json,
price=price,
signature=data.get('signature', ''),
)
success = f"Картина '{title}' успешно импортирована!"
except (json.JSONDecodeError, KeyError):
pass
else:
error = f"Сервер вернул статус {resp.status_code}"
except requests.exceptions.Timeout:
error = "Таймаут при загрузке"
except requests.exceptions.RequestException as e:
error = f"Ошибка загрузки: {str(e)}"
return render_template('import_artwork.html',
preview_content=preview_content,
error=error,
success=success,
fetched_url=fetched_url,
balance=balance)
@app.route('/healthcheck')
def healthcheck():
return check_connect(request.remote_addr), 200