""" CLI точка входа. Использование: python -m src.cli download [опции] python -m src.cli analyze """ import asyncio import re import sys import tempfile from pathlib import Path import click from loguru import logger from tqdm import tqdm from .browser import BrowserManager from .sources import registry, get_source_for_url from .sources.base import Chapter from .exporter import export, ExportFormat, MangaMeta from .state import StateDB from .utils import safe_name, safe_chapter_name OUTPUT_DIR = Path("/app/output") STATE_DIR = Path("/app/state") # ── Настройка логирования ───────────────────── def _setup_logging(verbose: bool): logger.remove() level = "DEBUG" if verbose else "INFO" logger.add(sys.stderr, level=level, format="{time:HH:mm:ss} | {level: <8} | {message}") logger.add(STATE_DIR / "manga.log", level="DEBUG", rotation="10 MB") # ── CLI ─────────────────────────────────────── @click.group() @click.option("--verbose", "-v", is_flag=True, help="Подробный вывод") @click.pass_context def cli(ctx, verbose): ctx.ensure_object(dict) ctx.obj["verbose"] = verbose _setup_logging(verbose) # ── download ────────────────────────────────── @cli.command() @click.argument("url") @click.option("--format", "-f", "fmt", type=click.Choice(["cbz", "pdf", "epub", "all"]), default="cbz", show_default=True, help="Формат вывода") @click.option("--chapters", "-c", default=None, help="Диапазон глав, напр. 1-10 или 5 или 1,3,7") @click.option("--output", "-o", default=str(OUTPUT_DIR), help="Папка для сохранения", show_default=True) @click.option("--resume/--no-resume", default=True, help="Пропускать уже скачанные главы") @click.option("--force", "-F", is_flag=True, default=False, help="Игнорировать состояние и скачать заново, перезаписывая файлы") @click.option("--concurrency", default=4, show_default=True, help="Параллельных загрузок изображений") @click.pass_context def download(ctx, url, fmt, chapters, output, resume, force, concurrency): """Скачать мангу по URL страницы.""" asyncio.run(_download( url=url, fmt=fmt, chapters_filter=chapters, output_dir=Path(output), resume=resume and not force, force=force, concurrency=concurrency, verbose=ctx.obj.get("verbose", False), )) async def _download(url, fmt, chapters_filter, output_dir, resume, force, concurrency, verbose): db = StateDB() db.sync_sources(registry) source = get_source_for_url(url, db) if source is None: srcs = registry.all_sources() source = srcs[0] if srcs else None if source is None: logger.error("Источник не определён для URL: {}", url) db.close() return async with BrowserManager(headless=True) as bm: ctx, page = await bm.new_page() manga = await source.get_manga_info(page, url) if not manga: logger.error("Не удалось получить информацию о манге") db.close() return manga_dir = output_dir / safe_name(manga.title_ru or manga.title) manga_dir.mkdir(parents=True, exist_ok=True) for ch in manga.chapters: db.upsert_chapter(url, ch.url, ch.title, ch.number, ch.volume) chapters = _filter_chapters(manga.chapters, chapters_filter) logger.info("Будет скачано глав: {}", len(chapters)) formats: list[ExportFormat] = ["cbz", "pdf", "epub"] if fmt == "all" else [fmt] with tqdm(total=len(chapters), desc="Главы", unit="гл") as pbar: for ch in chapters: pbar.set_description(f"Глава {ch.number}: {ch.title[:30]}") if force: db.reset_chapter(ch.url) elif resume and db.chapter_status(ch.url) == "done": logger.info("Пропускаем (уже скачана): {}", ch.title) pbar.update(1) continue await _process_chapter( source=source, ctx=ctx, ch=ch, manga=manga, manga_url=url, manga_dir=manga_dir, formats=formats, db=db, force=force, ) pbar.update(1) logger.info("✅ Готово! Файлы в: {}", manga_dir) await ctx.close() db.close() async def _process_chapter(source, ctx, ch: Chapter, manga, manga_url: str, manga_dir: Path, formats: list, db: StateDB, force: bool = False): ch_page = await ctx.new_page() try: with tempfile.TemporaryDirectory() as tmpdir: tmp_path = Path(tmpdir) image_paths = await source.get_chapter_images_and_download( ch_page, ch.url, dest_dir=tmp_path, manga_url=manga_url ) if not image_paths: logger.error("Нет скачанных изображений: {}", ch.title) db.mark_failed(ch.url) return ch_name = safe_chapter_name(ch) ch_meta = MangaMeta( series=manga.title_ru or manga.title, series_full=manga.title_full or "", chapter_title=ch.title, number=ch.number, volume=ch.volume, chapters_total=len(manga.chapters), pub_status=manga.pub_status, source_url=manga_url, summary=manga.description, genre=", ".join(manga.genres) if manga.genres else "", ) for fmt in formats: out_file = manga_dir / f"{ch_name}.{fmt}" if force and out_file.exists(): out_file.unlink() logger.debug("Удалён старый файл: {}", out_file.name) try: export(image_paths, out_file, fmt, meta=ch_meta) db.mark_done(ch.url, fmt, str(out_file)) except Exception as e: logger.error("Ошибка экспорта {}: {}", fmt, e) except Exception as e: logger.error("Ошибка обработки главы {}: {}", ch.title, e) db.mark_failed(ch.url) finally: await ch_page.close() # ── analyze ─────────────────────────────────── @cli.command() @click.argument("url") @click.pass_context def analyze(ctx, url): """Анализировать сайт и вывести список глав (без скачивания).""" asyncio.run(_analyze(url)) async def _analyze(url: str): db = StateDB() db.sync_sources(registry) source = get_source_for_url(url, db) if source is None: srcs = registry.all_sources() source = srcs[0] if srcs else None if source is None: click.echo("❌ Источник не найден") db.close() return async with BrowserManager(headless=True) as bm: _, page = await bm.new_page() manga = await source.get_manga_info(page, url) if not manga: click.echo("❌ Не удалось получить информацию") db.close() return click.echo(f"\n📚 Манга: {manga.title_ru or manga.title}") click.echo(f"🔗 URL: {manga.url}") click.echo(f"📖 Глав: {len(manga.chapters)}\n") for ch in manga.chapters[:20]: click.echo(f" Том {ch.volume:02d} Гл. {ch.number:06.1f} {ch.title}") if len(manga.chapters) > 20: click.echo(f" ... и ещё {len(manga.chapters) - 20} глав") if manga.chapters: first = manga.chapters[-1] click.echo(f"\n🔍 Проверяем первую главу: {first.url}") with tempfile.TemporaryDirectory() as tmp: paths = await source.get_chapter_images_and_download( page, first.url, dest_dir=Path(tmp), manga_url=url ) click.echo(f" Скачано изображений: {len(paths)}") for p in paths[:3]: click.echo(f" {p.name} ({p.stat().st_size} байт)") db.close() # ── Утилиты ─────────────────────────────────── def _filter_chapters(chapters: list[Chapter], filter_str: str | None) -> list[Chapter]: if not filter_str: return chapters m = re.match(r"^(\d+(?:\.\d+)?)-(\d+(?:\.\d+)?)$", filter_str) if m: lo, hi = float(m.group(1)), float(m.group(2)) return [c for c in chapters if lo <= c.number <= hi] nums = {float(x.strip()) for x in filter_str.split(",")} return [c for c in chapters if c.number in nums] if __name__ == "__main__": cli()