This commit is contained in:
2026-04-29 02:07:21 +03:00
parent ba6bfc5ed3
commit 0aa057c991
14 changed files with 4257 additions and 139 deletions

779
src/api.py Normal file
View File

@@ -0,0 +1,779 @@
"""
FastAPI веб-сервер: REST API + WebSocket для мониторинга загрузок манги.
"""
import asyncio
import os
import re
from pathlib import Path
from typing import List, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
from loguru import logger
from .state import StateDB
from .worker import download_manga, check_for_updates
from .exporter import patch_meta, MangaMeta
OUTPUT_DIR = Path("/app/output")
FRONTEND_DIR = Path("/app/frontend")
app = FastAPI(title="Manga Downloader API")
# ── WebSocket менеджер ────────────────────────
class ConnectionManager:
def __init__(self):
self.active: set[WebSocket] = set()
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.add(ws)
def disconnect(self, ws: WebSocket):
self.active.discard(ws)
async def broadcast(self, data: dict):
dead = set()
for ws in list(self.active):
try:
await ws.send_json(data)
except Exception:
dead.add(ws)
self.active -= dead
ws_manager = ConnectionManager()
# ── Очередь загрузки ─────────────────────────
download_queue: asyncio.Queue = asyncio.Queue()
# url → asyncio.Task текущей загрузки
active_tasks: dict[str, asyncio.Task] = {}
async def queue_worker():
"""Последовательно обрабатывает очередь загрузок. Перезапускается при краше."""
while True:
try:
await _queue_worker_loop()
except Exception as e:
logger.error("queue_worker упал, перезапускаю через 5 сек: {}", e)
await asyncio.sleep(5)
async def _queue_worker_loop():
while True:
job = await download_queue.get()
url = job["url"]
fmt = job.get("fmt", "cbz")
# Проверяем, не была ли манга остановлена пока стояла в очереди
skip = False
db = StateDB()
try:
m = db.get_manga(url)
if m and m["status"] == "stopped":
logger.info("Воркер: пропускаю остановленную {}", url)
skip = True
finally:
db.close()
if skip:
download_queue.task_done()
continue
logger.info("Воркер: начинаю скачивать {}", url)
dl_task = asyncio.create_task(download_manga(
url=url,
fmt=fmt,
is_update=job.get("is_update", False),
resume=job.get("resume", True),
on_event=ws_manager.broadcast,
))
active_tasks[url] = dl_task
try:
await dl_task
except asyncio.CancelledError:
logger.info("Воркер: загрузка прервана: {}", url)
_db = StateDB()
try:
current_status = _db.get_manga(url)
# Если статус уже "queued" — значит нас приоритизировали и поставили обратно
# в очередь; не перетираем на "stopped"
if current_status and current_status["status"] != "queued":
_db.update_manga_status(url, "stopped")
await ws_manager.broadcast({"type": "manga_stopped", "url": url})
else:
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": fmt})
finally:
_db.close()
except Exception as e:
logger.error("Воркер ошибка {}: {}", url, e)
finally:
active_tasks.pop(url, None)
download_queue.task_done()
@app.on_event("startup")
async def startup_event():
asyncio.create_task(queue_worker())
asyncio.create_task(update_scheduler())
# Восстанавливаем очередь из БД (незавершённые задачи)
db = StateDB()
try:
for manga in db.get_all_mangas():
if manga["status"] in ("queued", "downloading"):
db.update_manga_status(manga["url"], "queued")
await download_queue.put({"url": manga["url"], "fmt": manga["format"]})
logger.info("Восстановлено из очереди: {}", manga["url"])
finally:
db.close()
async def update_scheduler():
"""Периодически проверяет новые главы для манг с auto_update=1."""
interval_hours = float(os.getenv("UPDATE_INTERVAL_HOURS", "6"))
interval_sec = interval_hours * 3600
logger.info("Планировщик обновлений: каждые {} ч", interval_hours)
# Первый запуск — через 5 минут после старта
await asyncio.sleep(300)
while True:
await _run_auto_updates()
await asyncio.sleep(interval_sec)
async def _run_auto_updates():
"""Проверяет все манги с auto_update=1 на наличие новых глав."""
db = StateDB()
try:
candidates = db.get_autos()
finally:
db.close()
if not candidates:
return
logger.info("Авто-обновление: проверяем {} манг", len(candidates))
for manga in candidates:
url = manga["url"]
fmt = manga.get("format", "cbz")
try:
new_chapters = await check_for_updates(url, on_event=ws_manager.broadcast)
if new_chapters:
logger.info("Новых глав для {}: {}", url, len(new_chapters))
# Добавляем в очередь с флагом is_update
db2 = StateDB()
try:
status = db2.get_manga(url)
if status and status["status"] not in ("downloading", "queued"):
db2.update_manga_status(url, "queued")
finally:
db2.close()
await download_queue.put({"url": url, "fmt": fmt, "is_update": True})
await ws_manager.broadcast({
"type": "manga_queued",
"url": url,
"format": fmt,
"reason": "auto_update",
})
except Exception as e:
logger.error("Ошибка авто-обновления {}: {}", url, e)
# ── Вспомогательные функции ───────────────────
def _dir_size(path: Path) -> int:
"""Размер директории в байтах."""
if not path.exists():
return 0
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
def _format_size(bytes_val: int) -> str:
for unit in ("Б", "КБ", "МБ", "ГБ"):
if bytes_val < 1024:
return f"{bytes_val:.1f} {unit}"
bytes_val /= 1024
return f"{bytes_val:.1f} ТБ"
def _enrich_manga(m: dict, db: StateDB) -> dict:
"""Обогащает строку манги реальными счётчиками из таблицы chapters."""
title = m.get("title") or ""
safe_title = re.sub(r'[^\w\s\-]', '', title).strip().replace(" ", "_")[:80]
size_bytes = _dir_size(OUTPUT_DIR / safe_title)
ch_done_count = db.conn.execute(
"SELECT COUNT(*) FROM chapters WHERE manga_url=? AND status='done'",
(m["url"],)
).fetchone()[0]
ch_failed = db.conn.execute(
"SELECT COUNT(*) FROM chapters WHERE manga_url=? AND status='failed'",
(m["url"],)
).fetchone()[0]
ch_partial = db.conn.execute(
"""SELECT COUNT(*) FROM chapters
WHERE manga_url=? AND status='done'
AND pages_total > 0 AND pages_done < pages_total""",
(m["url"],)
).fetchone()[0]
return {
**m,
"chapters_done": ch_done_count,
"size_bytes": size_bytes,
"size_human": _format_size(size_bytes),
"queue_position": None,
"is_active": m["url"] in active_tasks,
"errors_count": ch_failed + ch_partial,
"started_at": m.get("started_at"),
"finished_at": m.get("finished_at"),
}
def _manga_detail(manga: dict, db: StateDB) -> dict:
url = manga["url"]
chapters = db.get_all_chapters(url)
# Определяем директорию манги
title = manga.get("title") or ""
safe_title = re.sub(r'[^\w\s\-]', '', title).strip().replace(" ", "_")[:80]
manga_dir = OUTPUT_DIR / safe_title
size_bytes = _dir_size(manga_dir)
# Файлы
files = []
if manga_dir.exists():
for f in sorted(manga_dir.iterdir()):
if f.is_file():
files.append({
"name": f.name,
"size": f.stat().st_size,
"size_human": _format_size(f.stat().st_size),
})
# ── Статистика ───────────────────────────
ch_done = [c for c in chapters if c["status"] == "done"]
ch_failed = [c for c in chapters if c["status"] == "failed"]
ch_pending = [c for c in chapters if c["status"] == "pending"]
total_pages_downloaded = sum(c.get("pages_done", 0) for c in chapters)
total_pages_expected = sum(c.get("pages_total", 0) for c in chapters if c.get("pages_total", 0) > 0)
# Частично скачанные (done, но pages_done < pages_total)
ch_partial = [
c for c in ch_done
if c.get("pages_total", 0) > 0 and c.get("pages_done", 0) < c.get("pages_total", 0)
]
# Сколько страниц потеряно в частичных
pages_missing = sum(
c.get("pages_total", 0) - c.get("pages_done", 0)
for c in ch_partial
)
errors = []
for c in ch_failed:
errors.append({**c, "error_type": "failed", "error_label": "Глава не загружена"})
for c in ch_partial:
missing = c.get("pages_total", 0) - c.get("pages_done", 0)
errors.append({**c, "error_type": "partial",
"error_label": f"Частичная загрузка: пропущено {missing} стр."})
# Сортируем: сначала failed, потом partial, внутри — по номеру
errors.sort(key=lambda c: (0 if c["error_type"] == "failed" else 1, c.get("number", 0)))
stats = {
"chapters_done": len(ch_done),
"chapters_failed": len(ch_failed),
"chapters_pending": len(ch_pending),
"chapters_partial": len(ch_partial),
"total_pages_downloaded": total_pages_downloaded,
"total_pages_expected": total_pages_expected,
"pages_missing": pages_missing,
"errors_count": len(errors),
}
return {
**manga,
"chapters": chapters,
"files": files,
"size_bytes": size_bytes,
"size_human": _format_size(size_bytes),
"files_count": len(files),
"stats": stats,
"errors": errors,
}
# ── REST API ──────────────────────────────────
class AddMangaRequest(BaseModel):
urls: List[str]
format: str = "cbz"
@app.get("/api/mangas")
async def list_mangas():
db = StateDB()
try:
mangas = db.get_all_mangas()
result = [_enrich_manga(m, db) for m in mangas]
# Добавляем позицию в очереди
queue_list = list(download_queue._queue) # type: ignore
for i, job in enumerate(queue_list):
for r in result:
if r["url"] == job["url"]:
r["queue_position"] = i + 1
return result
finally:
db.close()
@app.get("/api/mangas/detail")
async def manga_detail(url: str):
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
return _manga_detail(manga, db)
finally:
db.close()
@app.post("/api/queue")
async def add_to_queue(body: AddMangaRequest):
db = StateDB()
added = []
skipped = []
try:
for url in body.urls:
url = url.strip()
if not url:
continue
is_new = db.add_manga(url, body.format)
if is_new:
await download_queue.put({"url": url, "fmt": body.format})
added.append(url)
await ws_manager.broadcast({
"type": "manga_queued",
"url": url,
"format": body.format,
})
# Запускаем фоновую задачу предпросмотра (без Chromium — быстро)
asyncio.create_task(_fetch_preview(url))
else:
skipped.append(url)
finally:
db.close()
return {"added": added, "skipped": skipped}
async def _fetch_preview(url: str):
"""Быстро получает название и количество глав сразу после добавления."""
try:
from .browser import BrowserManager
from .scraper import get_manga_info
async with BrowserManager(headless=True) as bm:
_, page = await bm.new_page()
manga = await get_manga_info(page, url)
if not manga:
return
db = StateDB()
try:
db.update_manga_info(
url,
title=manga.title_ru or manga.title,
chapters_total=len(manga.chapters),
title_ru=manga.title_ru,
title_full=manga.title_full,
pub_status=manga.pub_status,
)
finally:
db.close()
await ws_manager.broadcast({
"type": "manga_preview",
"url": url,
"title": manga.title_ru or manga.title,
"title_ru": manga.title_ru,
"title_full": manga.title_full,
"pub_status": manga.pub_status,
"chapters_total": len(manga.chapters),
})
logger.info("Предпросмотр готов: {} ({} глав)", manga.title_ru or manga.title, len(manga.chapters))
except Exception as e:
logger.warning("Ошибка предпросмотра {}: {}", url, e)
@app.post("/api/mangas/auto_update")
async def toggle_auto_update(url: str, enabled: bool):
"""Включить/выключить авто-обновление для манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
db.set_auto_update(url, enabled)
await ws_manager.broadcast({
"type": "auto_update_changed",
"url": url,
"auto_update": enabled,
})
return {"ok": True, "auto_update": enabled}
finally:
db.close()
@app.post("/api/mangas/check_now")
async def check_now(url: str):
"""Немедленно проверить новые главы для конкретной манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
finally:
db.close()
asyncio.create_task(_check_and_queue(url))
return {"ok": True}
async def _check_and_queue(url: str):
db = StateDB()
try:
manga = db.get_manga(url)
fmt = manga["format"] if manga else "cbz"
finally:
db.close()
new = await check_for_updates(url, on_event=ws_manager.broadcast)
if new:
db2 = StateDB()
try:
db2.update_manga_status(url, "queued")
finally:
db2.close()
await download_queue.put({"url": url, "fmt": fmt, "is_update": True})
@app.get("/api/news")
async def get_news(limit: int = 100):
"""Только скачанные и автодокаченные главы — для вкладки Новости."""
db = StateDB()
try:
cur = db.conn.execute("""
SELECT h.*, m.title as manga_title, m.title_ru
FROM history h LEFT JOIN mangas m ON h.manga_url = m.url
WHERE h.event_type IN ('downloaded', 'auto_downloaded')
ORDER BY h.created_at DESC LIMIT ?
""", (limit,))
return [dict(r) for r in cur.fetchall()]
finally:
db.close()
@app.get("/api/history")
async def get_history(limit: int = 100, manga_url: str = ""):
db = StateDB()
try:
return db.get_history(limit=limit, manga_url=manga_url)
finally:
db.close()
@app.post("/api/mangas/prioritize")
async def prioritize_manga(url: str):
"""Поместить мангу в начало очереди, прервав текущую загрузку и вернув её следом."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
return {"ok": True, "message": "Уже загружается"}
fmt = manga["format"] or "cbz"
# 1. Убираем target из очереди если там уже есть
items = list(download_queue._queue) # type: ignore
items = [i for i in items if i["url"] != url]
download_queue._queue.clear() # type: ignore
for item in items:
download_queue._queue.append(item) # type: ignore
# 2. Текущая активная загрузка
current_url = next(iter(active_tasks), None)
if current_url and current_url != url:
cur_manga = db.get_manga(current_url)
cur_fmt = cur_manga["format"] if cur_manga else "cbz"
# Помечаем как queued — воркер увидит это и не поставит stopped
db.update_manga_status(current_url, "queued")
# Вставляем обратно на второе место (сразу после target)
download_queue._queue.appendleft({"url": current_url, "fmt": cur_fmt}) # type: ignore
# Отменяем задачу — воркер сразу перейдёт к следующему элементу (target)
task = active_tasks.get(current_url)
if task and not task.done():
task.cancel()
# 3. Вставляем target в самое начало
download_queue._queue.appendleft({"url": url, "fmt": fmt}) # type: ignore
db.update_manga_status(url, "queued")
logger.info("Приоритет: {} → начало очереди (вытеснен: {})", url, current_url)
await ws_manager.broadcast({
"type": "manga_prioritized",
"url": url,
"preempted_url": current_url,
})
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/retry_errors")
async def retry_errors(url: str):
"""Сбросить статус failed/partial глав на pending для повторной загрузки."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
# Сбрасываем failed
db.conn.execute(
"UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=? WHERE manga_url=? AND status='failed'",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
# Сбрасываем partial (done, но страниц скачано меньше)
db.conn.execute(
"""UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=?
WHERE manga_url=? AND status='done' AND pages_total > 0 AND pages_done < pages_total""",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
db.conn.commit()
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/refresh_meta")
async def refresh_meta(url: str):
"""Обновить метаданные (ComicInfo.xml / EPUB OPF / PDF XMP) во всех уже скачанных файлах."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Манга сейчас загружается")
finally:
db.close()
asyncio.create_task(_do_refresh_meta(url))
return {"ok": True}
async def _do_refresh_meta(url: str):
"""Фоновая задача: обходит все скачанные файлы и обновляет метаданные."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
return
chapters = db.get_all_chapters(url)
chapters_total = len(chapters)
pub_status = manga.get("pub_status", "unknown") or "unknown"
updated = failed = 0
for ch in chapters:
for fmt_col, ext in (("output_cbz", ".cbz"), ("output_pdf", ".pdf"), ("output_epub", ".epub")):
fpath = ch.get(fmt_col)
if not fpath:
continue
p = Path(fpath)
if not p.exists():
continue
meta = MangaMeta(
series=manga.get("title_ru") or manga.get("title") or "",
series_full=manga.get("title_full") or "",
chapter_title=ch.get("title") or "",
number=float(ch.get("number") or 0),
volume=int(ch.get("volume") or 0),
chapters_total=chapters_total,
pub_status=pub_status,
source_url=url,
)
if patch_meta(p, meta):
updated += 1
else:
failed += 1
logger.info("refresh_meta {}: обновлено {}, ошибок {}", url, updated, failed)
await ws_manager.broadcast({
"type": "meta_refreshed",
"url": url,
"updated": updated,
"failed": failed,
})
except Exception as e:
logger.error("_do_refresh_meta {}: {}", url, e)
finally:
db.close()
@app.post("/api/mangas/force_redownload")
async def force_redownload(url: str):
"""Сбросить все главы на pending и поставить мангу заново в очередь."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Сначала остановите загрузку")
# Сбрасываем все главы на pending
db.conn.execute(
"UPDATE chapters SET status='pending', pages_done=0, pages_total=0, updated_at=? WHERE manga_url=?",
(db.conn.execute("SELECT datetime('now')").fetchone()[0], url)
)
db.conn.commit()
# Ставим в очередь с resume=False — перекачает всё заново
db.update_manga_status(url, "queued")
await download_queue.put({"url": url, "fmt": manga["format"], "resume": False})
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": manga["format"]})
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/stop")
async def stop_manga(url: str):
"""Остановить текущую загрузку манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
# Отменяем активную задачу если есть
task = active_tasks.get(url)
if task and not task.done():
task.cancel()
# Статус обновит воркер после CancelledError
else:
# Манга в очереди (ещё не начата) — просто помечаем как stopped
db.update_manga_status(url, "stopped")
await ws_manager.broadcast({"type": "manga_stopped", "url": url})
return {"ok": True}
finally:
db.close()
@app.post("/api/mangas/resume")
async def resume_manga(url: str):
"""Возобновить загрузку остановленной/упавшей манги."""
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Манга уже загружается")
db.update_manga_status(url, "queued")
await download_queue.put({"url": url, "fmt": manga["format"]})
await ws_manager.broadcast({"type": "manga_queued", "url": url, "format": manga["format"]})
return {"ok": True}
finally:
db.close()
@app.delete("/api/mangas")
async def delete_manga(url: str, delete_files: bool = False):
db = StateDB()
try:
manga = db.get_manga(url)
if not manga:
raise HTTPException(status_code=404, detail="Манга не найдена")
if manga["status"] == "downloading" and url in active_tasks:
raise HTTPException(status_code=400, detail="Нельзя удалить активную загрузку")
deleted_size = 0
if delete_files:
title = manga.get("title") or ""
safe_title = re.sub(r'[^\w\s\-]', '', title).strip().replace(" ", "_")[:80]
manga_dir = OUTPUT_DIR / safe_title
if manga_dir.exists() and manga_dir.is_dir():
deleted_size = _dir_size(manga_dir)
import shutil
shutil.rmtree(str(manga_dir))
logger.info("Удалена папка: {} ({} байт)", manga_dir, deleted_size)
db.conn.execute("DELETE FROM chapters WHERE manga_url=?", (url,))
db.conn.execute("DELETE FROM history WHERE manga_url=?", (url,))
db.conn.execute("DELETE FROM mangas WHERE url=?", (url,))
db.conn.commit()
return {"ok": True, "deleted_size": deleted_size}
finally:
db.close()
@app.get("/api/stats")
async def global_stats():
db = StateDB()
try:
mangas = db.get_all_mangas()
total_size = _dir_size(OUTPUT_DIR)
return {
"mangas_total": len(mangas),
"mangas_done": sum(1 for m in mangas if m["status"] == "done"),
"mangas_downloading": sum(1 for m in mangas if m["status"] == "downloading"),
"mangas_queued": sum(1 for m in mangas if m["status"] == "queued"),
"mangas_failed": sum(1 for m in mangas if m["status"] == "failed"),
"mangas_stopped": sum(1 for m in mangas if m["status"] == "stopped"),
"queue_size": download_queue.qsize(),
"total_size_bytes": total_size,
"total_size_human": _format_size(total_size),
}
finally:
db.close()
# ── WebSocket ─────────────────────────────────
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws_manager.connect(ws)
try:
# Отправляем начальный снимок состояния
db = StateDB()
try:
mangas = db.get_all_mangas()
enriched = [_enrich_manga(m, db) for m in mangas]
# Добавляем позицию в очереди
queue_list = list(download_queue._queue) # type: ignore
for i, job in enumerate(queue_list):
for em in enriched:
if em["url"] == job["url"]:
em["queue_position"] = i + 1
await ws.send_json({"type": "snapshot", "mangas": enriched})
finally:
db.close()
while True:
# Держим соединение живым, ждём пинги
data = await ws.receive_text()
if data == "ping":
await ws.send_json({"type": "pong"})
except WebSocketDisconnect:
ws_manager.disconnect(ws)
except Exception:
ws_manager.disconnect(ws)
# ── Статические файлы (фронтенд) ──────────────
if FRONTEND_DIR.exists():
app.mount("/", StaticFiles(directory=str(FRONTEND_DIR), html=True), name="frontend")