Files
2026-05-10 14:39:54 +03:00

34 KiB
Raw Permalink Blame History

Save this as jd_parallel_downloader.py.

It uses 34 separate Firefox instances, one per worker, captures div.jd_summary_list, downloads the file, and writes a crash-safe registry with:

url, downloaded_filepath, from_remote_metadata, internal_hash

It also stores download_file_sha256 separately.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Parallel Selenium Firefox downloader with source registry mapping.

Main outputs:
  output/source_registry.jsonl   # crash-safe full registry
  output/source_registry.csv     # combined table
  output/source_registry.md      # markdown mapping table
  downloads_by_hash/R_<HASH>/... # downloaded files grouped by internal hash

Run example:
  python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080

Headless:
  python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless

Resume behavior:
  - Successful URLs already present in output/source_registry.jsonl are skipped.
  - Failed URLs are retried.
"""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import os
import re
import shutil
import sys
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from urllib.parse import urldefrag, urlsplit, urlunsplit

from selenium import webdriver
from selenium.common.exceptions import (
    ElementClickInterceptedException,
    JavascriptException,
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


# ============================================================
# Defaults
# ============================================================

DEFAULT_OUTPUT_DIR = "output"
DEFAULT_DOWNLOAD_ROOT = "downloads_by_hash"
DEFAULT_WORKDIR = "worker_runtime"
DEFAULT_WORKERS = 4

PARTIAL_SUFFIXES = (
    ".part",
    ".crdownload",
    ".tmp",
)

DOWNLOAD_MIME_TYPES = ",".join(
    [
        "application/octet-stream",
        "application/x-garmin-gdb",
        "application/gdb",
        "application/gpx+xml",
        "application/xml",
        "text/xml",
        "text/plain",
        "application/zip",
        "application/x-zip-compressed",
        "application/x-gzip",
        "application/gzip",
        "application/vnd.google-earth.kml+xml",
        "application/vnd.google-earth.kmz",
        "application/x-msdownload",
        "binary/octet-stream",
    ]
)


# ============================================================
# Small helpers
# ============================================================

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def ensure_dir(path: Path) -> Path:
    path.mkdir(parents=True, exist_ok=True)
    return path


def normalize_url(url: str) -> str:
    """
    Normalizes the URL enough to make stable deterministic IDs.

    Keeps query string because download pages often identify resources through query params.
    Removes fragment because it normally does not affect server-side download identity.
    """
    url = url.strip()
    url, _fragment = urldefrag(url)

    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    netloc = parts.netloc.lower()

    return urlunsplit((scheme, netloc, parts.path, parts.query, ""))


def make_internal_hash(url: str) -> str:
    """
    Stable registry ID.

    Important: this hash is based on the normalized source URL, not on file contents.
    That means you can later inject R_<HASH> into GPX metadata without invalidating
    the mapping ID itself.
    """
    normalized = normalize_url(url)
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16].upper()
    return f"R_{digest}"


def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            block = f.read(chunk_size)
            if not block:
                break
            h.update(block)
    return h.hexdigest()


def safe_filename(name: str, fallback: str = "download.bin") -> str:
    name = name.strip()
    if not name:
        return fallback

    name = name.replace("\\", "_").replace("/", "_")
    name = re.sub(r"[\x00-\x1f\x7f]+", "_", name)
    name = re.sub(r'[:*?"<>|]+', "_", name)
    name = re.sub(r"\s+", " ", name).strip()

    return name or fallback


def json_dumps_compact(obj) -> str:
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def write_jsonl_line(path: Path, record: dict) -> None:
    ensure_dir(path.parent)
    line = json.dumps(record, ensure_ascii=False, sort_keys=True)
    with path.open("a", encoding="utf-8", newline="\n") as f:
        f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())


def read_jsonl(path: Path) -> List[dict]:
    if not path.exists():
        return []

    records = []
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except Exception:
                print(f"[WARN] Broken JSONL line ignored: {path}:{line_no}", file=sys.stderr)
    return records


def chunk_evenly(items: Sequence[dict], workers: int) -> List[List[dict]]:
    chunks = [[] for _ in range(workers)]
    for i, item in enumerate(items):
        chunks[i % workers].append(item)
    return [c for c in chunks if c]


def truncate_text(text: str, max_len: int = 350) -> str:
    text = re.sub(r"\s+", " ", text or "").strip()
    if len(text) <= max_len:
        return text
    return text[: max_len - 3] + "..."


# ============================================================
# Firefox setup
# ============================================================

@dataclass
class WorkerConfig:
    worker_id: int
    proxy_host: Optional[str]
    proxy_port: Optional[int]
    headless: bool
    page_load_timeout: int
    element_timeout: int
    download_timeout: int
    delay_between_urls: float
    workdir: str
    output_dir: str
    download_root: str
    user_agent: str


def build_firefox_driver(cfg: WorkerConfig, temp_download_dir: Path) -> webdriver.Firefox:
    ensure_dir(temp_download_dir)

    options = FirefoxOptions()
    options.page_load_strategy = "eager"

    if cfg.headless:
        options.add_argument("--headless")

    # UA
    if cfg.user_agent:
        options.set_preference("general.useragent.override", cfg.user_agent)

    # Proxy
    if cfg.proxy_host and cfg.proxy_port:
        options.set_preference("network.proxy.type", 1)
        options.set_preference("network.proxy.socks", cfg.proxy_host)
        options.set_preference("network.proxy.socks_port", int(cfg.proxy_port))
        options.set_preference("network.proxy.socks_version", 5)
        options.set_preference("network.proxy.socks_remote_dns", True)

    # Downloads
    options.set_preference("browser.download.folderList", 2)
    options.set_preference("browser.download.dir", str(temp_download_dir.resolve()))
    options.set_preference("browser.download.useDownloadDir", True)
    options.set_preference("browser.download.manager.showWhenStarting", False)
    options.set_preference("browser.download.alwaysOpenPanel", False)
    options.set_preference("browser.helperApps.neverAsk.saveToDisk", DOWNLOAD_MIME_TYPES)
    options.set_preference("browser.helperApps.neverAsk.openFile", DOWNLOAD_MIME_TYPES)
    options.set_preference("pdfjs.disabled", True)

    # Speed / lighter pages
    options.set_preference("permissions.default.image", 2)
    options.set_preference("gfx.downloadable_fonts.enabled", False)
    options.set_preference("media.autoplay.default", 5)
    options.set_preference("dom.ipc.plugins.enabled.libflashplayer.so", False)

    # Reduce automation noise; not guaranteed stealth, just less obvious.
    options.set_preference("dom.webdriver.enabled", False)
    options.set_preference("useAutomationExtension", False)

    driver = webdriver.Firefox(options=options)
    driver.set_page_load_timeout(cfg.page_load_timeout)
    return driver


# ============================================================
# Selenium page logic
# ============================================================

def wait_for_page_load(driver: webdriver.Firefox, timeout: int = 20) -> None:
    try:
        WebDriverWait(driver, timeout).until(
            lambda d: d.execute_script("return document.readyState") in ("interactive", "complete")
        )
    except TimeoutException:
        pass


def click_element(driver, element) -> bool:
    try:
        element.click()
        return True
    except ElementClickInterceptedException:
        try:
            driver.execute_script("arguments[0].click();", element)
            return True
        except JavascriptException:
            return False
    except Exception:
        try:
            driver.execute_script("arguments[0].click();", element)
            return True
        except Exception:
            return False


def extract_jd_summary_metadata(driver: webdriver.Firefox) -> Dict:
    """
    Captures div.jd_summary_list.

    Usually there is one, but this supports multiple.
    """
    metadata = {
        "page_title": "",
        "jd_summary_count": 0,
        "jd_summary_items": [],
    }

    try:
        metadata["page_title"] = driver.title or ""
    except Exception:
        pass

    try:
        elements = driver.find_elements(By.CSS_SELECTOR, "div.jd_summary_list")
    except Exception:
        elements = []

    metadata["jd_summary_count"] = len(elements)

    for idx, el in enumerate(elements):
        item = {
            "index": idx,
            "text": "",
            "html": "",
            "links": [],
        }

        try:
            item["text"] = (el.text or "").strip()
        except Exception:
            pass

        try:
            item["html"] = el.get_attribute("innerHTML") or ""
        except Exception:
            pass

        try:
            links = el.find_elements(By.CSS_SELECTOR, "a")
            for a in links:
                try:
                    item["links"].append(
                        {
                            "text": (a.text or "").strip(),
                            "href": a.get_attribute("href") or "",
                        }
                    )
                except Exception:
                    continue
        except Exception:
            pass

        metadata["jd_summary_items"].append(item)

    return metadata


def accept_license_if_present(driver: webdriver.Firefox, timeout: int) -> bool:
    wait = WebDriverWait(driver, timeout)
    try:
        checkbox = wait.until(EC.presence_of_element_located((By.NAME, "license_agree")))
    except TimeoutException:
        return False

    try:
        if checkbox.is_enabled() and checkbox.is_displayed() and not checkbox.is_selected():
            return click_element(driver, checkbox)
    except Exception:
        return False

    return False


def click_download_button(driver: webdriver.Firefox, timeout: int) -> bool:
    wait = WebDriverWait(driver, timeout)

    selectors = [
        (By.ID, "jd_license_submit"),
        (By.CSS_SELECTOR, "#jd_license_submit"),
        (By.CSS_SELECTOR, "input[type='submit']"),
        (By.CSS_SELECTOR, "button[type='submit']"),
        (By.CSS_SELECTOR, "a[href*='download']"),
    ]

    for by, selector in selectors:
        try:
            btn = wait.until(EC.element_to_be_clickable((by, selector)))
            if click_element(driver, btn):
                return True
        except TimeoutException:
            continue
        except Exception:
            continue

    return False


# ============================================================
# Download detection
# ============================================================

def clear_partial_downloads(download_dir: Path) -> None:
    if not download_dir.exists():
        return

    for path in download_dir.iterdir():
        if path.is_file() and path.name.endswith(PARTIAL_SUFFIXES):
            try:
                path.unlink()
            except Exception:
                pass


def list_completed_files(download_dir: Path) -> List[Path]:
    if not download_dir.exists():
        return []

    files = []
    for p in download_dir.iterdir():
        if not p.is_file():
            continue
        if p.name.endswith(PARTIAL_SUFFIXES):
            continue
        files.append(p)

    return files


def has_partial_files(download_dir: Path) -> bool:
    if not download_dir.exists():
        return False

    for p in download_dir.iterdir():
        if p.is_file() and p.name.endswith(PARTIAL_SUFFIXES):
            return True
    return False


def wait_for_new_download(
    download_dir: Path,
    before_names: set,
    timeout: int,
    check_interval: float = 1.0,
    stable_polls_required: int = 2,
) -> Optional[Path]:
    """
    Waits for a new completed file whose size is stable for several polls.
    """
    end = time.time() + timeout
    size_state: Dict[str, Tuple[int, int]] = {}

    while time.time() < end:
        completed = list_completed_files(download_dir)
        new_completed = [p for p in completed if p.name not in before_names]

        if new_completed:
            # Prefer most recently modified file.
            new_completed.sort(key=lambda p: p.stat().st_mtime, reverse=True)

            for p in new_completed:
                try:
                    current_size = p.stat().st_size
                except FileNotFoundError:
                    continue

                prev_size, stable_count = size_state.get(str(p), (-1, 0))

                if current_size == prev_size and current_size > 0:
                    stable_count += 1
                else:
                    stable_count = 0

                size_state[str(p)] = (current_size, stable_count)

                if stable_count >= stable_polls_required and not has_partial_files(download_dir):
                    return p

        time.sleep(check_interval)

    return None


def move_to_hash_folder(
    temp_file: Path,
    internal_hash: str,
    final_download_root: Path,
) -> Path:
    hash_dir = ensure_dir(final_download_root / internal_hash)
    original_name = safe_filename(temp_file.name)
    dest = hash_dir / original_name

    if dest.exists():
        stem = dest.stem
        suffix = dest.suffix
        counter = 2
        while True:
            candidate = hash_dir / f"{stem}__{counter}{suffix}"
            if not candidate.exists():
                dest = candidate
                break
            counter += 1

    shutil.move(str(temp_file), str(dest))
    return dest.resolve()


# ============================================================
# Optional GPX tagging helper
# ============================================================

def add_internal_hash_to_gpx_file(gpx_path: Path, internal_hash: str) -> bool:
    """
    Optional helper for plain .gpx files.

    This is intentionally conservative and only handles XML GPX files directly.
    It does NOT modify .gdb, because Garmin GDB is binary and should not be patched
    by raw string insertion.

    Current downloader does not call this automatically.
    Keep the original file clean; tag in a later controlled post-processing stage.
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(str(gpx_path))
        root = tree.getroot()

        ns_match = re.match(r"\{(.+?)\}", root.tag)
        ns = ns_match.group(1) if ns_match else ""
        prefix = f"{{{ns}}}" if ns else ""

        metadata = root.find(f"{prefix}metadata")
        if metadata is None:
            metadata = ET.Element(f"{prefix}metadata")
            root.insert(0, metadata)

        extensions = metadata.find(f"{prefix}extensions")
        if extensions is None:
            extensions = ET.SubElement(metadata, f"{prefix}extensions")

        tag = ET.SubElement(extensions, "source_registry_hash")
        tag.text = internal_hash

        tree.write(str(gpx_path), encoding="utf-8", xml_declaration=True)
        return True
    except Exception:
        return False


# ============================================================
# One URL processing
# ============================================================

def process_one_url(
    driver: webdriver.Firefox,
    url_item: dict,
    cfg: WorkerConfig,
    temp_download_dir: Path,
    final_download_root: Path,
) -> dict:
    url = url_item["url"]
    input_index = url_item["input_index"]
    internal_hash = make_internal_hash(url)

    started_at = utc_now_iso()

    record = {
        "success": False,
        "status": "started",
        "error": "",
        "worker_id": cfg.worker_id,
        "input_index": input_index,
        "url": url,
        "normalized_url": normalize_url(url),
        "internal_hash": internal_hash,
        "downloaded_filepath": None,
        "download_original_filename": None,
        "download_file_sha256": None,
        "from_remote_metadata": None,
        "started_at_utc": started_at,
        "finished_at_utc": None,
    }

    try:
        clear_partial_downloads(temp_download_dir)

        before_names = {p.name for p in list_completed_files(temp_download_dir)}

        driver.get(url)
        wait_for_page_load(driver, timeout=cfg.page_load_timeout)

        metadata = extract_jd_summary_metadata(driver)
        record["from_remote_metadata"] = metadata

        accept_license_if_present(driver, timeout=cfg.element_timeout)

        clicked = click_download_button(driver, timeout=cfg.element_timeout)
        if not clicked:
            record["status"] = "download_button_not_found_or_not_clickable"
            record["finished_at_utc"] = utc_now_iso()
            return record

        downloaded_temp_file = wait_for_new_download(
            temp_download_dir,
            before_names=before_names,
            timeout=cfg.download_timeout,
        )

        if downloaded_temp_file is None:
            record["status"] = "download_timeout"
            record["finished_at_utc"] = utc_now_iso()
            return record

        file_sha256 = sha256_file(downloaded_temp_file)
        original_filename = downloaded_temp_file.name

        final_path = move_to_hash_folder(
            temp_file=downloaded_temp_file,
            internal_hash=internal_hash,
            final_download_root=final_download_root,
        )

        # Sidecar metadata next to file for direct local browsing.
        sidecar_path = final_path.with_suffix(final_path.suffix + ".source.json")
        sidecar = {
            "url": url,
            "normalized_url": normalize_url(url),
            "internal_hash": internal_hash,
            "downloaded_filepath": str(final_path),
            "download_original_filename": original_filename,
            "download_file_sha256": file_sha256,
            "from_remote_metadata": metadata,
            "created_at_utc": utc_now_iso(),
        }
        sidecar_path.write_text(
            json.dumps(sidecar, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )

        record["success"] = True
        record["status"] = "downloaded"
        record["downloaded_filepath"] = str(final_path)
        record["download_original_filename"] = original_filename
        record["download_file_sha256"] = file_sha256
        record["finished_at_utc"] = utc_now_iso()
        return record

    except WebDriverException as e:
        record["status"] = "webdriver_error"
        record["error"] = str(e)
        record["finished_at_utc"] = utc_now_iso()
        return record

    except Exception as e:
        record["status"] = "unexpected_error"
        record["error"] = str(e) + "\n" + traceback.format_exc()
        record["finished_at_utc"] = utc_now_iso()
        return record


# ============================================================
# Worker
# ============================================================

def worker_main(url_items: List[dict], cfg_dict: dict) -> dict:
    cfg = WorkerConfig(**cfg_dict)

    workdir = Path(cfg.workdir)
    output_dir = Path(cfg.output_dir)
    final_download_root = Path(cfg.download_root)

    temp_download_dir = ensure_dir(workdir / f"worker_{cfg.worker_id:02d}" / "temp_downloads")
    worker_jsonl = output_dir / f"worker_{cfg.worker_id:02d}.jsonl"

    result_summary = {
        "worker_id": cfg.worker_id,
        "total": len(url_items),
        "success": 0,
        "failed": 0,
        "jsonl": str(worker_jsonl),
    }

    driver = None

    try:
        driver = build_firefox_driver(cfg, temp_download_dir)

        for n, url_item in enumerate(url_items, 1):
            print(
                f"[worker {cfg.worker_id:02d}] [{n}/{len(url_items)}] "
                f"input_index={url_item['input_index']} {url_item['url']}",
                flush=True,
            )

            record = process_one_url(
                driver=driver,
                url_item=url_item,
                cfg=cfg,
                temp_download_dir=temp_download_dir,
                final_download_root=final_download_root,
            )

            write_jsonl_line(worker_jsonl, record)

            if record.get("success"):
                result_summary["success"] += 1
                print(
                    f"[worker {cfg.worker_id:02d}] OK {record['internal_hash']} "
                    f"{record.get('downloaded_filepath')}",
                    flush=True,
                )
            else:
                result_summary["failed"] += 1
                print(
                    f"[worker {cfg.worker_id:02d}] FAIL {record['internal_hash']} "
                    f"{record.get('status')}",
                    flush=True,
                )

            if cfg.delay_between_urls > 0:
                time.sleep(cfg.delay_between_urls)

    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                pass

    return result_summary


# ============================================================
# Registry combining
# ============================================================

def load_success_seen(output_dir: Path) -> set:
    """
    Used for resume.

    Only successful URLs are skipped. Failed URLs are retried.
    """
    seen = set()

    paths = []
    combined = output_dir / "source_registry.jsonl"
    if combined.exists():
        paths.append(combined)

    paths.extend(sorted(output_dir.glob("worker_*.jsonl")))

    for path in paths:
        for r in read_jsonl(path):
            if r.get("success") and r.get("url"):
                seen.add(normalize_url(r["url"]))

    return seen


def collect_worker_records(output_dir: Path) -> List[dict]:
    records = []
    for path in sorted(output_dir.glob("worker_*.jsonl")):
        records.extend(read_jsonl(path))

    # De-duplicate by normalized_url.
    # Prefer latest successful record; otherwise keep latest record.
    best_by_url: Dict[str, dict] = {}

    for r in records:
        url = r.get("url")
        if not url:
            continue

        key = normalize_url(url)
        old = best_by_url.get(key)

        if old is None:
            best_by_url[key] = r
            continue

        old_success = bool(old.get("success"))
        new_success = bool(r.get("success"))

        if new_success and not old_success:
            best_by_url[key] = r
            continue

        if new_success == old_success:
            old_finished = old.get("finished_at_utc") or ""
            new_finished = r.get("finished_at_utc") or ""
            if new_finished >= old_finished:
                best_by_url[key] = r

    final_records = list(best_by_url.values())
    final_records.sort(key=lambda r: int(r.get("input_index", 0)))
    return final_records


def write_combined_outputs(output_dir: Path) -> None:
    ensure_dir(output_dir)

    records = collect_worker_records(output_dir)

    jsonl_path = output_dir / "source_registry.jsonl"
    csv_path = output_dir / "source_registry.csv"
    md_path = output_dir / "source_registry.md"

    with jsonl_path.open("w", encoding="utf-8", newline="\n") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False, sort_keys=True) + "\n")

    csv_fields = [
        "input_index",
        "success",
        "status",
        "internal_hash",
        "url",
        "downloaded_filepath",
        "download_original_filename",
        "download_file_sha256",
        "from_remote_metadata",
        "error",
        "started_at_utc",
        "finished_at_utc",
    ]

    with csv_path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=csv_fields)
        writer.writeheader()

        for r in records:
            row = {}
            for field in csv_fields:
                value = r.get(field)
                if field == "from_remote_metadata":
                    value = json_dumps_compact(value)
                row[field] = value
            writer.writerow(row)

    with md_path.open("w", encoding="utf-8", newline="\n") as f:
        f.write("| internal_hash | success | status | url | downloaded_filepath | details |\n")
        f.write("|---|---:|---|---|---|---|\n")

        for r in records:
            meta = r.get("from_remote_metadata") or {}
            details_parts = []

            title = truncate_text(meta.get("page_title", ""), 100)
            if title:
                details_parts.append(f"title: {title}")

            items = meta.get("jd_summary_items") or []
            if items:
                first_text = truncate_text(items[0].get("text", ""), 180)
                if first_text:
                    details_parts.append(first_text)

            details = "<br>".join(details_parts)
            details = details.replace("|", "\\|")

            f.write(
                "| {internal_hash} | {success} | {status} | {url} | {path} | {details} |\n".format(
                    internal_hash=r.get("internal_hash", ""),
                    success=str(bool(r.get("success"))),
                    status=str(r.get("status", "")).replace("|", "\\|"),
                    url=str(r.get("url", "")).replace("|", "\\|"),
                    path=str(r.get("downloaded_filepath") or "").replace("|", "\\|"),
                    details=details,
                )
            )

    print(f"\nCombined registry written:")
    print(f"  JSONL: {jsonl_path}")
    print(f"  CSV:   {csv_path}")
    print(f"  MD:    {md_path}")


# ============================================================
# Input / args
# ============================================================

def parse_proxy(proxy: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
    if not proxy:
        return None, None

    proxy = proxy.strip()
    if "://" in proxy:
        proxy = proxy.split("://", 1)[1]

    if ":" not in proxy:
        raise ValueError("Proxy must be in host:port format, example: 192.168.0.38:1080")

    host, port_s = proxy.rsplit(":", 1)
    return host.strip(), int(port_s.strip())


def load_urls(input_path: Path, start: int, limit: Optional[int]) -> List[dict]:
    urls = []
    with input_path.open("r", encoding="utf-8") as f:
        for idx, line in enumerate(f):
            url = line.strip()
            if not url:
                continue
            urls.append({"input_index": idx, "url": url})

    sliced = urls[start:]
    if limit is not None:
        sliced = sliced[:limit]

    return sliced


def build_arg_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        description="Parallel Firefox downloader with jd_summary_list registry capture."
    )

    p.add_argument("--input", default="url-final.txt", help="Input URL file, one URL per line.")
    p.add_argument("--start", type=int, default=0, help="Start offset in input file. Example: 780")
    p.add_argument("--limit", type=int, default=None, help="Optional max URLs to process.")

    p.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help="Firefox worker count.")
    p.add_argument("--proxy", default=None, help="SOCKS5 proxy as host:port, example 192.168.0.38:1080")
    p.add_argument("--headless", action="store_true", help="Run Firefox headless.")

    p.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
    p.add_argument("--download-root", default=DEFAULT_DOWNLOAD_ROOT)
    p.add_argument("--workdir", default=DEFAULT_WORKDIR)

    p.add_argument("--page-load-timeout", type=int, default=25)
    p.add_argument("--element-timeout", type=int, default=7)
    p.add_argument("--download-timeout", type=int, default=60)
    p.add_argument("--delay", type=float, default=2.0, help="Delay between URLs per worker.")

    p.add_argument(
        "--user-agent",
        default=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) "
            "Gecko/20100101 Firefox/126.0"
        ),
    )

    p.add_argument(
        "--no-resume",
        action="store_true",
        help="Do not skip URLs already successful in previous registry files.",
    )

    p.add_argument(
        "--combine-only",
        action="store_true",
        help="Only combine existing worker_*.jsonl files into source_registry.* outputs.",
    )

    return p


# ============================================================
# Main
# ============================================================

def main() -> int:
    args = build_arg_parser().parse_args()

    input_path = Path(args.input)
    output_dir = ensure_dir(Path(args.output_dir))
    download_root = ensure_dir(Path(args.download_root))
    workdir = ensure_dir(Path(args.workdir))

    if args.combine_only:
        write_combined_outputs(output_dir)
        return 0

    if not input_path.exists():
        print(f"Input file not found: {input_path}", file=sys.stderr)
        return 2

    proxy_host, proxy_port = parse_proxy(args.proxy)

    url_items = load_urls(input_path=input_path, start=args.start, limit=args.limit)

    if not args.no_resume:
        seen_success = load_success_seen(output_dir)
        before_count = len(url_items)
        url_items = [
            item for item in url_items
            if normalize_url(item["url"]) not in seen_success
        ]
        skipped = before_count - len(url_items)
    else:
        skipped = 0

    if not url_items:
        print("No URLs to process.")
        if skipped:
            print(f"Skipped already-successful URLs: {skipped}")
        write_combined_outputs(output_dir)
        return 0

    workers = max(1, min(int(args.workers), len(url_items)))
    chunks = chunk_evenly(url_items, workers)

    print("=== CONFIG ===")
    print(f"Input:          {input_path}")
    print(f"Start offset:   {args.start}")
    print(f"URLs to run:    {len(url_items)}")
    print(f"Skipped resume: {skipped}")
    print(f"Workers:        {workers}")
    print(f"Proxy:          {proxy_host}:{proxy_port}" if proxy_host else "Proxy:          none")
    print(f"Headless:       {args.headless}")
    print(f"Output dir:     {output_dir.resolve()}")
    print(f"Download root:  {download_root.resolve()}")
    print("==============\n")

    cfgs = []
    for worker_id, _chunk in enumerate(chunks):
        cfg = WorkerConfig(
            worker_id=worker_id,
            proxy_host=proxy_host,
            proxy_port=proxy_port,
            headless=bool(args.headless),
            page_load_timeout=int(args.page_load_timeout),
            element_timeout=int(args.element_timeout),
            download_timeout=int(args.download_timeout),
            delay_between_urls=float(args.delay),
            workdir=str(workdir),
            output_dir=str(output_dir),
            download_root=str(download_root),
            user_agent=str(args.user_agent),
        )
        cfgs.append(cfg)

    total_success = 0
    total_failed = 0

    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = []
        for chunk, cfg in zip(chunks, cfgs):
            futures.append(executor.submit(worker_main, chunk, cfg.__dict__))

        for fut in as_completed(futures):
            try:
                summary = fut.result()
                total_success += int(summary.get("success", 0))
                total_failed += int(summary.get("failed", 0))
                print(
                    f"\nWorker {summary.get('worker_id')} finished: "
                    f"success={summary.get('success')} failed={summary.get('failed')}"
                )
            except Exception as e:
                total_failed += 1
                print(f"[FATAL] Worker crashed: {e}", file=sys.stderr)
                traceback.print_exc()

    write_combined_outputs(output_dir)

    print("\n=== FINISHED ===")
    print(f"Success this run: {total_success}")
    print(f"Failed this run:  {total_failed}")
    print(f"Registry ID form: R_<16 hex chars from normalized URL SHA256>")
    print("For GDB files, do not raw-patch the binary. Use the registry or a controlled converter/post-processor.")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Run it like this:

python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080

Headless:

python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless

Important behavior:

  • Each worker has its own Firefox instance.
  • Each worker has its own temporary download directory.
  • Final files are moved into:
downloads_by_hash/
  R_ABCDEF1234567890/
    downloaded_file.gpx
    downloaded_file.gpx.source.json
  • Main registry files are:
output/source_registry.jsonl
output/source_registry.csv
output/source_registry.md

For GPX/GDB injection later:

  • GPX can safely receive a metadata extension like:
<source_registry_hash>R_ABCDEF1234567890</source_registry_hash>
  • GDB should not be patched as raw binary. Better flow is: keep the registry + sidecar, or convert GDB to GPX with GPSBabel, inject the hash into GPX metadata, and keep the original GDB mapped through source_registry.csv/jsonl.