Files
manga/src/worker.py
2026-04-30 17:14:21 +03:00

386 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Воркер скачивания манги с поддержкой событий прогресса.
"""
import asyncio
import os
import re
import tempfile
from pathlib import Path
from typing import Callable, Optional
from loguru import logger
from .browser import BrowserManager
from .scraper import get_manga_info, get_chapter_images_and_download, Chapter
from .exporter import export, MangaMeta
from .state import StateDB
# Default root directory for downloaded manga (used as the default
# ``output_dir`` of ``download_manga``).
OUTPUT_DIR = Path("/app/output")

# Taken from the environment so it can be overridden in docker-compose;
# falls back to 3 when the variable is not set.
CHAPTER_CONCURRENCY = int(os.environ.get("CHAPTER_CONCURRENCY", "3"))
def _safe_name(s: str) -> str:
return re.sub(r'[^\w\s\-]', '', s).strip().replace(" ", "_")[:80]
def _safe_chapter_name(ch: Chapter) -> str:
vol = f"v{ch.volume:02d}_" if ch.volume else ""
return f"{vol}ch{ch.number:06.1f}"
async def download_manga(
    url: str,
    fmt: str = "cbz",
    output_dir: Path = OUTPUT_DIR,
    resume: bool = True,
    is_update: bool = False,
    on_event: Optional[Callable] = None,
    chapter_concurrency: int = CHAPTER_CONCURRENCY,
):
    """Download a manga, processing several chapters concurrently.

    Progress is reported through ``on_event`` as dicts whose ``"type"`` is one
    of ``manga_start``, ``manga_info``, ``chapter_skipped``, ``chapter_start``,
    ``page_done``, ``chapter_done``, ``chapter_failed``, ``manga_done`` or
    ``manga_failed``.

    Args:
        url: Manga URL; also used as the key for all state-DB calls.
        fmt: Export format; ``"all"`` expands to ``cbz``, ``pdf`` and ``epub``.
        output_dir: Root directory; the manga gets its own sub-directory.
        resume: When true, chapters whose stored status is ``"done"`` are
            skipped. When false, every chapter is downloaded again.
        is_update: Selects the history event type written per chapter
            (``"auto_downloaded"`` instead of ``"downloaded"``).
        on_event: Optional awaitable callback receiving each event dict.
            Exceptions raised by it are logged and ignored.
        chapter_concurrency: Maximum number of chapters processed at once;
            values below 1 are treated as 1.

    Returns:
        None. Results are reported through events and the state DB.

    Raises:
        asyncio.CancelledError: Re-raised untouched. Any other exception is
            caught, logged and reported as a ``manga_failed`` event.
    """

    async def emit(event: dict):
        """Forward ``event`` to ``on_event``; callback errors never propagate."""
        if on_event:
            try:
                await on_event(event)
            except Exception as e:
                logger.debug("on_event error: {}", e)

    db = StateDB()
    db_lock = asyncio.Lock()  # guards against concurrent writes to SQLite

    async def db_call(fn, *args, **kwargs):
        """Wrapper: every DB access goes through the shared asyncio.Lock."""
        async with db_lock:
            return fn(*args, **kwargs)

    try:
        await db_call(db.update_manga_status, url, "downloading")
        started_ts = await db_call(db.mark_started, url)
        await emit({"type": "manga_start", "url": url, "started_at": started_ts})

        async with BrowserManager(headless=True) as bm:
            ctx, info_page = await bm.new_page()
            manga = await get_manga_info(info_page, url)
            await info_page.close()

            if not manga:
                await db_call(db.update_manga_status, url, "failed")
                await emit({"type": "manga_failed", "url": url,
                            "error": "Не удалось получить информацию о манге"})
                return

            display_title = manga.title_ru or manga.title
            chapters_total = len(manga.chapters)

            await db_call(
                db.update_manga_info,
                url,
                title=display_title,
                chapters_total=chapters_total,
                title_ru=manga.title_ru,
                title_full=manga.title_full,
                pub_status=manga.pub_status,
            )
            await emit({
                "type": "manga_info",
                "url": url,
                "title": display_title,
                "title_ru": manga.title_ru,
                "title_full": manga.title_full,
                "pub_status": manga.pub_status,
                "chapters_total": chapters_total,
            })

            # Prefer the custom folder name stored in the DB, if there is one.
            _db_manga = await db_call(db.get_manga, url)
            folder_name = (
                (_db_manga.get("folder_name") if _db_manga else None)
                or _safe_name(display_title)
            )
            manga_dir = output_dir / folder_name
            manga_dir.mkdir(parents=True, exist_ok=True)

            for ch in manga.chapters:
                await db_call(db.upsert_chapter, url, ch.url, ch.title, ch.number, ch.volume)

            formats = ["cbz", "pdf", "epub"] if fmt == "all" else [fmt]

            # -- Split chapters: skip / download ---------------------------
            to_skip = []
            to_download = []
            for ch in manga.chapters:
                if resume and (await db_call(db.chapter_status, ch.url)) == "done":
                    to_skip.append(ch)
                else:
                    to_download.append(ch)

            # Counter plus lock so parallel tasks can update it safely.
            counter_lock = asyncio.Lock()
            # Start from 0: the to_skip loop below counts up to len(to_skip)
            # itself; seeding it from the DB would count those chapters twice.
            chapters_done = 0

            # Report chapters that are skipped because they are already done.
            for ch in to_skip:
                chapters_done += 1
                await emit({
                    "type": "chapter_skipped",
                    "url": url,
                    "chapter_url": ch.url,
                    "chapter_number": ch.number,
                    "chapter_title": ch.title,
                    "volume": ch.volume,
                    "chapters_done": chapters_done,
                    "chapters_total": chapters_total,
                })

            # Semaphore(0) would block every task forever and a negative value
            # raises ValueError, so never go below one chapter at a time.
            concurrency = max(1, chapter_concurrency)

            logger.info(
                "Параллельность: {} гл одновременно. Пропущено: {}, скачать: {}",
                concurrency, len(to_skip), len(to_download),
            )

            # -- The semaphore caps the number of simultaneously open pages --
            sem = asyncio.Semaphore(concurrency)

            async def process_chapter(ch: Chapter) -> None:
                """Download and export one chapter, reporting progress events."""
                nonlocal chapters_done
                async with sem:
                    # Re-check: the chapter may have been completed while this
                    # task was waiting. Only honoured when resuming, otherwise
                    # resume=False could never force a re-download.
                    if resume and (await db_call(db.chapter_status, ch.url)) == "done":
                        async with counter_lock:
                            chapters_done += 1
                            done_snap = chapters_done
                        await emit({
                            "type": "chapter_skipped",
                            "url": url,
                            "chapter_url": ch.url,
                            "chapter_number": ch.number,
                            "chapter_title": ch.title,
                            "volume": ch.volume,
                            "chapters_done": done_snap,
                            "chapters_total": chapters_total,
                        })
                        return

                    await emit({
                        "type": "chapter_start",
                        "url": url,
                        "chapter_url": ch.url,
                        "chapter_title": ch.title,
                        "chapter_number": ch.number,
                        "volume": ch.volume,
                        "chapters_done": chapters_done,
                        "chapters_total": chapters_total,
                    })

                    # Opened inside the try so that a failure to open the page
                    # is recorded as a chapter failure like any other error.
                    ch_page = None
                    try:
                        ch_page = await ctx.new_page()
                        with tempfile.TemporaryDirectory() as tmpdir:
                            tmp_path = Path(tmpdir)
                            pages_done = 0

                            async def on_page(page_idx: int, pages_total: int):
                                """Persist and announce per-page progress."""
                                nonlocal pages_done
                                pages_done += 1
                                await db_call(db.update_chapter_pages,
                                              ch.url, pages_total, pages_done)
                                await emit({
                                    "type": "page_done",
                                    "url": url,
                                    "chapter_url": ch.url,
                                    "page_idx": page_idx,
                                    "pages_done": pages_done,
                                    "pages_total": pages_total,
                                })

                            image_paths = await get_chapter_images_and_download(
                                ch_page, ch.url,
                                dest_dir=tmp_path,
                                manga_url=url,
                                on_page=on_page,
                            )

                            if not image_paths:
                                logger.error(
                                    "Т{} Гл.{} '{}' — get_chapter_images вернул пустой список. "
                                    "URL: {}",
                                    ch.volume, ch.number, ch.title, ch.url,
                                )
                                await db_call(db.mark_failed, ch.url)
                                await emit({"type": "chapter_failed", "url": url,
                                            "chapter_url": ch.url})
                                return

                            ch_name = _safe_chapter_name(ch)
                            ch_meta = MangaMeta(
                                series=display_title,
                                series_full=manga.title_full or "",
                                chapter_title=ch.title,
                                number=ch.number,
                                volume=ch.volume,
                                chapters_total=chapters_total,
                                pub_status=manga.pub_status,
                                source_url=url,
                                summary=manga.description,
                                genre=", ".join(manga.genres) if manga.genres else "",
                            )

                            exported = 0
                            last_export_error = None
                            for f in formats:
                                out_file = manga_dir / f"{ch_name}.{f}"
                                try:
                                    export(image_paths, out_file, f, meta=ch_meta)
                                    await db_call(db.mark_done, ch.url, f, str(out_file))
                                    exported += 1
                                except Exception as e:
                                    last_export_error = e
                                    logger.exception(
                                        "Ошибка экспорта Т{} Гл.{}{} | {}: {}",
                                        ch.volume, ch.number, f, out_file.name, e,
                                    )

                            # Nothing was written: do not record the chapter as
                            # downloaded or count it as done.
                            if not exported:
                                await db_call(db.mark_failed, ch.url)
                                await emit({"type": "chapter_failed", "url": url,
                                            "chapter_url": ch.url,
                                            "error": str(last_export_error)})
                                return

                            event_type = "auto_downloaded" if is_update else "downloaded"
                            await db_call(
                                db.add_history,
                                manga_url=url,
                                event_type=event_type,
                                chapter_url=ch.url,
                                chapter_title=ch.title,
                                chapter_number=ch.number,
                                volume=ch.volume,
                            )

                            async with counter_lock:
                                chapters_done += 1
                                done_snap = chapters_done

                            await emit({
                                "type": "chapter_done",
                                "url": url,
                                "chapter_url": ch.url,
                                "chapter_title": ch.title,
                                "chapter_number": ch.number,
                                "volume": ch.volume,
                                "chapters_done": done_snap,
                                "chapters_total": chapters_total,
                            })

                    except Exception as e:
                        logger.exception(
                            "Необработанное исключение в Т{} Гл.{} '{}' | {}: {}",
                            ch.volume, ch.number, ch.title, ch.url, e,
                        )
                        await db_call(db.mark_failed, ch.url)
                        await emit({"type": "chapter_failed", "url": url,
                                    "chapter_url": ch.url, "error": str(e)})
                    finally:
                        if ch_page is not None:
                            await ch_page.close()

            # -- Start every task at once; the semaphore meters concurrency --
            tasks = [process_chapter(ch) for ch in to_download]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Log unexpected exceptions that escaped a task.
            for ch, res in zip(to_download, results):
                if isinstance(res, Exception) and not isinstance(res, asyncio.CancelledError):
                    # No exception is being handled here, so logger.exception()
                    # would have no traceback to show; attach it explicitly.
                    logger.opt(exception=res).error(
                        "gather: необработанное исключение Т{} Гл.{} '{}': {}",
                        ch.volume, ch.number, ch.title, res,
                    )

            real_done = await db_call(db.sync_chapters_done, url)
            await db_call(db.update_manga_status, url, "done")
            finished_ts = await db_call(db.mark_finished, url)
            await db_call(db.set_last_checked, url)

            await emit({
                "type": "manga_done",
                "url": url,
                "chapters_done": real_done,
                "chapters_total": chapters_total,
                "finished_at": finished_ts,
            })

            await ctx.close()

    except asyncio.CancelledError:
        raise
    except Exception as e:
        # logger.exception keeps the traceback, like the per-chapter handlers.
        logger.exception("Manga worker error {}: {}", url, e)
        await db_call(db.update_manga_status, url, "failed")
        finished_ts = await db_call(db.mark_finished, url)
        await emit({"type": "manga_failed", "url": url, "error": str(e), "finished_at": finished_ts})
    finally:
        db.close()
async def check_for_updates(
    url: str,
    on_event: Optional[Callable] = None,
) -> list[str]:
    """Check a manga for chapters that are not yet in the state DB.

    Records the check in the DB history, refreshes the stored manga info,
    inserts every newly found chapter and reports ``check_started``,
    ``new_chapter_found`` and ``check_done`` events through ``on_event``.

    Args:
        url: Manga URL; also used as the key for all state-DB calls.
        on_event: Optional awaitable callback receiving each event dict.
            Exceptions raised by it are logged and ignored.

    Returns:
        URLs of the newly found chapters; an empty list when there are none
        or when the manga info could not be fetched.
    """

    async def emit(event: dict):
        """Forward ``event`` to ``on_event``; callback errors never propagate."""
        if on_event:
            try:
                await on_event(event)
            except Exception as e:
                # Still best-effort, but leave a trace instead of silently
                # discarding the error (same handling as in download_manga).
                logger.debug("on_event error: {}", e)

    db = StateDB()
    try:
        db.set_last_checked(url)
        db.add_history(manga_url=url, event_type="check_started")
        await emit({"type": "check_started", "url": url})

        async with BrowserManager(headless=True) as bm:
            _, page = await bm.new_page()
            manga = await get_manga_info(page, url)
            await page.close()

            if not manga:
                return []

            # Refresh pub_status and the chapter count.
            db.update_manga_info(
                url,
                title=manga.title_ru or manga.title,
                chapters_total=len(manga.chapters),
                title_ru=manga.title_ru,
                title_full=manga.title_full,
                pub_status=manga.pub_status,
            )

            # Find the chapters that are not in the DB yet.
            known = {ch["chapter_url"] for ch in db.get_all_chapters(url)}
            new_chapters = [ch for ch in manga.chapters if ch.url not in known]

            for ch in new_chapters:
                db.upsert_chapter(url, ch.url, ch.title, ch.number, ch.volume)
                db.add_history(
                    manga_url=url,
                    event_type="new_chapter_found",
                    chapter_url=ch.url,
                    chapter_title=ch.title,
                    chapter_number=ch.number,
                    volume=ch.volume,
                )
                await emit({
                    "type": "new_chapter_found",
                    "url": url,
                    "chapter_url": ch.url,
                    "chapter_title": ch.title,
                    "chapter_number": ch.number,
                })

            db.add_history(
                manga_url=url,
                event_type="check_done",
                details=f"Найдено новых: {len(new_chapters)}",
            )
            await emit({
                "type": "check_done",
                "url": url,
                "new_chapters": len(new_chapters),
            })

            return [ch.url for ch in new_chapters]
    finally:
        db.close()