34 KiB
34 KiB
Save this as jd_parallel_downloader.py.
It uses 3–4 separate Firefox instances, one per worker, captures div.jd_summary_list, downloads the file, and writes a crash-safe registry with:
url, downloaded_filepath, from_remote_metadata, internal_hash
It also stores download_file_sha256 separately.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parallel Selenium Firefox downloader with source registry mapping.
Main outputs:
output/source_registry.jsonl # crash-safe full registry
output/source_registry.csv # combined table
output/source_registry.md # markdown mapping table
downloads_by_hash/R_<HASH>/... # downloaded files grouped by internal hash
Run example:
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080
Headless:
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless
Resume behavior:
- URLs already recorded as successful in output/source_registry.jsonl or output/worker_*.jsonl are skipped.
- Failed URLs are retried.
"""
from __future__ import annotations
import argparse
import csv
import hashlib
import json
import os
import re
import shutil
import sys
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from urllib.parse import urldefrag, urlsplit, urlunsplit
from selenium import webdriver
from selenium.common.exceptions import (
ElementClickInterceptedException,
JavascriptException,
NoSuchElementException,
TimeoutException,
WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# ============================================================
# Defaults
# ============================================================
# Fallback locations and worker count used when the matching CLI flag is omitted.
DEFAULT_OUTPUT_DIR = "output"
DEFAULT_DOWNLOAD_ROOT = "downloads_by_hash"
DEFAULT_WORKDIR = "worker_runtime"
DEFAULT_WORKERS = 4

# File-name suffixes treated as "download still in progress".
PARTIAL_SUFFIXES = (".part", ".crdownload", ".tmp")

# Comma-separated MIME list handed to Firefox's "neverAsk" download preferences.
DOWNLOAD_MIME_TYPES = ",".join(
    (
        "application/octet-stream",
        "application/x-garmin-gdb",
        "application/gdb",
        "application/gpx+xml",
        "application/xml",
        "text/xml",
        "text/plain",
        "application/zip",
        "application/x-zip-compressed",
        "application/x-gzip",
        "application/gzip",
        "application/vnd.google-earth.kml+xml",
        "application/vnd.google-earth.kmz",
        "application/x-msdownload",
        "binary/octet-stream",
    )
)
# ============================================================
# Small helpers
# ============================================================
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat(timespec="seconds")
def ensure_dir(path: Path) -> Path:
    """Create *path* together with any missing parents and return it unchanged."""
    path.mkdir(exist_ok=True, parents=True)
    return path
def normalize_url(url: str) -> str:
    """
    Canonicalize a URL so the same resource always yields the same ID.

    Surrounding whitespace and the fragment are dropped, scheme and host are
    lower-cased, and path and query string are kept as they are.
    """
    without_fragment = urldefrag(url.strip()).url
    pieces = urlsplit(without_fragment)
    canonical = pieces._replace(
        scheme=pieces.scheme.lower(),
        netloc=pieces.netloc.lower(),
        fragment="",
    )
    return urlunsplit(canonical)
def make_internal_hash(url: str) -> str:
    """
    Derive the stable registry ID ``R_<16 upper-case hex chars>`` for a URL.

    The ID is the truncated SHA-256 of the normalized source URL, not of the
    file contents, so tagging a downloaded file later does not change its ID.
    """
    canonical_bytes = normalize_url(url).encode("utf-8")
    short_digest = hashlib.sha256(canonical_bytes).hexdigest()[:16]
    return "R_" + short_digest.upper()
def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 of the file at *path*, read in *chunk_size* byte blocks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()
def safe_filename(name: str, fallback: str = "download.bin") -> str:
    """
    Turn *name* into a single path component that is safe to create on disk.

    Path separators, control characters and the characters ``: * ? " < > |``
    are replaced with ``_``; whitespace runs collapse to one space.

    Returns *fallback* when nothing usable remains, including the special
    directory names ``.`` and ``..``, which would otherwise make the
    destination resolve to the target folder itself or to its parent.
    """
    name = name.strip()
    if not name:
        return fallback
    name = name.replace("\\", "_").replace("/", "_")
    name = re.sub(r"[\x00-\x1f\x7f]+", "_", name)
    name = re.sub(r'[:*?"<>|]+', "_", name)
    name = re.sub(r"\s+", " ", name).strip()
    if name in ("", ".", ".."):
        return fallback
    return name
def json_dumps_compact(obj) -> str:
    """Serialize *obj* to JSON with sorted keys, no spaces, and non-ASCII kept as-is."""
    dump_options = {
        "ensure_ascii": False,
        "sort_keys": True,
        "separators": (",", ":"),
    }
    return json.dumps(obj, **dump_options)
def write_jsonl_line(path: Path, record: dict) -> None:
    """
    Append *record* to *path* as one JSON line.

    The parent directory is created when missing, and the line is flushed and
    fsynced before returning.
    """
    ensure_dir(path.parent)
    payload = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
    with path.open("a", encoding="utf-8", newline="\n") as sink:
        sink.write(payload)
        sink.flush()
        os.fsync(sink.fileno())
def read_jsonl(path: Path) -> List[dict]:
    """
    Load every JSON object stored one-per-line in *path*.

    Returns an empty list when the file does not exist. Blank lines are
    skipped. Lines that are not valid JSON (for example a line truncated by a
    crash) and lines holding valid JSON that is not an object are skipped with
    a warning on stderr, so callers can rely on every item being a ``dict``.
    """
    if not path.exists():
        return []
    records: List[dict] = []
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                parsed = json.loads(line)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass.
                print(f"[WARN] Broken JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            if not isinstance(parsed, dict):
                # Callers use r.get(...); a list or scalar would crash them.
                print(f"[WARN] Non-object JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            records.append(parsed)
    return records
def chunk_evenly(items: Sequence[dict], workers: int) -> List[List[dict]]:
    """
    Deal *items* round-robin into *workers* buckets.

    Buckets that received nothing are left out of the result, so fewer than
    *workers* lists come back when there are fewer items than workers.
    """
    buckets: List[List[dict]] = [[] for _ in range(workers)]
    for position, entry in enumerate(items):
        bucket_index = position % workers
        buckets[bucket_index].append(entry)
    # filter(None, ...) keeps only the non-empty (truthy) buckets.
    return list(filter(None, buckets))
def truncate_text(text: str, max_len: int = 350) -> str:
    """
    Collapse whitespace in *text* and shorten it to at most *max_len* characters.

    ``None`` or empty input gives ``""``. Over-long text ends in ``...``.
    When *max_len* is below 3 there is no room for the ellipsis, so the text
    is hard-cut instead; a negative *max_len* gives ``""``.
    """
    text = re.sub(r"\s+", " ", text or "").strip()
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # max_len - 3 would be negative and slice from the end, giving a
        # result longer than max_len.
        return text[: max(max_len, 0)]
    return text[: max_len - 3] + "..."
# ============================================================
# Firefox setup
# ============================================================
@dataclass
class WorkerConfig:
    """Settings for one worker; passed to the worker process as a plain dict."""

    # Zero-based worker number; used in log prefixes and per-worker file names.
    worker_id: int

    # SOCKS proxy endpoint; the proxy is only configured when both are truthy.
    proxy_host: Optional[str]
    proxy_port: Optional[int]

    # Whether Firefox is started with --headless.
    headless: bool

    # Timeouts in seconds for page loads, element waits and download completion.
    page_load_timeout: int
    element_timeout: int
    download_timeout: int

    # Pause in seconds after each URL; values <= 0 disable the pause.
    delay_between_urls: float

    # Directories, stored as strings: worker scratch root, registry output, final downloads.
    workdir: str
    output_dir: str
    download_root: str

    # User-Agent override; an empty string leaves the browser default in place.
    user_agent: str
def build_firefox_driver(cfg: WorkerConfig, temp_download_dir: Path) -> webdriver.Firefox:
    """
    Start a Firefox WebDriver that saves downloads into *temp_download_dir*.

    All preferences are collected first, in the order they are applied, and
    then set on the options object in one pass. The page-load timeout from
    *cfg* is applied to the driver before it is returned.
    """
    ensure_dir(temp_download_dir)

    prefs = {}

    # UA
    if cfg.user_agent:
        prefs["general.useragent.override"] = cfg.user_agent

    # Proxy (SOCKS v5, DNS resolved through the proxy)
    if cfg.proxy_host and cfg.proxy_port:
        prefs.update(
            {
                "network.proxy.type": 1,
                "network.proxy.socks": cfg.proxy_host,
                "network.proxy.socks_port": int(cfg.proxy_port),
                "network.proxy.socks_version": 5,
                "network.proxy.socks_remote_dns": True,
            }
        )

    # Downloads
    prefs.update(
        {
            "browser.download.folderList": 2,
            "browser.download.dir": str(temp_download_dir.resolve()),
            "browser.download.useDownloadDir": True,
            "browser.download.manager.showWhenStarting": False,
            "browser.download.alwaysOpenPanel": False,
            "browser.helperApps.neverAsk.saveToDisk": DOWNLOAD_MIME_TYPES,
            "browser.helperApps.neverAsk.openFile": DOWNLOAD_MIME_TYPES,
            "pdfjs.disabled": True,
        }
    )

    # Speed / lighter pages
    prefs.update(
        {
            "permissions.default.image": 2,
            "gfx.downloadable_fonts.enabled": False,
            "media.autoplay.default": 5,
            "dom.ipc.plugins.enabled.libflashplayer.so": False,
        }
    )

    # Reduce automation noise; not guaranteed stealth, just less obvious.
    prefs.update(
        {
            "dom.webdriver.enabled": False,
            "useAutomationExtension": False,
        }
    )

    options = FirefoxOptions()
    options.page_load_strategy = "eager"
    if cfg.headless:
        options.add_argument("--headless")
    for pref_name, pref_value in prefs.items():
        options.set_preference(pref_name, pref_value)

    browser = webdriver.Firefox(options=options)
    browser.set_page_load_timeout(cfg.page_load_timeout)
    return browser
# ============================================================
# Selenium page logic
# ============================================================
def wait_for_page_load(driver: webdriver.Firefox, timeout: int = 20) -> None:
    """
    Block until ``document.readyState`` is ``interactive`` or ``complete``.

    Gives up silently after *timeout* seconds; the caller carries on with
    whatever has loaded.
    """

    def _dom_is_usable(current_driver) -> bool:
        state = current_driver.execute_script("return document.readyState")
        return state in ("interactive", "complete")

    try:
        WebDriverWait(driver, timeout).until(_dom_is_usable)
    except TimeoutException:
        return
def click_element(driver, element) -> bool:
    """
    Click *element*, falling back to a JavaScript click when the native one fails.

    Returns True when either click went through and False when both failed.
    This function never raises. Previously a non-JavaScript error during the
    fallback of an intercepted click escaped to the caller, which contradicted
    the boolean contract used by every call site.
    """
    try:
        element.click()
    except Exception:
        # Best effort by design: any native-click failure (intercepted, not
        # interactable, stale, ...) gets one JavaScript attempt.
        try:
            driver.execute_script("arguments[0].click();", element)
        except Exception:
            return False
    return True
def extract_jd_summary_metadata(driver: webdriver.Firefox) -> Dict:
    """
    Snapshot the page title and every ``div.jd_summary_list`` element.

    Each element contributes its index, stripped text, inner HTML and the
    text/href of the links inside it. Every lookup is best effort: a failure
    leaves that field at its empty default instead of raising.
    """
    try:
        page_title = driver.title or ""
    except Exception:
        page_title = ""

    try:
        summary_nodes = driver.find_elements(By.CSS_SELECTOR, "div.jd_summary_list")
    except Exception:
        summary_nodes = []

    summary_items = []
    for position, node in enumerate(summary_nodes):
        entry = {"index": position, "text": "", "html": "", "links": []}

        try:
            entry["text"] = (node.text or "").strip()
        except Exception:
            pass

        try:
            entry["html"] = node.get_attribute("innerHTML") or ""
        except Exception:
            pass

        try:
            anchors = node.find_elements(By.CSS_SELECTOR, "a")
        except Exception:
            anchors = []
        for anchor in anchors:
            try:
                link_info = {
                    "text": (anchor.text or "").strip(),
                    "href": anchor.get_attribute("href") or "",
                }
            except Exception:
                # Skip a link that cannot be read; keep the rest.
                continue
            entry["links"].append(link_info)

        summary_items.append(entry)

    return {
        "page_title": page_title,
        "jd_summary_count": len(summary_nodes),
        "jd_summary_items": summary_items,
    }
def accept_license_if_present(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Tick the ``license_agree`` checkbox when the page has one.

    Returns True only when the checkbox was found enabled, visible and
    unticked, and the click succeeded. Every other outcome, including a
    missing or already ticked checkbox, returns False.
    """
    waiter = WebDriverWait(driver, timeout)
    locator = (By.NAME, "license_agree")
    try:
        checkbox = waiter.until(EC.presence_of_element_located(locator))
    except TimeoutException:
        return False

    try:
        needs_click = (
            checkbox.is_enabled()
            and checkbox.is_displayed()
            and not checkbox.is_selected()
        )
        if not needs_click:
            return False
        return click_element(driver, checkbox)
    except Exception:
        return False
def click_download_button(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Click the first clickable download control found on the page.

    Candidate locators are tried in order, each waiting up to *timeout*
    seconds. Returns True after the first successful click and False when no
    candidate could be clicked.
    """
    waiter = WebDriverWait(driver, timeout)
    candidate_locators = (
        (By.ID, "jd_license_submit"),
        (By.CSS_SELECTOR, "#jd_license_submit"),
        (By.CSS_SELECTOR, "input[type='submit']"),
        (By.CSS_SELECTOR, "button[type='submit']"),
        (By.CSS_SELECTOR, "a[href*='download']"),
    )
    for locator in candidate_locators:
        try:
            button = waiter.until(EC.element_to_be_clickable(locator))
            if click_element(driver, button):
                return True
        except Exception:
            # A timeout or any other failure just moves on to the next locator.
            continue
    return False
# ============================================================
# Download detection
# ============================================================
def clear_partial_downloads(download_dir: Path) -> None:
    """Delete leftover in-progress download files from *download_dir*, ignoring failures."""
    if not download_dir.exists():
        return
    for entry in download_dir.iterdir():
        is_partial = entry.is_file() and entry.name.endswith(PARTIAL_SUFFIXES)
        if not is_partial:
            continue
        try:
            entry.unlink()
        except Exception:
            # Best effort: a file that cannot be removed is left in place.
            pass
def list_completed_files(download_dir: Path) -> List[Path]:
    """Return the regular files in *download_dir* that do not carry a partial-download suffix."""
    if not download_dir.exists():
        return []
    return [
        entry
        for entry in download_dir.iterdir()
        if entry.is_file() and not entry.name.endswith(PARTIAL_SUFFIXES)
    ]
def has_partial_files(download_dir: Path) -> bool:
    """Report whether *download_dir* holds any file with a partial-download suffix."""
    if not download_dir.exists():
        return False
    return any(
        entry.is_file() and entry.name.endswith(PARTIAL_SUFFIXES)
        for entry in download_dir.iterdir()
    )
def wait_for_new_download(
    download_dir: Path,
    before_names: set,
    timeout: int,
    check_interval: float = 1.0,
    stable_polls_required: int = 2,
) -> Optional[Path]:
    """
    Wait for a new, fully written file to appear in *download_dir*.

    A file qualifies when its name is not in *before_names*, it is non-empty,
    its size has stayed the same for *stable_polls_required* consecutive
    polls, and no partial-download file remains in the directory.

    Returns the path of the first qualifying file, most recently modified
    first, or None when *timeout* seconds pass without one.
    """

    def _mtime_or_zero(candidate: Path) -> float:
        # The browser may rename or remove a file between listing and stat;
        # such a file sorts last instead of aborting the whole wait.
        try:
            return candidate.stat().st_mtime
        except OSError:
            return 0.0

    # A monotonic clock keeps the timeout correct if the system clock is adjusted.
    deadline = time.monotonic() + timeout
    # path -> (last seen size, consecutive polls with that size)
    size_state: Dict[str, Tuple[int, int]] = {}
    while time.monotonic() < deadline:
        new_completed = [
            p for p in list_completed_files(download_dir) if p.name not in before_names
        ]
        # Prefer the most recently modified file.
        new_completed.sort(key=_mtime_or_zero, reverse=True)
        for p in new_completed:
            try:
                current_size = p.stat().st_size
            except OSError:
                continue
            prev_size, stable_count = size_state.get(str(p), (-1, 0))
            if current_size == prev_size and current_size > 0:
                stable_count += 1
            else:
                stable_count = 0
            size_state[str(p)] = (current_size, stable_count)
            if stable_count >= stable_polls_required and not has_partial_files(download_dir):
                return p
        time.sleep(check_interval)
    return None
def move_to_hash_folder(
    temp_file: Path,
    internal_hash: str,
    final_download_root: Path,
) -> Path:
    """
    Move *temp_file* into ``<final_download_root>/<internal_hash>/``.

    The file keeps its sanitized name. When that name is already taken,
    ``__2``, ``__3``, ... is appended to the stem until a free name is found.
    Returns the resolved destination path.
    """
    target_dir = ensure_dir(final_download_root / internal_hash)
    destination = target_dir / safe_filename(temp_file.name)
    if destination.exists():
        base, extension = destination.stem, destination.suffix
        attempt = 2
        while (target_dir / f"{base}__{attempt}{extension}").exists():
            attempt += 1
        destination = target_dir / f"{base}__{attempt}{extension}"
    shutil.move(str(temp_file), str(destination))
    return destination.resolve()
# ============================================================
# Optional GPX tagging helper
# ============================================================
def add_internal_hash_to_gpx_file(gpx_path: Path, internal_hash: str) -> bool:
    """
    Optional helper: write *internal_hash* into the metadata of a plain .gpx file.

    The hash is stored as ``<source_registry_hash>`` inside
    ``<metadata><extensions>``; missing parent elements are created. Calling
    this again on the same file updates the existing tag instead of adding a
    second one.

    Only XML GPX files are handled. It does NOT modify .gdb, because Garmin
    GDB is binary and must not be patched by raw string insertion. The
    downloader does not call this automatically; tag files in a later,
    controlled post-processing stage.

    Note: the document's namespace prefixes are registered with ElementTree's
    process-wide registry so the rewritten file keeps them rather than
    getting generated ``ns0:`` prefixes.

    Returns True on success and False on any failure (unreadable file,
    invalid XML, write error).
    """
    import xml.etree.ElementTree as ET

    try:
        # First pass: collect the prefixes the document declares.
        with gpx_path.open("rb") as raw:
            for _event, (ns_prefix, ns_uri) in ET.iterparse(raw, events=("start-ns",)):
                try:
                    ET.register_namespace(ns_prefix, ns_uri)
                except ValueError:
                    # Reserved prefixes such as "ns0" cannot be registered.
                    continue

        tree = ET.parse(str(gpx_path))
        root = tree.getroot()
        ns_match = re.match(r"\{(.+?)\}", root.tag)
        prefix = f"{{{ns_match.group(1)}}}" if ns_match else ""

        metadata = root.find(f"{prefix}metadata")
        if metadata is None:
            metadata = ET.Element(f"{prefix}metadata")
            root.insert(0, metadata)
        extensions = metadata.find(f"{prefix}extensions")
        if extensions is None:
            extensions = ET.SubElement(metadata, f"{prefix}extensions")

        # An earlier run may have left the tag here. After a round trip through
        # a file with a default namespace it parses back as namespaced, so both
        # spellings are checked. Explicit None tests: Element truthiness
        # depends on child count.
        tag = extensions.find("source_registry_hash")
        if tag is None and prefix:
            tag = extensions.find(f"{prefix}source_registry_hash")
        if tag is None:
            tag = ET.SubElement(extensions, "source_registry_hash")
        tag.text = internal_hash

        tree.write(str(gpx_path), encoding="utf-8", xml_declaration=True)
        return True
    except Exception:
        # Deliberately best effort: callers only need a success flag.
        return False
# ============================================================
# One URL processing
# ============================================================
def process_one_url(
    driver: webdriver.Firefox,
    url_item: dict,
    cfg: WorkerConfig,
    temp_download_dir: Path,
    final_download_root: Path,
) -> dict:
    """
    Download the file behind one URL and return its registry record.

    Steps: open the page, capture the summary metadata, accept the license if
    there is one, click the download control, wait for the file, hash it, move
    it into its hash folder and write a ``.source.json`` sidecar next to it.

    The returned dict always carries ``success``, ``status`` and
    ``finished_at_utc``. Failures are reported through ``status``/``error``
    instead of being raised.
    """
    url = url_item["url"]
    normalized_url = normalize_url(url)
    internal_hash = make_internal_hash(url)

    record = {
        "success": False,
        "status": "started",
        "error": "",
        "worker_id": cfg.worker_id,
        "input_index": url_item["input_index"],
        "url": url,
        "normalized_url": normalized_url,
        "internal_hash": internal_hash,
        "downloaded_filepath": None,
        "download_original_filename": None,
        "download_file_sha256": None,
        "from_remote_metadata": None,
        "started_at_utc": utc_now_iso(),
        "finished_at_utc": None,
    }

    def _finish(status: str, error: str = "") -> dict:
        # Stamp the final status (and error text, if any) and hand the record back.
        record["status"] = status
        if error:
            record["error"] = error
        record["finished_at_utc"] = utc_now_iso()
        return record

    try:
        clear_partial_downloads(temp_download_dir)
        # Files already in the temp dir must not be mistaken for this URL's download.
        before_names = {p.name for p in list_completed_files(temp_download_dir)}

        driver.get(url)
        wait_for_page_load(driver, timeout=cfg.page_load_timeout)

        metadata = extract_jd_summary_metadata(driver)
        record["from_remote_metadata"] = metadata

        accept_license_if_present(driver, timeout=cfg.element_timeout)
        if not click_download_button(driver, timeout=cfg.element_timeout):
            return _finish("download_button_not_found_or_not_clickable")

        downloaded_temp_file = wait_for_new_download(
            temp_download_dir,
            before_names=before_names,
            timeout=cfg.download_timeout,
        )
        if downloaded_temp_file is None:
            return _finish("download_timeout")

        # Hash and remember the name before the move changes the path.
        file_sha256 = sha256_file(downloaded_temp_file)
        original_filename = downloaded_temp_file.name
        final_path = move_to_hash_folder(
            temp_file=downloaded_temp_file,
            internal_hash=internal_hash,
            final_download_root=final_download_root,
        )

        # Sidecar metadata next to file for direct local browsing.
        sidecar = {
            "url": url,
            "normalized_url": normalized_url,
            "internal_hash": internal_hash,
            "downloaded_filepath": str(final_path),
            "download_original_filename": original_filename,
            "download_file_sha256": file_sha256,
            "from_remote_metadata": metadata,
            "created_at_utc": utc_now_iso(),
        }
        sidecar_path = final_path.with_suffix(final_path.suffix + ".source.json")
        sidecar_path.write_text(
            json.dumps(sidecar, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )

        record.update(
            success=True,
            downloaded_filepath=str(final_path),
            download_original_filename=original_filename,
            download_file_sha256=file_sha256,
        )
        return _finish("downloaded")
    except WebDriverException as exc:
        return _finish("webdriver_error", str(exc))
    except Exception as exc:
        return _finish("unexpected_error", str(exc) + "\n" + traceback.format_exc())
# ============================================================
# Worker
# ============================================================
def worker_main(url_items: List[dict], cfg_dict: dict) -> dict:
    """
    Worker-process entry point: run every URL in *url_items* through one Firefox.

    *cfg_dict* holds the keyword arguments for ``WorkerConfig``. Each result is
    appended to this worker's JSONL file as soon as it is available. The
    browser is always shut down on exit.

    Returns a summary dict with the worker id, total/success/failed counts and
    the JSONL path.
    """
    cfg = WorkerConfig(**cfg_dict)
    worker_name = f"worker_{cfg.worker_id:02d}"
    log_prefix = f"[worker {cfg.worker_id:02d}]"
    total = len(url_items)

    final_download_root = Path(cfg.download_root)
    temp_download_dir = ensure_dir(Path(cfg.workdir) / worker_name / "temp_downloads")
    worker_jsonl = Path(cfg.output_dir) / f"{worker_name}.jsonl"

    result_summary = {
        "worker_id": cfg.worker_id,
        "total": total,
        "success": 0,
        "failed": 0,
        "jsonl": str(worker_jsonl),
    }

    driver = None
    try:
        driver = build_firefox_driver(cfg, temp_download_dir)
        for position, url_item in enumerate(url_items, 1):
            print(
                f"{log_prefix} [{position}/{total}] "
                f"input_index={url_item['input_index']} {url_item['url']}",
                flush=True,
            )
            record = process_one_url(
                driver=driver,
                url_item=url_item,
                cfg=cfg,
                temp_download_dir=temp_download_dir,
                final_download_root=final_download_root,
            )
            write_jsonl_line(worker_jsonl, record)

            if record.get("success"):
                result_summary["success"] += 1
                outcome = f"OK {record['internal_hash']} {record.get('downloaded_filepath')}"
            else:
                result_summary["failed"] += 1
                outcome = f"FAIL {record['internal_hash']} {record.get('status')}"
            print(f"{log_prefix} {outcome}", flush=True)

            if cfg.delay_between_urls > 0:
                time.sleep(cfg.delay_between_urls)
    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                # A failed shutdown must not hide the worker's result.
                pass
    return result_summary
# ============================================================
# Registry combining
# ============================================================
def load_success_seen(output_dir: Path) -> set:
    """
    Collect the normalized URLs that already have a successful record.

    Used for resume. Both the combined registry and the per-worker JSONL files
    are read. Only successful URLs are returned, so failed ones get retried.
    """
    combined = output_dir / "source_registry.jsonl"
    sources = [combined] if combined.exists() else []
    sources += sorted(output_dir.glob("worker_*.jsonl"))

    seen = set()
    for source in sources:
        seen.update(
            normalize_url(rec["url"])
            for rec in read_jsonl(source)
            if rec.get("success") and rec.get("url")
        )
    return seen
def collect_worker_records(output_dir: Path) -> List[dict]:
    """
    Merge all known registry records into one list with one entry per URL.

    Sources are the existing ``source_registry.jsonl`` (when present) followed
    by every ``worker_*.jsonl`` in *output_dir*. The combined registry is
    included because resume treats it as authoritative; without it, rewriting
    the registry would drop every record whose worker file has been removed.

    Records are de-duplicated by normalized URL. A successful record beats a
    failed one; between two records with the same success flag, the one with
    the later (or equal) ``finished_at_utc`` wins. Records without a URL are
    ignored. The result is sorted by ``input_index``.
    """
    sources: List[Path] = []
    combined = output_dir / "source_registry.jsonl"
    if combined.exists():
        sources.append(combined)
    sources.extend(sorted(output_dir.glob("worker_*.jsonl")))

    def _supersedes(new: dict, old: dict) -> bool:
        # Decide whether `new` should replace `old` for the same URL.
        new_success = bool(new.get("success"))
        old_success = bool(old.get("success"))
        if new_success != old_success:
            return new_success
        # ISO timestamps from the same clock and zone compare correctly as strings.
        return (new.get("finished_at_utc") or "") >= (old.get("finished_at_utc") or "")

    best_by_url: Dict[str, dict] = {}
    for source in sources:
        for r in read_jsonl(source):
            url = r.get("url")
            if not url:
                continue
            key = normalize_url(url)
            old = best_by_url.get(key)
            if old is None or _supersedes(r, old):
                best_by_url[key] = r

    final_records = list(best_by_url.values())
    # `or 0` also covers an explicit null, which int() would reject.
    final_records.sort(key=lambda r: int(r.get("input_index") or 0))
    return final_records
def write_combined_outputs(output_dir: Path) -> None:
    """
    Rebuild ``source_registry.jsonl``, ``.csv`` and ``.md`` inside *output_dir*.

    All three files are rewritten from scratch from the de-duplicated records
    and their paths are printed afterwards.
    """
    ensure_dir(output_dir)
    records = collect_worker_records(output_dir)
    jsonl_path = output_dir / "source_registry.jsonl"
    csv_path = output_dir / "source_registry.csv"
    md_path = output_dir / "source_registry.md"

    # --- JSONL: one full record per line -------------------------------
    with jsonl_path.open("w", encoding="utf-8", newline="\n") as out:
        for rec in records:
            out.write(json.dumps(rec, ensure_ascii=False, sort_keys=True) + "\n")

    # --- CSV: flat table, remote metadata embedded as compact JSON ------
    csv_fields = [
        "input_index",
        "success",
        "status",
        "internal_hash",
        "url",
        "downloaded_filepath",
        "download_original_filename",
        "download_file_sha256",
        "from_remote_metadata",
        "error",
        "started_at_utc",
        "finished_at_utc",
    ]
    with csv_path.open("w", encoding="utf-8", newline="") as out:
        writer = csv.DictWriter(out, fieldnames=csv_fields)
        writer.writeheader()
        for rec in records:
            writer.writerow(
                {
                    field: (
                        json_dumps_compact(rec.get(field))
                        if field == "from_remote_metadata"
                        else rec.get(field)
                    )
                    for field in csv_fields
                }
            )

    # --- Markdown: short human-readable mapping table -------------------
    def _md_cell(value) -> str:
        # A literal pipe would end the table cell, so escape it.
        return str(value).replace("|", "\\|")

    with md_path.open("w", encoding="utf-8", newline="\n") as out:
        out.write("| internal_hash | success | status | url | downloaded_filepath | details |\n")
        out.write("|---|---:|---|---|---|---|\n")
        for rec in records:
            meta = rec.get("from_remote_metadata") or {}
            detail_bits = []
            title = truncate_text(meta.get("page_title", ""), 100)
            if title:
                detail_bits.append(f"title: {title}")
            summaries = meta.get("jd_summary_items") or []
            if summaries:
                first_text = truncate_text(summaries[0].get("text", ""), 180)
                if first_text:
                    detail_bits.append(first_text)

            cells = (
                rec.get("internal_hash", ""),
                str(bool(rec.get("success"))),
                _md_cell(rec.get("status", "")),
                _md_cell(rec.get("url", "")),
                _md_cell(rec.get("downloaded_filepath") or ""),
                _md_cell("<br>".join(detail_bits)),
            )
            out.write("| " + " | ".join(str(cell) for cell in cells) + " |\n")

    print("\nCombined registry written:")
    print(f" JSONL: {jsonl_path}")
    print(f" CSV: {csv_path}")
    print(f" MD: {md_path}")
# ============================================================
# Input / args
# ============================================================
def parse_proxy(proxy: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
    """
    Split a ``host:port`` proxy string into ``(host, port)``.

    An optional ``scheme://`` prefix and trailing ``/`` are ignored.
    ``None`` or an empty string means "no proxy" and gives ``(None, None)``.

    Raises:
        ValueError: the value has no ``host:port`` shape, the host is empty,
            or the port is not an integer in 1..65535. Port 0 is rejected
            explicitly because the driver setup treats a falsy port as
            "no proxy" and would silently connect directly.
    """
    if not proxy:
        return None, None
    proxy = proxy.strip()
    if "://" in proxy:
        proxy = proxy.split("://", 1)[1]
    proxy = proxy.rstrip("/")
    if ":" not in proxy:
        raise ValueError("Proxy must be in host:port format, example: 192.168.0.38:1080")
    host, port_s = proxy.rsplit(":", 1)
    host = host.strip()
    port_s = port_s.strip()
    if not host:
        raise ValueError("Proxy must be in host:port format, example: 192.168.0.38:1080")
    try:
        port = int(port_s)
    except ValueError:
        raise ValueError(f"Proxy port must be an integer, got {port_s!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Proxy port must be between 1 and 65535, got {port}")
    return host, port
def load_urls(input_path: Path, start: int, limit: Optional[int]) -> List[dict]:
    """
    Read one URL per line from *input_path* and return the requested slice.

    Blank lines are skipped. Each item is ``{"input_index", "url"}`` where
    ``input_index`` is the zero-based line number in the file. *start* and
    *limit* index into the list of non-blank URLs, not raw file lines.

    The file is decoded as ``utf-8-sig`` so a byte-order mark, as written by
    some Windows editors, does not end up glued to the first URL.
    """
    urls = []
    with input_path.open("r", encoding="utf-8-sig") as f:
        for idx, line in enumerate(f):
            url = line.strip()
            if not url:
                continue
            urls.append({"input_index": idx, "url": url})
    sliced = urls[start:]
    if limit is not None:
        sliced = sliced[:limit]
    return sliced
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the downloader."""
    parser = argparse.ArgumentParser(
        description="Parallel Firefox downloader with jd_summary_list registry capture."
    )
    add = parser.add_argument

    # Input selection
    add("--input", default="url-final.txt", help="Input URL file, one URL per line.")
    add("--start", type=int, default=0, help="Start offset in input file. Example: 780")
    add("--limit", type=int, default=None, help="Optional max URLs to process.")

    # Browser / network
    add("--workers", type=int, default=DEFAULT_WORKERS, help="Firefox worker count.")
    add("--proxy", default=None, help="SOCKS5 proxy as host:port, example 192.168.0.38:1080")
    add("--headless", action="store_true", help="Run Firefox headless.")

    # Locations
    add("--output-dir", default=DEFAULT_OUTPUT_DIR)
    add("--download-root", default=DEFAULT_DOWNLOAD_ROOT)
    add("--workdir", default=DEFAULT_WORKDIR)

    # Timing
    add("--page-load-timeout", type=int, default=25)
    add("--element-timeout", type=int, default=7)
    add("--download-timeout", type=int, default=60)
    add("--delay", type=float, default=2.0, help="Delay between URLs per worker.")

    add(
        "--user-agent",
        default=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) "
            "Gecko/20100101 Firefox/126.0"
        ),
    )

    # Run modes
    add(
        "--no-resume",
        action="store_true",
        help="Do not skip URLs already successful in previous registry files.",
    )
    add(
        "--combine-only",
        action="store_true",
        help="Only combine existing worker_*.jsonl files into source_registry.* outputs.",
    )
    return parser
# ============================================================
# Main
# ============================================================
def main() -> int:
    """
    Command-line entry point.

    Parses arguments, drops URLs that already succeeded (unless
    ``--no-resume``), spreads the rest over worker processes and rebuilds the
    combined registry at the end.

    Returns:
        0 on a completed run (even when individual URLs failed), 2 when the
        input file is missing or ``--proxy`` is malformed.
    """
    args = build_arg_parser().parse_args()
    input_path = Path(args.input)
    output_dir = ensure_dir(Path(args.output_dir))
    download_root = ensure_dir(Path(args.download_root))
    workdir = ensure_dir(Path(args.workdir))

    if args.combine_only:
        write_combined_outputs(output_dir)
        return 0

    if not input_path.exists():
        print(f"Input file not found: {input_path}", file=sys.stderr)
        return 2

    # Report a bad proxy the same way as a missing input file, not with a traceback.
    try:
        proxy_host, proxy_port = parse_proxy(args.proxy)
    except ValueError as exc:
        print(f"Invalid --proxy value: {exc}", file=sys.stderr)
        return 2

    url_items = load_urls(input_path=input_path, start=args.start, limit=args.limit)

    skipped = 0
    if not args.no_resume:
        seen_success = load_success_seen(output_dir)
        before_count = len(url_items)
        url_items = [
            item for item in url_items
            if normalize_url(item["url"]) not in seen_success
        ]
        skipped = before_count - len(url_items)

    if not url_items:
        print("No URLs to process.")
        if skipped:
            print(f"Skipped already-successful URLs: {skipped}")
        write_combined_outputs(output_dir)
        return 0

    # Never start more workers than there are URLs, and always at least one.
    workers = max(1, min(int(args.workers), len(url_items)))
    chunks = chunk_evenly(url_items, workers)

    print("=== CONFIG ===")
    print(f"Input: {input_path}")
    print(f"Start offset: {args.start}")
    print(f"URLs to run: {len(url_items)}")
    print(f"Skipped resume: {skipped}")
    print(f"Workers: {workers}")
    print(f"Proxy: {proxy_host}:{proxy_port}" if proxy_host else "Proxy: none")
    print(f"Headless: {args.headless}")
    print(f"Output dir: {output_dir.resolve()}")
    print(f"Download root: {download_root.resolve()}")
    print("==============\n")

    cfgs = [
        WorkerConfig(
            worker_id=worker_id,
            proxy_host=proxy_host,
            proxy_port=proxy_port,
            headless=bool(args.headless),
            page_load_timeout=int(args.page_load_timeout),
            element_timeout=int(args.element_timeout),
            download_timeout=int(args.download_timeout),
            delay_between_urls=float(args.delay),
            workdir=str(workdir),
            output_dir=str(output_dir),
            download_root=str(download_root),
            user_agent=str(args.user_agent),
        )
        for worker_id in range(len(chunks))
    ]

    total_success = 0
    total_failed = 0
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # The config travels as a plain dict and is rebuilt inside the worker.
        futures = [
            executor.submit(worker_main, chunk, cfg.__dict__)
            for chunk, cfg in zip(chunks, cfgs)
        ]
        for fut in as_completed(futures):
            try:
                summary = fut.result()
                total_success += int(summary.get("success", 0))
                total_failed += int(summary.get("failed", 0))
                print(
                    f"\nWorker {summary.get('worker_id')} finished: "
                    f"success={summary.get('success')} failed={summary.get('failed')}"
                )
            except Exception as e:
                # Top-level boundary: one crashed worker must not stop the others.
                total_failed += 1
                print(f"[FATAL] Worker crashed: {e}", file=sys.stderr)
                traceback.print_exc()

    write_combined_outputs(output_dir)

    print("\n=== FINISHED ===")
    print(f"Success this run: {total_success}")
    print(f"Failed this run: {total_failed}")
    print("Registry ID form: R_<16 hex chars from normalized URL SHA256>")
    print("For GDB files, do not raw-patch the binary. Use the registry or a controlled converter/post-processor.")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
Run it like this:
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080
Headless:
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless
Important behavior:
- Each worker has its own Firefox instance.
- Each worker has its own temporary download directory.
- Final files are moved into:
downloads_by_hash/
R_ABCDEF1234567890/
downloaded_file.gpx
downloaded_file.gpx.source.json
- Main registry files are:
output/source_registry.jsonl
output/source_registry.csv
output/source_registry.md
For GPX/GDB injection later:
- GPX can safely receive a metadata extension like:
<source_registry_hash>R_ABCDEF1234567890</source_registry_hash>
- GDB should not be patched as raw binary. A better flow is to keep the registry and sidecar, or to convert the GDB to GPX with GPSBabel, inject the hash into the GPX metadata, and keep the original GDB mapped through source_registry.csv/jsonl.