Files
manga/src/api.py
2026-05-01 02:02:36 +03:00

1263 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FastAPI веб-сервер: REST API + WebSocket для мониторинга загрузок манги.
"""
import asyncio
import hashlib
import hmac as _hmac
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional
from croniter import croniter
from fastapi import FastAPI, Request, Response, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from loguru import logger
from .state import StateDB
from .worker import download_manga, check_for_updates
from .exporter import patch_meta, MangaMeta
from .sources import registry, get_source_for_url, extract_domain
OUTPUT_DIR = Path("/app/output")
FRONTEND_DIR = Path("/app/frontend")
# ── Авторизация ───────────────────────────────
AUTH_LOGIN = os.getenv("AUTH_LOGIN", "")
AUTH_PASSWORD = os.getenv("AUTH_PASSWORD", "")
AUTH_ENABLED = bool(AUTH_LOGIN and AUTH_PASSWORD)
COOKIE_NAME = "manga_session"
COOKIE_MAX_AGE = 30 * 24 * 3600 # 30 дней
def _compute_token() -> str:
"""Стабильный токен сессии, производный от credentials."""
return _hmac.new(
AUTH_PASSWORD.encode(),
AUTH_LOGIN.encode(),
hashlib.sha256,
).hexdigest()
_VALID_TOKEN: str = _compute_token() if AUTH_ENABLED else ""
# Пути, доступные без авторизации
_AUTH_EXEMPT = {"/api/login", "/api/auth/check", "/api/logout"}
app = FastAPI(title="Manga Downloader API")
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Проверяет авторизацию для всех /api/* эндпоинтов."""
if not AUTH_ENABLED:
return await call_next(request)
path = request.url.path
# Пропускаем статику и исключения
if not path.startswith("/api") or path in _AUTH_EXEMPT:
return await call_next(request)
token = request.cookies.get(COOKIE_NAME)
if token != _VALID_TOKEN:
return JSONResponse({"detail": "Unauthorized"}, status_code=401)
return await call_next(request)
# ── WebSocket менеджер ────────────────────────
class ConnectionManager:
def __init__(self):
self.active: set[WebSocket] = set()
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.add(ws)
def disconnect(self, ws: WebSocket):
self.active.discard(ws)
async def broadcast(self, data: dict):
dead = set()
for ws in list(self.active):
try:
await ws.send_json(data)
except Exception:
dead.add(ws)
self.active -= dead
ws_manager = ConnectionManager()
# ── Очередь загрузки ─────────────────────────
download_queue: asyncio.Queue = asyncio.Queue()
# url → asyncio.Task текущей загрузки
active_tasks: dict[str, asyncio.Task] = {}
async def _broadcast_queue_positions():
"""Отправляет всем клиентам актуальные позиции в очереди."""
queue_list = list(download_queue._queue) # type: ignore
positions = {job["url"]: i + 1 for i, job in enumerate(queue_list)}
await ws_manager.broadcast({"type": "queue_positions", "positions": positions})
async def queue_worker():
"""Последовательно обрабатывает очередь загрузок. Перезапускается при краше."""
while True:
try:
await _queue_worker_loop()
except Exception as e:
logger.error("queue_worker упал, перезапускаю через 5 сек: {}", e)
await asyncio.sleep(5)
async def _queue_worker_loop():
while True:
job = await download_queue.get()
url = job["url"]
fmt = job.get("fmt", "cbz")
# Проверяем, не была ли манга остановлена пока стояла в очереди
skip = False
db = StateDB()
try:
m = db.get_manga(url)
if m and m["status"] == "stopped":
logger.info("Воркер: пропускаю остановленную {}", url)
skip = True
finally:
db.close()
if skip:
download_queue.task_done()
await _broadcast_queue_positions()
continue
logger.info("Воркер: начинаю скачивать {}", url)
# Позиции изменились — уведомляем клиентов
await _broadcast_queue_positions()
dl_task = asyncio.create_task(download_manga(
url=url,
fmt=fmt,
is_update=job.get("is_update", False),
resume=job.get("resume", True),
on_event=ws_manager.broadcast,
))
active_tasks[url] = dl_task
try:
await dl_task
except asyncio.CancelledError:
logger.info("Воркер: загрузка прервана: {}", url)
_db = StateDB()
try:
current_status = _db.get_manga(url)
if current_status and current_status["status"] == "queued":
# Нас приоритизировали и поставили обратно в очередь — уведомляем
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": fmt})
elif current_status and current_status["status"] != "stopped":
# Статус ещё не "stopped" (например отменили не через /stop, а внутренне)
_db.update_manga_status(url, "stopped")
await ws_manager.broadcast({"type": "manga_stopped", "url": url})
# Если статус уже "stopped" — API-эндпоинт уже всё сделал, ничего не дублируем
finally:
_db.close()
except Exception as e:
logger.error("Воркер ошибка {}: {}", url, e)
finally:
active_tasks.pop(url, None)
download_queue.task_done()
await _broadcast_queue_positions()
@app.on_event("startup")
async def startup_event():
# Синхронизируем источники с кодом и мигрируем существующие манги
_db = StateDB()
try:
_db.sync_sources(registry)
migrated = _db.migrate_manga_sources()
if migrated:
logger.info("Авто-миграция: проставлен source_id для {} манг", migrated)
finally:
_db.close()
asyncio.create_task(queue_worker())
asyncio.create_task(update_scheduler())
# Восстанавливаем очередь из БД (незавершённые задачи)
db = StateDB()
try:
for manga in db.get_all_mangas():
if manga["status"] in ("queued", "downloading"):
db.update_manga_status(manga["url"], "queued")
await download_queue.put({"url": manga["url"], "fmt": manga["format"]})
logger.info("Восстановлено из очереди: {}", manga["url"])
finally:
db.close()
def _parse_schedule() -> Optional[str]:
"""
Читает расписание из переменных окружения.
Приоритет: UPDATE_SCHEDULE (cron-строка) → UPDATE_INTERVAL_HOURS (число часов, legacy).
Возвращает cron-строку или None если планировщик отключён.
"""
schedule = os.getenv("UPDATE_SCHEDULE", "").strip()
if schedule:
# Валидируем cron-выражение
if croniter.is_valid(schedule):
return schedule
logger.error("UPDATE_SCHEDULE='{}' — невалидное cron-выражение, планировщик отключён", schedule)
return None
# Обратная совместимость: UPDATE_INTERVAL_HOURS → конвертируем в cron
hours_raw = os.getenv("UPDATE_INTERVAL_HOURS", "").strip()
if not hours_raw:
return None
try:
hours = float(hours_raw)
if hours <= 0:
return None
# Конвертируем в cron: каждые N часов (если целое и делит 24) или фиксированное время
h = int(hours)
if h == hours and 24 % h == 0:
cron = f"0 */{h} * * *"
else:
# Нецелое или не делит 24 — берём ближайшее целое число часов
h = max(1, round(hours))
cron = f"0 */{h} * * *" if 24 % h == 0 else f"0 0/{h} * * *"
logger.info("UPDATE_INTERVAL_HOURS={} → cron: '{}'", hours_raw, cron)
return cron
except ValueError:
logger.error("UPDATE_INTERVAL_HOURS='{}' — не число, планировщик отключён", hours_raw)
return None
async def update_scheduler():
"""
Планировщик авто-обновлений на основе cron-расписания.
При любой ошибке — 3 попытки с интервалом 5 мин, затем ждёт следующего слота.
Цикл никогда не прерывается.
"""
cron_expr = _parse_schedule()
if not cron_expr:
logger.info("Планировщик обновлений отключён (UPDATE_SCHEDULE и UPDATE_INTERVAL_HOURS не заданы)")
return
logger.info("Планировщик обновлений запущен: '{}'", cron_expr)
# Первый запуск — через 5 минут после старта (не сразу, чтобы не мешать инициализации)
await asyncio.sleep(300)
while True:
# Вычисляем время до следующего запуска
now_utc = datetime.now(timezone.utc)
now_naive = now_utc.replace(tzinfo=None) # croniter работает с naive datetime
cron = croniter(cron_expr, now_naive)
next_run: datetime = cron.get_next(datetime)
wait_sec = max(0.0, (next_run - now_naive).total_seconds())
logger.info("Следующая проверка обновлений: {} UTC (через {:.0f} мин)",
next_run.strftime("%Y-%m-%d %H:%M"), wait_sec / 60)
await asyncio.sleep(wait_sec)
# Запускаем с retry-логикой
await _run_auto_updates_with_retry()
async def _run_auto_updates_with_retry():
"""Запускает _run_auto_updates с тремя попытками при ошибке."""
max_attempts = 3
retry_delay = 300 # 5 минут между попытками
for attempt in range(1, max_attempts + 1):
try:
await _run_auto_updates()
return # успех
except asyncio.CancelledError:
raise # не перехватываем отмену
except Exception as e:
if attempt < max_attempts:
logger.warning(
"Авто-обновление: попытка {}/{} завершилась ошибкой: {}. "
"Повтор через {} сек.", attempt, max_attempts, e, retry_delay
)
await asyncio.sleep(retry_delay)
else:
logger.error(
"Авто-обновление: все {} попытки исчерпаны. "
"Последняя ошибка: {}. Ждём следующего слота по расписанию.",
max_attempts, e
)
async def _run_auto_updates():
"""Проверяет все манги с auto_update=1 на наличие новых глав."""
db = StateDB()
try:
candidates = db.get_autos()
finally:
db.close()
if not candidates:
return
logger.info("Авто-обновление: проверяем {} манг", len(candidates))
for manga in candidates:
url = manga["url"]
fmt = manga.get("format", "cbz")
try:
new_chapters = await check_for_updates(url, on_event=ws_manager.broadcast)
if new_chapters:
logger.info("Новых глав для {}: {}", url, len(new_chapters))
db2 = StateDB()
try:
status = db2.get_manga(url)
if status and status["status"] not in ("downloading", "queued"):
db2.update_manga_status(url, "queued")
finally:
db2.close()
await download_queue.put({"url": url, "fmt": fmt, "is_update": True})
await ws_manager.broadcast({
"type": "manga_queued",
"url": url,
"format": fmt,
"reason": "auto_update",
})
except Exception as e:
logger.error("Ошибка авто-обновления {}: {}", url, e)
# ── Вспомогательные функции ───────────────────
def _safe_name(s: str) -> str:
return re.sub(r'[^\w\s\-]', '', s).strip().replace(" ", "_")[:80]
def _manga_folder(m: dict) -> Path:
"""Возвращает папку манги с учётом кастомного имени."""
if m.get("folder_name"):
return OUTPUT_DIR / m["folder_name"]
title = m.get("title") or ""
safe_title = _safe_name(title)
return OUTPUT_DIR / safe_title
def _dir_size(path: Path) -> int:
"""Размер директории в байтах."""
if not path.exists():
return 0
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
def _format_size(bytes_val: int) -> str:
for unit in ("Б", "КБ", "МБ", "ГБ"):
if bytes_val < 1024:
return f"{bytes_val:.1f} {unit}"
bytes_val /= 1024
return f"{bytes_val:.1f} ТБ"
def _enrich_manga(m: dict, db: StateDB) -> dict:
"""Обогащает строку манги реальными счётчиками из таблицы chapters."""
size_bytes = _dir_size(_manga_folder(m))
ch_done_count = db.conn.execute(
"SELECT COUNT(*) FROM chapters WHERE manga_url=? AND status='done'",
(m["url"],)
).fetchone()[0]
ch_failed = db.conn.execute(
"SELECT COUNT(*) FROM chapters WHERE manga_url=? AND status='failed'",
(m["url"],)
).fetchone()[0]
ch_partial = db.conn.execute(
"""SELECT COUNT(*) FROM chapters
WHERE manga_url=? AND status='done'
AND pages_total > 0 AND pages_done < pages_total""",
(m["url"],)
).fetchone()[0]
# Источник
source_info = None
if m.get("source_id"):
src = db.get_source_by_id(m["source_id"])
if src:
source_info = {"id": src["id"], "slug": src["slug"], "display_name": src["display_name"]}
else:
source_info = {"id": m["source_id"], "slug": "unknown", "display_name": "Источник недоступен"}
return {
**m,
"chapters_done": ch_done_count,
"size_bytes": size_bytes,
"size_human": _format_size(size_bytes),
"queue_position": None,
"is_active": m["url"] in active_tasks,
"errors_count": ch_failed + ch_partial,
"started_at": m.get("started_at"),
"finished_at": m.get("finished_at"),
"source": source_info,
}
def _manga_detail(manga: dict, db: StateDB) -> dict:
url = manga["url"]
chapters = db.get_all_chapters(url)
# Определяем директорию манги
manga_dir = _manga_folder(manga)
size_bytes = _dir_size(manga_dir)
# Файлы
files = []
if manga_dir.exists():
for f in sorted(manga_dir.iterdir()):
if f.is_file():
files.append({
"name": f.name,
"size": f.stat().st_size,
"size_human": _format_size(f.stat().st_size),
})
# ── Статистика ───────────────────────────
ch_done = [c for c in chapters if c["status"] == "done"]
ch_failed = [c for c in chapters if c["status"] == "failed"]
ch_pending = [c for c in chapters if c["status"] == "pending"]
total_pages_downloaded = sum(c.get("pages_done", 0) for c in chapters)
total_pages_expected = sum(c.get("pages_total", 0) for c in chapters if c.get("pages_total", 0) > 0)
# Частично скачанные (done, но pages_done < pages_total)
ch_partial = [
c for c in ch_done
if c.get("pages_total", 0) > 0 and c.get("pages_done", 0) < c.get("pages_total", 0)
]
# Сколько страниц потеряно в частичных
pages_missing = sum(
c.get("pages_total", 0) - c.get("pages_done", 0)
for c in ch_partial
)
errors = []
for c in ch_failed:
errors.append({**c, "error_type": "failed", "error_label": "Глава не загружена"})
for c in ch_partial:
missing = c.get("pages_total", 0) - c.get("pages_done", 0)
errors.append({**c, "error_type": "partial",
"error_label": f"Частичная загрузка: пропущено {missing} стр."})
# Сортируем: сначала failed, потом partial, внутри — по номеру
errors.sort(key=lambda c: (0 if c["error_type"] == "failed" else 1, c.get("number", 0)))
stats = {
"chapters_done": len(ch_done),
"chapters_failed": len(ch_failed),
"chapters_pending": len(ch_pending),
"chapters_partial": len(ch_partial),
"total_pages_downloaded": total_pages_downloaded,
"total_pages_expected": total_pages_expected,
"pages_missing": pages_missing,
"errors_count": len(errors),
}
return {
**manga,
"chapters": chapters,
"files": files,
"size_bytes": size_bytes,
"size_human": _format_size(size_bytes),
"files_count": len(files),
"stats": stats,
"errors": errors,
}
# ── REST API ──────────────────────────────────
class AddMangaRequest(BaseModel):
urls: List[str]
format: str = "cbz"
source_id: Optional[int] = None # явный выбор источника (для неизвестных доменов)
# ── Auth API ─────────────────────────────────
class LoginRequest(BaseModel):
login: str
password: str
@app.get("/api/auth/check")
async def auth_check(request: Request):
"""Проверить, авторизован ли пользователь."""
if not AUTH_ENABLED:
return {"authenticated": True, "auth_enabled": False}
ok = request.cookies.get(COOKIE_NAME) == _VALID_TOKEN
return {"authenticated": ok, "auth_enabled": True}
@app.post("/api/login")
async def login(body: LoginRequest, response: Response):
if not AUTH_ENABLED:
return {"ok": True}
if body.login != AUTH_LOGIN or body.password != AUTH_PASSWORD:
raise HTTPException(status_code=401, detail="Неверный логин или пароль")
response.set_cookie(
key=COOKIE_NAME,
value=_VALID_TOKEN,
max_age=COOKIE_MAX_AGE,
httponly=True,
samesite="lax",
secure=False, # включите True если HTTPS
)
return {"ok": True}
@app.post("/api/logout")
async def logout(response: Response):
response.delete_cookie(COOKIE_NAME)
return {"ok": True}
# ── REST API ──────────────────────────────────
@app.get("/api/mangas")
async def list_mangas():
db = StateDB()
try:
mangas = db.get_all_mangas()
result = [_enrich_manga(m, db) for m in mangas]
# Добавляем позицию в очереди
queue_list = list(download_queue._queue) # type: ignore
for i, job in enumerate(queue_list):
for r in result:
if r["url"] == job["url"]:
r["queue_position"] = i + 1
return result
finally:
db.close()
@app.get("/api/mangas/detail")
async def manga_detail(url: str):
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
return _manga_detail(manga, db)
finally:
db.close()
@app.post("/api/queue")
async def add_to_queue(body: AddMangaRequest):
db = StateDB()
added = []
skipped = []
try:
for url in body.urls:
url = url.strip()
if not url:
continue
# Определяем source_id: явный из запроса или авто по домену
source_id = body.source_id
if source_id is None:
domain = extract_domain(url)
source_row = db.get_source_by_domain(domain)
if source_row:
source_id = source_row["id"]
# Если источник указан явно — привязываем домен к нему
if body.source_id is not None:
domain = extract_domain(url)
existing = db.get_source_by_domain(domain)
if existing and existing["id"] != body.source_id:
db.remove_domain(existing["id"], domain)
db.add_domain(body.source_id, domain)
is_new = db.add_manga(url, body.format, source_id=source_id)
if is_new:
await download_queue.put({"url": url, "fmt": body.format})
added.append(url)
await ws_manager.broadcast({
"type": "manga_queued",
"url": url,
"format": body.format,
"source_id": source_id,
})
await _broadcast_queue_positions()
asyncio.create_task(_fetch_preview(url))
else:
skipped.append(url)
finally:
db.close()
return {"added": added, "skipped": skipped}
async def _fetch_preview(url: str):
"""Быстро получает название и количество глав сразу после добавления."""
try:
from .browser import BrowserManager
db = StateDB()
try:
source = get_source_for_url(url, db)
if source is None:
manga_row = db.get_manga(url)
if manga_row and manga_row.get("source_id"):
source = registry.get_by_db_id(manga_row["source_id"], db)
finally:
db.close()
if source is None:
return
async with BrowserManager(headless=True) as bm:
_, page = await bm.new_page()
manga = await source.get_manga_info(page, url)
if not manga:
return
db2 = StateDB()
try:
db2.update_manga_info(
url,
title=manga.title_ru or manga.title,
chapters_total=len(manga.chapters),
title_ru=manga.title_ru,
title_full=manga.title_full,
pub_status=manga.pub_status,
)
finally:
db2.close()
await ws_manager.broadcast({
"type": "manga_preview",
"url": url,
"title": manga.title_ru or manga.title,
"title_ru": manga.title_ru,
"title_full": manga.title_full,
"pub_status": manga.pub_status,
"chapters_total": len(manga.chapters),
})
logger.info("Предпросмотр готов: {} ({} глав)", manga.title_ru or manga.title, len(manga.chapters))
except Exception as e:
logger.warning("Ошибка предпросмотра {}: {}", url, e)
@app.post("/api/mangas/auto_update")
async def toggle_auto_update(url: str, enabled: bool):
"""Включить/выключить авто-обновление для манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
db.set_auto_update(url, enabled)
await ws_manager.broadcast({
"type": "auto_update_changed",
"url": url,
"auto_update": enabled,
})
return {"ok": True, "auto_update": enabled}
finally:
db.close()
@app.post("/api/mangas/check_now")
async def check_now(url: str):
"""Немедленно проверить новые главы для конкретной манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
finally:
db.close()
asyncio.create_task(_check_and_queue(url))
return {"ok": True}
async def _check_and_queue(url: str):
db = StateDB()
try:
manga = db.get_manga(url)
fmt = manga["format"] if manga else "cbz"
finally:
db.close()
new = await check_for_updates(url, on_event=ws_manager.broadcast)
if new:
db2 = StateDB()
try:
db2.update_manga_status(url, "queued")
finally:
db2.close()
await download_queue.put({"url": url, "fmt": fmt, "is_update": True})
@app.get("/api/news")
async def get_news(limit: int = 100):
"""Только скачанные и автодокаченные главы — для вкладки Новости."""
db = StateDB()
try:
cur = db.conn.execute("""
SELECT h.*, m.title as manga_title, m.title_ru
FROM history h LEFT JOIN mangas m ON h.manga_url = m.url
WHERE h.event_type IN ('downloaded', 'auto_downloaded')
ORDER BY h.created_at DESC LIMIT ?
""", (limit,))
return [dict(r) for r in cur.fetchall()]
finally:
db.close()
@app.get("/api/history")
async def get_history(limit: int = 100, manga_url: str = ""):
db = StateDB()
try:
return db.get_history(limit=limit, manga_url=manga_url)
finally:
db.close()
@app.post("/api/mangas/prioritize")
async def prioritize_manga(url: str):
"""Поместить мангу в начало очереди, прервав текущую загрузку и вернув её следом."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
return {"ok": True, "message": "Уже загружается"}
fmt = manga["format"] or "cbz"
# 1. Убираем target из очереди если там уже есть
items = list(download_queue._queue) # type: ignore
items = [i for i in items if i["url"] != url]
download_queue._queue.clear() # type: ignore
for item in items:
download_queue._queue.append(item) # type: ignore
# 2. Текущая активная загрузка
current_url = next(iter(active_tasks), None)
if current_url and current_url != url:
cur_manga = db.get_manga(current_url)
cur_fmt = cur_manga["format"] if cur_manga else "cbz"
# Помечаем как queued — воркер увидит это и не поставит stopped
db.update_manga_status(current_url, "queued")
# Вставляем обратно на второе место (сразу после target)
download_queue._queue.appendleft({"url": current_url, "fmt": cur_fmt}) # type: ignore
# Отменяем задачу — воркер сразу перейдёт к следующему элементу (target)
task = active_tasks.get(current_url)
if task and not task.done():
task.cancel()
# 3. Вставляем target в самое начало
download_queue._queue.appendleft({"url": url, "fmt": fmt}) # type: ignore
db.update_manga_status(url, "queued")
logger.info("Приоритет: {} → начало очереди (вытеснен: {})", url, current_url)
await ws_manager.broadcast({
"type": "manga_prioritized",
"url": url,
"preempted_url": current_url,
})
await _broadcast_queue_positions()
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/retry_errors")
async def retry_errors(url: str):
"""Сбросить статус failed/partial глав на pending для повторной загрузки."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
# Сбрасываем failed
db.conn.execute(
"UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=? WHERE manga_url=? AND status='failed'",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
# Сбрасываем partial (done, но страниц скачано меньше)
db.conn.execute(
"""UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=?
WHERE manga_url=? AND status='done' AND pages_total > 0 AND pages_done < pages_total""",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
db.conn.commit()
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/refresh_meta")
async def refresh_meta(url: str):
"""Обновить метаданные (ComicInfo.xml / EPUB OPF / PDF XMP) во всех уже скачанных файлах."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Манга сейчас загружается")
finally:
db.close()
asyncio.create_task(_do_refresh_meta(url))
return {"ok": True}
async def _do_refresh_meta(url: str):
"""Фоновая задача: обходит все скачанные файлы и обновляет метаданные."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
return
chapters = db.get_all_chapters(url)
chapters_total = len(chapters)
pub_status = manga.get("pub_status", "unknown") or "unknown"
updated = failed = 0
for ch in chapters:
for fmt_col, ext in (("output_cbz", ".cbz"), ("output_pdf", ".pdf"), ("output_epub", ".epub")):
fpath = ch.get(fmt_col)
if not fpath:
continue
p = Path(fpath)
if not p.exists():
continue
meta = MangaMeta(
series=manga.get("title_ru") or manga.get("title") or "",
series_full=manga.get("title_full") or "",
chapter_title=ch.get("title") or "",
number=float(ch.get("number") or 0),
volume=int(ch.get("volume") or 0),
chapters_total=chapters_total,
pub_status=pub_status,
source_url=url,
)
if patch_meta(p, meta):
updated += 1
else:
failed += 1
logger.info("refresh_meta {}: обновлено {}, ошибок {}", url, updated, failed)
await ws_manager.broadcast({
"type": "meta_refreshed",
"url": url,
"updated": updated,
"failed": failed,
})
except Exception as e:
logger.error("_do_refresh_meta {}: {}", url, e)
finally:
db.close()
class UpdateMetaRequest(BaseModel):
url: str
title_ru: str
title_full: str = ""
@app.post("/api/mangas/update_meta")
async def update_meta(body: UpdateMetaRequest):
"""Обновить метаданные манги (название серии) и применить к файлам."""
db = StateDB()
try:
manga = db.get_manga(body.url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
db.update_manga_meta_fields(
body.url,
title_ru=body.title_ru or None,
title_full=body.title_full or None,
)
finally:
db.close()
# Обновляем метаданные в файлах фоново
asyncio.create_task(_do_refresh_meta(body.url))
await ws_manager.broadcast({
"type": "manga_meta_updated",
"url": body.url,
"title": body.title_ru,
"title_ru": body.title_ru,
"title_full": body.title_full,
})
return {"ok": True}
class RenameFolderRequest(BaseModel):
url: str
folder_name: str
@app.post("/api/mangas/rename_folder")
async def rename_folder(body: RenameFolderRequest):
"""Переименовать папку манги и обновить пути в БД."""
new_folder = _safe_name(body.folder_name)
if not new_folder:
raise HTTPException(status_code=400, detail="Некорректное имя папки")
db = StateDB()
try:
manga = db.get_manga(body.url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and body.url in active_tasks:
raise HTTPException(status_code=400, detail="Нельзя переименовать — манга загружается")
old_dir = _manga_folder(manga)
new_dir = OUTPUT_DIR / new_folder
if old_dir != new_dir:
if new_dir.exists():
raise HTTPException(status_code=400, detail=f"Папка '{new_folder}' уже существует")
if old_dir.exists():
import shutil
shutil.move(str(old_dir), str(new_dir))
logger.info("Папка переименована: {}{}", old_dir, new_dir)
# Обновляем пути в таблице chapters
chapters = db.get_all_chapters(body.url)
for ch in chapters:
updates = {}
for col in ("output_cbz", "output_pdf", "output_epub"):
p = ch.get(col)
if p and str(old_dir) in p:
updates[col] = p.replace(str(old_dir), str(new_dir))
if updates:
sets = ", ".join(f"{k}=?" for k in updates)
db.conn.execute(
f"UPDATE chapters SET {sets} WHERE chapter_url=?",
[*updates.values(), ch["chapter_url"]]
)
db.conn.commit()
db.set_folder_name(body.url, new_folder)
await ws_manager.broadcast({
"type": "manga_folder_renamed",
"url": body.url,
"folder_name": new_folder,
})
return {"ok": True, "folder_name": new_folder}
finally:
db.close()
@app.post("/api/mangas/force_redownload")
async def force_redownload(url: str):
"""Сбросить все главы на pending и поставить мангу заново в очередь."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Сначала остановите загрузку")
# Сбрасываем все главы на pending
db.conn.execute(
"UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=? WHERE manga_url=?",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
db.conn.commit()
# Ставим в очередь с resume=False — перекачает всё заново
db.update_manga_status(url, "queued")
await download_queue.put({"url": url, "fmt": manga["format"], "resume": False})
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": manga["format"]})
await _broadcast_queue_positions()
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/stop")
async def stop_manga(url: str):
"""Остановить текущую загрузку манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
# Отменяем активную задачу если есть
task = active_tasks.get(url)
if task and not task.done():
task.cancel()
# Сразу обновляем статус и уведомляем клиентов — не ждём пока воркер
# обработает CancelledError (это может занять секунды пока браузер завершит операцию)
db.update_manga_status(url, "stopped")
await ws_manager.broadcast({"type": "manga_stopped", "url": url})
await _broadcast_queue_positions()
else:
# Манга в очереди (ещё не начата) — просто помечаем как stopped
db.update_manga_status(url, "stopped")
await ws_manager.broadcast({"type": "manga_stopped", "url": url})
await _broadcast_queue_positions()
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/resume")
async def resume_manga(url: str):
"""Возобновить загрузку остановленной/упавшей манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Манга уже загружается")
db.update_manga_status(url, "queued")
await download_queue.put({"url": url, "fmt": manga["format"]})
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": manga["format"]})
await _broadcast_queue_positions()
return {"ok": True}
finally:
db.close()
@app.delete("/api/mangas")
async def delete_manga(url: str, delete_files: bool = False):
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Нельзя удалить активную загрузку")
deleted_size = 0
if delete_files:
manga_dir = _manga_folder(manga)
if manga_dir.exists() and manga_dir.is_dir():
deleted_size = _dir_size(manga_dir)
import shutil
shutil.rmtree(str(manga_dir))
logger.info("Удалена папка: {} ({} байт)", manga_dir, deleted_size)
db.conn.execute("DELETE FROM chapters WHERE manga_url=?", (url,))
db.conn.execute("DELETE FROM history WHERE manga_url=?", (url,))
db.conn.execute("DELETE FROM mangas WHERE url=?", (url,))
db.conn.commit()
return {"ok": True, "deleted_size": deleted_size}
finally:
db.close()
# ── Sources API ───────────────────────────────
class DomainAdd(BaseModel):
domain: str
class SwitchSourceRequest(BaseModel):
url: str
source_id: int
@app.get("/api/sources")
async def list_sources():
"""Список всех источников с доменами."""
db = StateDB()
try:
return db.get_all_sources()
finally:
db.close()
@app.get("/api/resolve-source")
async def resolve_source(url: str):
"""Определить источник по URL. Возвращает {id, slug, display_name} или null."""
db = StateDB()
try:
domain = extract_domain(url)
row = db.get_source_by_domain(domain)
if not row:
return {"source": None, "domain": domain}
return {
"source": {
"id": row["id"],
"slug": row["slug"],
"display_name": row["display_name"],
},
"domain": domain,
}
finally:
db.close()
@app.post("/api/sources/{source_id}/domains")
async def add_domain(source_id: int, body: DomainAdd):
"""Добавить домен к источнику."""
db = StateDB()
try:
source = db.get_source_by_id(source_id)
if not source:
raise HTTPException(status_code=404, detail="Источник не найден")
domain = body.domain.lower().strip()
if not domain:
raise HTTPException(status_code=400, detail="Домен не может быть пустым")
# Проверяем не занят ли домен другим источником
existing = db.get_source_by_domain(domain)
if existing and existing["id"] != source_id:
raise HTTPException(
status_code=409,
detail=f"Домен уже привязан к источнику «{existing['display_name']}»"
)
ok = db.add_domain(source_id, domain)
if not ok:
raise HTTPException(status_code=409, detail="Домен уже существует")
await ws_manager.broadcast({
"type": "source_domain_added",
"source_id": source_id,
"domain": domain,
})
return {"ok": True, "domain": domain}
finally:
db.close()
@app.delete("/api/sources/{source_id}/domains/{domain:path}")
async def remove_domain(source_id: int, domain: str):
"""Удалить домен у источника."""
db = StateDB()
try:
source = db.get_source_by_id(source_id)
if not source:
raise HTTPException(status_code=404, detail="Источник не найден")
ok = db.remove_domain(source_id, domain)
if not ok:
raise HTTPException(status_code=404, detail="Домен не найден")
await ws_manager.broadcast({
"type": "source_domain_removed",
"source_id": source_id,
"domain": domain,
})
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/switch-source")
async def switch_manga_source(body: SwitchSourceRequest):
"""Сменить источник у манги + перепривязать домен."""
db = StateDB()
try:
manga = db.get_manga(body.url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and body.url in active_tasks:
raise HTTPException(status_code=400, detail="Нельзя сменить источник во время загрузки")
new_source = db.get_source_by_id(body.source_id)
if not new_source:
raise HTTPException(status_code=404, detail="Источник не найден")
old_source_id = manga.get("source_id")
domain = extract_domain(body.url)
# Перепривязываем домен
if domain:
existing_domain = db.get_source_by_domain(domain)
if existing_domain and existing_domain["id"] != body.source_id:
db.remove_domain(existing_domain["id"], domain)
db.add_domain(body.source_id, domain)
# Меняем источник у манги
db.set_manga_source(body.url, body.source_id)
# Сбрасываем failed/partial главы → pending
reset_count = db.reset_failed_chapters(body.url)
await ws_manager.broadcast({
"type": "source_switched",
"url": body.url,
"old_source_id": old_source_id,
"new_source_id": body.source_id,
"new_source_name": new_source["display_name"],
"domain_rebound": bool(domain),
"chapters_reset": reset_count,
})
return {
"ok": True,
"source_id": body.source_id,
"source_name": new_source["display_name"],
"chapters_reset": reset_count,
}
finally:
db.close()
@app.get("/api/stats")
async def global_stats():
db = StateDB()
try:
mangas = db.get_all_mangas()
total_size = _dir_size(OUTPUT_DIR)
return {
"mangas_total": len(mangas),
"mangas_done": sum(1 for m in mangas if m["status"] == "done"),
"mangas_downloading": sum(1 for m in mangas if m["status"] == "downloading"),
"mangas_queued": sum(1 for m in mangas if m["status"] == "queued"),
"mangas_failed": sum(1 for m in mangas if m["status"] == "failed"),
"mangas_stopped": sum(1 for m in mangas if m["status"] == "stopped"),
"queue_size": download_queue.qsize(),
"total_size_bytes": total_size,
"total_size_human": _format_size(total_size),
}
finally:
db.close()
# ── WebSocket ─────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
# Проверяем авторизацию по cookie
if AUTH_ENABLED and ws.cookies.get(COOKIE_NAME) != _VALID_TOKEN:
await ws.close(code=4401)
return
await ws_manager.connect(ws)
try:
# Отправляем начальный снимок состояния
db = StateDB()
try:
mangas = db.get_all_mangas()
enriched = [_enrich_manga(m, db) for m in mangas]
# Добавляем позицию в очереди
queue_list = list(download_queue._queue) # type: ignore
for i, job in enumerate(queue_list):
for em in enriched:
if em["url"] == job["url"]:
em["queue_position"] = i + 1
await ws.send_json({"type": "snapshot", "mangas": enriched})
finally:
db.close()
while True:
# Держим соединение живым, ждём пинги
data = await ws.receive_text()
if data == "ping":
await ws.send_json({"type": "pong"})
except WebSocketDisconnect:
ws_manager.disconnect(ws)
except Exception:
ws_manager.disconnect(ws)
# ── Статические файлы (фронтенд) ──────────────
if FRONTEND_DIR.exists():
app.mount("/", StaticFiles(directory=str(FRONTEND_DIR), html=True), name="frontend")