#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parallel Selenium Firefox downloader with source registry mapping.

Save this file as ``jd_parallel_downloader.py``. It uses 3-4 separate Firefox
instances, one per worker, captures ``div.jd_summary_list``, downloads the
file, and writes a crash-safe registry with: ``url``, ``downloaded_filepath``,
``from_remote_metadata``, ``internal_hash``. It also stores
``download_file_sha256`` separately.

Main outputs:
    output/source_registry.jsonl      # crash-safe full registry
    output/source_registry.csv        # combined table
    output/source_registry.md         # markdown mapping table
    downloads_by_hash/R_<hash>/...    # downloaded files grouped by internal hash

Run example:
    python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080

Headless:
    python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless

Resume behavior:
    - Successful URLs already present in output/source_registry.jsonl are skipped.
    - Failed URLs are retried.

Important behavior:
    - Each worker has its own Firefox instance.
    - Each worker has its own temporary download directory.
    - Final files are moved into:

          downloads_by_hash/
              R_ABCDEF1234567890/
                  downloaded_file.gpx
                  downloaded_file.gpx.source.json

    - Main registry files are:

          output/source_registry.jsonl
          output/source_registry.csv
          output/source_registry.md

For GPX/GDB injection later:
    - GPX can safely receive a metadata extension like:

          <extensions>
              <source_registry_hash>R_ABCDEF1234567890</source_registry_hash>
          </extensions>

    - GDB should not be patched as raw binary. Better flow is: keep the
      registry + sidecar, or convert GDB to GPX with GPSBabel, inject the hash
      into GPX metadata, and keep the original GDB mapped through
      source_registry.csv/jsonl.
"""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import os
import re
import shutil
import sys
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from urllib.parse import urldefrag, urlsplit, urlunsplit

# Selenium is only needed to drive Firefox. The import is guarded so that the
# registry helpers and ``--combine-only`` keep working on machines without it;
# ``main`` refuses to start a download run when the import failed.
SELENIUM_IMPORT_ERROR: Optional[ImportError] = None
try:
    from selenium import webdriver
    from selenium.common.exceptions import (
        ElementClickInterceptedException,
        JavascriptException,
        NoSuchElementException,
        TimeoutException,
        WebDriverException,
    )
    from selenium.webdriver.common.by import By
    from selenium.webdriver.firefox.options import Options as FirefoxOptions
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.support.ui import WebDriverWait
except ImportError as _selenium_exc:
    SELENIUM_IMPORT_ERROR = _selenium_exc
    # Placeholders keep every module-level name defined. They are never used
    # for real work because main() exits before any browser code runs.
    webdriver = By = FirefoxOptions = EC = WebDriverWait = None
    ElementClickInterceptedException = JavascriptException = Exception
    NoSuchElementException = TimeoutException = WebDriverException = Exception


# ============================================================
# Defaults
# ============================================================

DEFAULT_OUTPUT_DIR = "output"
DEFAULT_DOWNLOAD_ROOT = "downloads_by_hash"
DEFAULT_WORKDIR = "worker_runtime"
DEFAULT_WORKERS = 4

# File-name suffixes browsers use for downloads that are still in progress.
PARTIAL_SUFFIXES = (
    ".part",
    ".crdownload",
    ".tmp",
)

# MIME types Firefox should save to disk without showing a dialog.
DOWNLOAD_MIME_TYPES = ",".join(
    [
        "application/octet-stream",
        "application/x-garmin-gdb",
        "application/gdb",
        "application/gpx+xml",
        "application/xml",
        "text/xml",
        "text/plain",
        "application/zip",
        "application/x-zip-compressed",
        "application/x-gzip",
        "application/gzip",
        "application/vnd.google-earth.kml+xml",
        "application/vnd.google-earth.kmz",
        "application/x-msdownload",
        "binary/octet-stream",
    ]
)

COMBINED_REGISTRY_NAME = "source_registry.jsonl"


# ============================================================
# Small helpers
# ============================================================

def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (second precision)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def ensure_dir(path: Path) -> Path:
    """Create ``path`` (and parents) if needed and return it."""
    path.mkdir(parents=True, exist_ok=True)
    return path


def normalize_url(url: str) -> str:
    """
    Normalize the URL enough to make stable deterministic IDs.

    Keeps the query string because download pages often identify resources
    through query params. Removes the fragment because it normally does not
    affect server-side download identity. Scheme and host are lower-cased.
    """
    url, _fragment = urldefrag(url.strip())
    parts = urlsplit(url)
    return urlunsplit(
        (parts.scheme.lower(), parts.netloc.lower(), parts.path, parts.query, "")
    )


def make_internal_hash(url: str) -> str:
    """
    Return the stable registry ID ``R_<16 upper-case hex chars>`` for ``url``.

    Important: this hash is based on the normalized source URL, not on file
    contents. That means you can later inject R_<hash> into GPX metadata
    without invalidating the mapping ID itself.
    """
    normalized = normalize_url(url)
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16].upper()
    return f"R_{digest}"


def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 of the file at ``path``, read in chunks."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(chunk_size), b""):
            h.update(block)
    return h.hexdigest()


def safe_filename(name: str, fallback: str = "download.bin") -> str:
    """
    Return ``name`` with path separators, control characters and characters
    that are illegal on Windows replaced by ``_``; ``fallback`` if nothing
    usable remains.
    """
    name = name.strip()
    if not name:
        return fallback
    name = name.replace("\\", "_").replace("/", "_")
    name = re.sub(r"[\x00-\x1f\x7f]+", "_", name)
    name = re.sub(r'[:*?"<>|]+', "_", name)
    name = re.sub(r"\s+", " ", name).strip()
    return name or fallback


def json_dumps_compact(obj) -> str:
    """Serialize ``obj`` as compact, key-sorted, non-ASCII-escaped JSON."""
    return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))


def write_jsonl_line(path: Path, record: dict) -> None:
    """Append ``record`` as one JSON line to ``path`` and fsync it to disk."""
    ensure_dir(path.parent)
    line = json.dumps(record, ensure_ascii=False, sort_keys=True)
    with path.open("a", encoding="utf-8", newline="\n") as f:
        f.write(line + "\n")
        f.flush()
        # fsync so a crash right after this call cannot lose the record.
        os.fsync(f.fileno())


def read_jsonl(path: Path) -> List[dict]:
    """
    Read a JSONL file and return its records.

    Returns an empty list when ``path`` does not exist. Blank lines are
    skipped. Lines that are not valid JSON, or that are valid JSON but not an
    object, are reported on stderr and ignored so one damaged line (e.g. from
    a crash mid-write) never blocks a resume.
    """
    if not path.exists():
        return []

    records: List[dict] = []
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError:
                print(f"[WARN] Broken JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            if not isinstance(obj, dict):
                print(f"[WARN] Non-object JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            records.append(obj)
    return records


def chunk_evenly(items: Sequence[dict], workers: int) -> List[List[dict]]:
    """
    Distribute ``items`` round-robin over at most ``workers`` chunks.

    Empty chunks are dropped, so fewer than ``workers`` lists may be returned.
    A non-positive ``workers`` is treated as 1.
    """
    workers = max(1, workers)
    chunks: List[List[dict]] = [[] for _ in range(workers)]
    for i, item in enumerate(items):
        chunks[i % workers].append(item)
    return [c for c in chunks if c]


def truncate_text(text: str, max_len: int = 350) -> str:
    """Collapse whitespace in ``text`` and cut it to ``max_len`` with ``...``."""
    text = re.sub(r"\s+", " ", text or "").strip()
    if len(text) <= max_len:
        return text
    return text[: max_len - 3] + "..."


# ============================================================
# Firefox setup
# ============================================================

@dataclass
class WorkerConfig:
    """Per-worker settings; passed to worker processes as a plain dict."""

    worker_id: int
    proxy_host: Optional[str]
    proxy_port: Optional[int]
    headless: bool
    page_load_timeout: int
    element_timeout: int
    download_timeout: int
    delay_between_urls: float
    workdir: str
    output_dir: str
    download_root: str
    user_agent: str


def build_firefox_driver(cfg: WorkerConfig, temp_download_dir: Path) -> webdriver.Firefox:
    """
    Start a Firefox instance configured for unattended downloads.

    Downloads go to ``temp_download_dir`` without prompts; the optional SOCKS5
    proxy and user agent from ``cfg`` are applied.
    """
    ensure_dir(temp_download_dir)

    options = FirefoxOptions()
    options.page_load_strategy = "eager"
    if cfg.headless:
        options.add_argument("--headless")

    # UA
    if cfg.user_agent:
        options.set_preference("general.useragent.override", cfg.user_agent)

    # Proxy
    if cfg.proxy_host and cfg.proxy_port:
        options.set_preference("network.proxy.type", 1)
        options.set_preference("network.proxy.socks", cfg.proxy_host)
        options.set_preference("network.proxy.socks_port", int(cfg.proxy_port))
        options.set_preference("network.proxy.socks_version", 5)
        options.set_preference("network.proxy.socks_remote_dns", True)

    # Downloads
    options.set_preference("browser.download.folderList", 2)
    options.set_preference("browser.download.dir", str(temp_download_dir.resolve()))
    options.set_preference("browser.download.useDownloadDir", True)
    options.set_preference("browser.download.manager.showWhenStarting", False)
    options.set_preference("browser.download.alwaysOpenPanel", False)
    options.set_preference("browser.helperApps.neverAsk.saveToDisk", DOWNLOAD_MIME_TYPES)
    options.set_preference("browser.helperApps.neverAsk.openFile", DOWNLOAD_MIME_TYPES)
    options.set_preference("pdfjs.disabled", True)

    # Speed / lighter pages
    options.set_preference("permissions.default.image", 2)
    options.set_preference("gfx.downloadable_fonts.enabled", False)
    options.set_preference("media.autoplay.default", 5)
    options.set_preference("dom.ipc.plugins.enabled.libflashplayer.so", False)

    # Reduce automation noise; not guaranteed stealth, just less obvious.
    options.set_preference("dom.webdriver.enabled", False)
    options.set_preference("useAutomationExtension", False)

    driver = webdriver.Firefox(options=options)
    driver.set_page_load_timeout(cfg.page_load_timeout)
    return driver


# ============================================================
# Selenium page logic
# ============================================================

def wait_for_page_load(driver: webdriver.Firefox, timeout: int = 20) -> None:
    """Wait until ``document.readyState`` is interactive/complete; ignore timeouts."""
    try:
        WebDriverWait(driver, timeout).until(
            lambda d: d.execute_script("return document.readyState")
            in ("interactive", "complete")
        )
    except TimeoutException:
        pass


def click_element(driver, element) -> bool:
    """
    Click ``element``; fall back to a JavaScript click if the native one fails.

    Returns True when either click succeeded, False otherwise. Never raises:
    callers treat a failed click as "try the next candidate".
    """
    try:
        element.click()
        return True
    except Exception:
        # Intercepted / not interactable / stale: try the JS fallback below.
        pass
    try:
        driver.execute_script("arguments[0].click();", element)
        return True
    except Exception:
        return False


def extract_jd_summary_metadata(driver: webdriver.Firefox) -> Dict:
    """
    Capture div.jd_summary_list. Usually there is one, but this supports multiple.

    Returns a dict with ``page_title``, ``jd_summary_count`` and
    ``jd_summary_items`` (per element: index, text, innerHTML, links).
    Extraction is best-effort: any field that cannot be read stays empty.
    """
    metadata = {
        "page_title": "",
        "jd_summary_count": 0,
        "jd_summary_items": [],
    }

    try:
        metadata["page_title"] = driver.title or ""
    except Exception:
        pass

    try:
        elements = driver.find_elements(By.CSS_SELECTOR, "div.jd_summary_list")
    except Exception:
        elements = []

    metadata["jd_summary_count"] = len(elements)

    for idx, el in enumerate(elements):
        item = {
            "index": idx,
            "text": "",
            "html": "",
            "links": [],
        }
        try:
            item["text"] = (el.text or "").strip()
        except Exception:
            pass
        try:
            item["html"] = el.get_attribute("innerHTML") or ""
        except Exception:
            pass
        try:
            for a in el.find_elements(By.CSS_SELECTOR, "a"):
                try:
                    item["links"].append(
                        {
                            "text": (a.text or "").strip(),
                            "href": a.get_attribute("href") or "",
                        }
                    )
                except Exception:
                    continue
        except Exception:
            pass
        metadata["jd_summary_items"].append(item)

    return metadata


def accept_license_if_present(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Tick the ``license_agree`` checkbox if the page has one.

    Returns True only when this call clicked the checkbox; False when it is
    absent, hidden, disabled, already selected, or the click failed.
    """
    wait = WebDriverWait(driver, timeout)
    try:
        checkbox = wait.until(EC.presence_of_element_located((By.NAME, "license_agree")))
    except TimeoutException:
        return False

    try:
        if checkbox.is_enabled() and checkbox.is_displayed() and not checkbox.is_selected():
            return click_element(driver, checkbox)
    except Exception:
        return False
    return False


def click_download_button(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Click the first clickable download/submit control.

    Candidates are tried in order, each waited on for up to ``timeout``
    seconds. Returns True as soon as one click succeeds, False if none does.
    """
    wait = WebDriverWait(driver, timeout)
    selectors = [
        (By.ID, "jd_license_submit"),
        (By.CSS_SELECTOR, "input[type='submit']"),
        (By.CSS_SELECTOR, "button[type='submit']"),
        (By.CSS_SELECTOR, "a[href*='download']"),
    ]
    for by, selector in selectors:
        try:
            btn = wait.until(EC.element_to_be_clickable((by, selector)))
            if click_element(driver, btn):
                return True
        except Exception:
            # Includes TimeoutException: this candidate is not available.
            continue
    return False


# ============================================================
# Download detection
# ============================================================

def clear_partial_downloads(download_dir: Path) -> None:
    """Delete leftover in-progress download files in ``download_dir`` (best-effort)."""
    if not download_dir.exists():
        return
    for path in download_dir.iterdir():
        if path.is_file() and path.name.endswith(PARTIAL_SUFFIXES):
            try:
                path.unlink()
            except OSError:
                pass


def list_completed_files(download_dir: Path) -> List[Path]:
    """Return regular files in ``download_dir`` that are not partial downloads."""
    if not download_dir.exists():
        return []
    return [
        p
        for p in download_dir.iterdir()
        if p.is_file() and not p.name.endswith(PARTIAL_SUFFIXES)
    ]


def has_partial_files(download_dir: Path) -> bool:
    """Return True if ``download_dir`` contains any in-progress download file."""
    if not download_dir.exists():
        return False
    return any(
        p.is_file() and p.name.endswith(PARTIAL_SUFFIXES)
        for p in download_dir.iterdir()
    )


def wait_for_new_download(
    download_dir: Path,
    before_names: set,
    timeout: int,
    check_interval: float = 1.0,
    stable_polls_required: int = 2,
) -> Optional[Path]:
    """
    Wait for a new completed file whose size is stable for several polls.

    A file qualifies when its name is not in ``before_names``, it is
    non-empty, its size was unchanged for ``stable_polls_required``
    consecutive polls, and no partial-download file remains in the directory.
    Returns the file path, or None after ``timeout`` seconds.
    """
    deadline = time.monotonic() + timeout
    # path -> (last seen size, consecutive polls with that size)
    size_state: Dict[str, Tuple[int, int]] = {}

    while time.monotonic() < deadline:
        candidates = []
        for p in list_completed_files(download_dir):
            if p.name in before_names:
                continue
            try:
                st = p.stat()
            except OSError:
                # The browser may replace its placeholder file between the
                # directory listing and the stat; just look again next poll.
                continue
            candidates.append((st.st_mtime, st.st_size, p))

        # Prefer most recently modified file.
        candidates.sort(key=lambda c: c[0], reverse=True)

        for _mtime, current_size, p in candidates:
            prev_size, stable_count = size_state.get(str(p), (-1, 0))
            if current_size == prev_size and current_size > 0:
                stable_count += 1
            else:
                stable_count = 0
            size_state[str(p)] = (current_size, stable_count)

            if stable_count >= stable_polls_required and not has_partial_files(download_dir):
                return p

        time.sleep(check_interval)

    return None


def move_to_hash_folder(
    temp_file: Path,
    internal_hash: str,
    final_download_root: Path,
) -> Path:
    """
    Move ``temp_file`` into ``final_download_root/<internal_hash>/``.

    An existing file of the same name is never overwritten; ``__2``, ``__3``,
    ... is appended to the stem instead. Returns the resolved destination.
    """
    hash_dir = ensure_dir(final_download_root / internal_hash)
    dest = hash_dir / safe_filename(temp_file.name)

    if dest.exists():
        stem, suffix = dest.stem, dest.suffix
        counter = 2
        while (hash_dir / f"{stem}__{counter}{suffix}").exists():
            counter += 1
        dest = hash_dir / f"{stem}__{counter}{suffix}"

    shutil.move(str(temp_file), str(dest))
    return dest.resolve()


# ============================================================
# Optional GPX tagging helper
# ============================================================

def add_internal_hash_to_gpx_file(gpx_path: Path, internal_hash: str) -> bool:
    """
    Optional helper for plain .gpx files.

    Writes ``<metadata><extensions><source_registry_hash>`` with
    ``internal_hash`` into the file in place. Calling it again updates the
    existing tag instead of adding a second one. Returns True on success and
    False when the file cannot be read, parsed or written.

    This is intentionally conservative and only handles XML GPX files
    directly. It does NOT modify .gdb, because Garmin GDB is binary and should
    not be patched by raw string insertion.

    Current downloader does not call this automatically. Keep the original
    file clean; tag in a later controlled post-processing stage.
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(str(gpx_path))
        root = tree.getroot()

        ns_match = re.match(r"\{(.+?)\}", root.tag)
        ns = ns_match.group(1) if ns_match else ""
        prefix = f"{{{ns}}}" if ns else ""
        if ns:
            # Without this ElementTree rewrites every element as ``ns0:...``.
            # NOTE: the namespace registry is process-global.
            ET.register_namespace("", ns)

        metadata = root.find(f"{prefix}metadata")
        if metadata is None:
            metadata = ET.Element(f"{prefix}metadata")
            root.insert(0, metadata)

        extensions = metadata.find(f"{prefix}extensions")
        if extensions is None:
            extensions = ET.SubElement(metadata, f"{prefix}extensions")

        # Look for a tag from an earlier run, namespaced or not.
        tag = extensions.find(f"{prefix}source_registry_hash")
        if tag is None:
            tag = extensions.find("source_registry_hash")
        if tag is None:
            tag = ET.SubElement(extensions, f"{prefix}source_registry_hash")
        tag.text = internal_hash

        tree.write(str(gpx_path), encoding="utf-8", xml_declaration=True)
        return True
    except (ET.ParseError, OSError, ValueError):
        return False


# ============================================================
# One URL processing
# ============================================================

def _run_download_steps(
    driver: webdriver.Firefox,
    record: dict,
    cfg: WorkerConfig,
    temp_download_dir: Path,
    final_download_root: Path,
) -> None:
    """Run the page/download steps for ``record["url"]`` and fill in ``record``."""
    url = record["url"]
    internal_hash = record["internal_hash"]

    clear_partial_downloads(temp_download_dir)
    before_names = {p.name for p in list_completed_files(temp_download_dir)}

    driver.get(url)
    wait_for_page_load(driver, timeout=cfg.page_load_timeout)

    metadata = extract_jd_summary_metadata(driver)
    record["from_remote_metadata"] = metadata

    accept_license_if_present(driver, timeout=cfg.element_timeout)

    if not click_download_button(driver, timeout=cfg.element_timeout):
        record["status"] = "download_button_not_found_or_not_clickable"
        return

    downloaded_temp_file = wait_for_new_download(
        temp_download_dir,
        before_names=before_names,
        timeout=cfg.download_timeout,
    )
    if downloaded_temp_file is None:
        record["status"] = "download_timeout"
        return

    file_sha256 = sha256_file(downloaded_temp_file)
    original_filename = downloaded_temp_file.name
    final_path = move_to_hash_folder(
        temp_file=downloaded_temp_file,
        internal_hash=internal_hash,
        final_download_root=final_download_root,
    )

    # Sidecar metadata next to file for direct local browsing.
    sidecar_path = final_path.with_suffix(final_path.suffix + ".source.json")
    sidecar = {
        "url": url,
        "normalized_url": record["normalized_url"],
        "internal_hash": internal_hash,
        "downloaded_filepath": str(final_path),
        "download_original_filename": original_filename,
        "download_file_sha256": file_sha256,
        "from_remote_metadata": metadata,
        "created_at_utc": utc_now_iso(),
    }
    sidecar_path.write_text(
        json.dumps(sidecar, ensure_ascii=False, indent=2, sort_keys=True),
        encoding="utf-8",
    )

    record["success"] = True
    record["status"] = "downloaded"
    record["downloaded_filepath"] = str(final_path)
    record["download_original_filename"] = original_filename
    record["download_file_sha256"] = file_sha256


def process_one_url(
    driver: webdriver.Firefox,
    url_item: dict,
    cfg: WorkerConfig,
    temp_download_dir: Path,
    final_download_root: Path,
) -> dict:
    """
    Download one URL and return its registry record.

    ``url_item`` must carry ``url`` and ``input_index``. Never raises for
    download problems: failures are reported through ``success``/``status``/
    ``error`` in the returned record so the worker can keep going.
    """
    url = url_item["url"]
    record = {
        "success": False,
        "status": "started",
        "error": "",
        "worker_id": cfg.worker_id,
        "input_index": url_item["input_index"],
        "url": url,
        "normalized_url": normalize_url(url),
        "internal_hash": make_internal_hash(url),
        "downloaded_filepath": None,
        "download_original_filename": None,
        "download_file_sha256": None,
        "from_remote_metadata": None,
        "started_at_utc": utc_now_iso(),
        "finished_at_utc": None,
    }

    try:
        _run_download_steps(driver, record, cfg, temp_download_dir, final_download_root)
    except WebDriverException as e:
        record["status"] = "webdriver_error"
        record["error"] = str(e)
    except Exception as e:
        # Boundary handler: one bad URL must not kill the whole worker.
        record["status"] = "unexpected_error"
        record["error"] = str(e) + "\n" + traceback.format_exc()

    record["finished_at_utc"] = utc_now_iso()
    return record


# ============================================================
# Worker
# ============================================================

def worker_main(url_items: List[dict], cfg_dict: dict) -> dict:
    """
    Worker process entry point: run one Firefox over ``url_items``.

    Each record is appended to ``<output_dir>/worker_XX.jsonl`` as soon as its
    URL finishes. Returns a summary dict with total/success/failed counts.
    """
    cfg = WorkerConfig(**cfg_dict)
    workdir = Path(cfg.workdir)
    output_dir = Path(cfg.output_dir)
    final_download_root = Path(cfg.download_root)

    temp_download_dir = ensure_dir(workdir / f"worker_{cfg.worker_id:02d}" / "temp_downloads")
    worker_jsonl = output_dir / f"worker_{cfg.worker_id:02d}.jsonl"

    result_summary = {
        "worker_id": cfg.worker_id,
        "total": len(url_items),
        "success": 0,
        "failed": 0,
        "jsonl": str(worker_jsonl),
    }

    driver = None
    try:
        driver = build_firefox_driver(cfg, temp_download_dir)

        for n, url_item in enumerate(url_items, 1):
            print(
                f"[worker {cfg.worker_id:02d}] [{n}/{len(url_items)}] "
                f"input_index={url_item['input_index']} {url_item['url']}",
                flush=True,
            )

            record = process_one_url(
                driver=driver,
                url_item=url_item,
                cfg=cfg,
                temp_download_dir=temp_download_dir,
                final_download_root=final_download_root,
            )
            write_jsonl_line(worker_jsonl, record)

            if record.get("success"):
                result_summary["success"] += 1
                print(
                    f"[worker {cfg.worker_id:02d}] OK {record['internal_hash']} "
                    f"{record.get('downloaded_filepath')}",
                    flush=True,
                )
            else:
                result_summary["failed"] += 1
                print(
                    f"[worker {cfg.worker_id:02d}] FAIL {record['internal_hash']} "
                    f"{record.get('status')}",
                    flush=True,
                )

            if cfg.delay_between_urls > 0:
                time.sleep(cfg.delay_between_urls)
    finally:
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                pass

    return result_summary


# ============================================================
# Registry combining
# ============================================================

def load_success_seen(output_dir: Path) -> set:
    """
    Used for resume. Only successful URLs are skipped. Failed URLs are retried.

    Returns the set of normalized URLs with a successful record in the
    combined registry or in any worker_*.jsonl file.
    """
    paths = []
    combined = output_dir / COMBINED_REGISTRY_NAME
    if combined.exists():
        paths.append(combined)
    paths.extend(sorted(output_dir.glob("worker_*.jsonl")))

    seen = set()
    for path in paths:
        for r in read_jsonl(path):
            if r.get("success") and r.get("url"):
                seen.add(normalize_url(r["url"]))
    return seen


def collect_worker_records(output_dir: Path) -> List[dict]:
    """
    Return one record per normalized URL, sorted by ``input_index``.

    Records come from the existing combined registry (so entries whose worker
    file is gone are not lost when the registry is rewritten) followed by all
    worker_*.jsonl files. For each URL the latest successful record wins;
    if none succeeded, the latest record wins.
    """
    records = read_jsonl(output_dir / COMBINED_REGISTRY_NAME)
    for path in sorted(output_dir.glob("worker_*.jsonl")):
        records.extend(read_jsonl(path))

    best_by_url: Dict[str, dict] = {}
    for r in records:
        url = r.get("url")
        if not url:
            continue
        key = normalize_url(url)
        old = best_by_url.get(key)
        if old is None:
            best_by_url[key] = r
            continue

        old_success = bool(old.get("success"))
        new_success = bool(r.get("success"))
        if new_success and not old_success:
            best_by_url[key] = r
        elif new_success == old_success:
            # ISO timestamps from utc_now_iso() compare correctly as strings.
            if (r.get("finished_at_utc") or "") >= (old.get("finished_at_utc") or ""):
                best_by_url[key] = r

    final_records = list(best_by_url.values())
    final_records.sort(key=lambda r: int(r.get("input_index") or 0))
    return final_records


def _md_cell(value) -> str:
    """Escape ``|`` so ``value`` cannot break out of a Markdown table cell."""
    return str(value).replace("|", "\\|")


def write_combined_outputs(output_dir: Path) -> None:
    """Write source_registry.jsonl/.csv/.md in ``output_dir`` from all known records."""
    ensure_dir(output_dir)
    records = collect_worker_records(output_dir)

    jsonl_path = output_dir / COMBINED_REGISTRY_NAME
    csv_path = output_dir / "source_registry.csv"
    md_path = output_dir / "source_registry.md"

    # Write to a temp file and swap it in: the old registry is also an input
    # of collect_worker_records, so it must never be left half-written.
    tmp_jsonl_path = jsonl_path.with_name(jsonl_path.name + ".tmp")
    with tmp_jsonl_path.open("w", encoding="utf-8", newline="\n") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False, sort_keys=True) + "\n")
    os.replace(str(tmp_jsonl_path), str(jsonl_path))

    csv_fields = [
        "input_index",
        "success",
        "status",
        "internal_hash",
        "url",
        "downloaded_filepath",
        "download_original_filename",
        "download_file_sha256",
        "from_remote_metadata",
        "error",
        "started_at_utc",
        "finished_at_utc",
    ]
    with csv_path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=csv_fields)
        writer.writeheader()
        for r in records:
            row = {field: r.get(field) for field in csv_fields}
            row["from_remote_metadata"] = json_dumps_compact(row["from_remote_metadata"])
            writer.writerow(row)

    with md_path.open("w", encoding="utf-8", newline="\n") as f:
        f.write("| internal_hash | success | status | url | downloaded_filepath | details |\n")
        f.write("|---|---:|---|---|---|---|\n")
        for r in records:
            meta = r.get("from_remote_metadata") or {}
            details_parts = []

            title = truncate_text(meta.get("page_title", ""), 100)
            if title:
                details_parts.append(f"title: {title}")

            items = meta.get("jd_summary_items") or []
            if items:
                first_text = truncate_text(items[0].get("text", ""), 180)
                if first_text:
                    details_parts.append(first_text)

            # A raw newline would end the table row; <br> keeps it in one cell.
            details = "<br>".join(details_parts)

            f.write(
                "| {internal_hash} | {success} | {status} | {url} | {path} | {details} |\n".format(
                    internal_hash=_md_cell(r.get("internal_hash", "")),
                    success=str(bool(r.get("success"))),
                    status=_md_cell(r.get("status", "")),
                    url=_md_cell(r.get("url", "")),
                    path=_md_cell(r.get("downloaded_filepath") or ""),
                    details=_md_cell(details),
                )
            )

    print("\nCombined registry written:")
    print(f" JSONL: {jsonl_path}")
    print(f" CSV: {csv_path}")
    print(f" MD: {md_path}")


# ============================================================
# Input / args
# ============================================================

def parse_proxy(proxy: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
    """
    Split ``host:port`` (an optional ``scheme://`` prefix is ignored).

    Returns ``(None, None)`` for an empty value. Raises ValueError when the
    value is not ``host:port`` or the port is not an integer in 1-65535.
    """
    if not proxy:
        return None, None

    proxy = proxy.strip()
    if "://" in proxy:
        proxy = proxy.split("://", 1)[1]
    if ":" not in proxy:
        raise ValueError("Proxy must be in host:port format, example: 192.168.0.38:1080")

    host, port_s = proxy.rsplit(":", 1)
    try:
        port = int(port_s.strip())
    except ValueError:
        raise ValueError(f"Proxy port is not a number: {port_s!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Proxy port out of range (1-65535): {port}")
    return host.strip(), port


def load_urls(input_path: Path, start: int, limit: Optional[int]) -> List[dict]:
    """
    Read one URL per line from ``input_path``.

    Blank lines are skipped. ``input_index`` is the 0-based line number in the
    file, while ``start`` and ``limit`` count non-blank URLs. A negative
    ``start`` is treated as 0.
    """
    urls = []
    with input_path.open("r", encoding="utf-8") as f:
        for idx, line in enumerate(f):
            url = line.strip()
            if url:
                urls.append({"input_index": idx, "url": url})

    sliced = urls[max(0, start):]
    if limit is not None:
        sliced = sliced[:limit]
    return sliced


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser."""
    p = argparse.ArgumentParser(
        description="Parallel Firefox downloader with jd_summary_list registry capture."
    )
    p.add_argument("--input", default="url-final.txt", help="Input URL file, one URL per line.")
    p.add_argument("--start", type=int, default=0, help="Start offset in input file. Example: 780")
    p.add_argument("--limit", type=int, default=None, help="Optional max URLs to process.")
    p.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help="Firefox worker count.")
    p.add_argument("--proxy", default=None, help="SOCKS5 proxy as host:port, example 192.168.0.38:1080")
    p.add_argument("--headless", action="store_true", help="Run Firefox headless.")
    p.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
    p.add_argument("--download-root", default=DEFAULT_DOWNLOAD_ROOT)
    p.add_argument("--workdir", default=DEFAULT_WORKDIR)
    p.add_argument("--page-load-timeout", type=int, default=25)
    p.add_argument("--element-timeout", type=int, default=7)
    p.add_argument("--download-timeout", type=int, default=60)
    p.add_argument("--delay", type=float, default=2.0, help="Delay between URLs per worker.")
    p.add_argument(
        "--user-agent",
        default=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) "
            "Gecko/20100101 Firefox/126.0"
        ),
    )
    p.add_argument(
        "--no-resume",
        action="store_true",
        help="Do not skip URLs already successful in previous registry files.",
    )
    p.add_argument(
        "--combine-only",
        action="store_true",
        help="Only combine existing worker_*.jsonl files into source_registry.* outputs.",
    )
    return p


# ============================================================
# Main
# ============================================================

def main() -> int:
    """
    Command-line entry point.

    Returns 0 when the run completed (even if individual URLs failed) and 2
    for setup errors (missing input file, bad proxy, Selenium not installed).
    """
    args = build_arg_parser().parse_args()

    input_path = Path(args.input)
    output_dir = ensure_dir(Path(args.output_dir))
    download_root = ensure_dir(Path(args.download_root))
    workdir = ensure_dir(Path(args.workdir))

    if args.combine_only:
        write_combined_outputs(output_dir)
        return 0

    if not input_path.exists():
        print(f"Input file not found: {input_path}", file=sys.stderr)
        return 2

    if SELENIUM_IMPORT_ERROR is not None:
        print(
            f"Selenium is required for downloading but could not be imported: "
            f"{SELENIUM_IMPORT_ERROR}",
            file=sys.stderr,
        )
        return 2

    try:
        proxy_host, proxy_port = parse_proxy(args.proxy)
    except ValueError as e:
        print(f"Invalid --proxy value: {e}", file=sys.stderr)
        return 2

    url_items = load_urls(input_path=input_path, start=args.start, limit=args.limit)

    skipped = 0
    if not args.no_resume:
        seen_success = load_success_seen(output_dir)
        before_count = len(url_items)
        url_items = [
            item for item in url_items
            if normalize_url(item["url"]) not in seen_success
        ]
        skipped = before_count - len(url_items)

    if not url_items:
        print("No URLs to process.")
        if skipped:
            print(f"Skipped already-successful URLs: {skipped}")
        write_combined_outputs(output_dir)
        return 0

    workers = max(1, min(int(args.workers), len(url_items)))
    chunks = chunk_evenly(url_items, workers)

    print("=== CONFIG ===")
    print(f"Input: {input_path}")
    print(f"Start offset: {args.start}")
    print(f"URLs to run: {len(url_items)}")
    print(f"Skipped resume: {skipped}")
    print(f"Workers: {workers}")
    print(f"Proxy: {proxy_host}:{proxy_port}" if proxy_host else "Proxy: none")
    print(f"Headless: {args.headless}")
    print(f"Output dir: {output_dir.resolve()}")
    print(f"Download root: {download_root.resolve()}")
    print("==============\n")

    cfgs = [
        WorkerConfig(
            worker_id=worker_id,
            proxy_host=proxy_host,
            proxy_port=proxy_port,
            headless=bool(args.headless),
            page_load_timeout=int(args.page_load_timeout),
            element_timeout=int(args.element_timeout),
            download_timeout=int(args.download_timeout),
            delay_between_urls=float(args.delay),
            workdir=str(workdir),
            output_dir=str(output_dir),
            download_root=str(download_root),
            user_agent=str(args.user_agent),
        )
        for worker_id in range(len(chunks))
    ]

    total_success = 0
    total_failed = 0

    with ProcessPoolExecutor(max_workers=workers) as executor:
        # The config crosses the process boundary as a plain dict.
        futures = [
            executor.submit(worker_main, chunk, asdict(cfg))
            for chunk, cfg in zip(chunks, cfgs)
        ]
        for fut in as_completed(futures):
            try:
                summary = fut.result()
            except Exception as e:
                total_failed += 1
                print(f"[FATAL] Worker crashed: {e}", file=sys.stderr)
                traceback.print_exc()
                continue
            total_success += int(summary.get("success", 0))
            total_failed += int(summary.get("failed", 0))
            print(
                f"\nWorker {summary.get('worker_id')} finished: "
                f"success={summary.get('success')} failed={summary.get('failed')}"
            )

    write_combined_outputs(output_dir)

    print("\n=== FINISHED ===")
    print(f"Success this run: {total_success}")
    print(f"Failed this run: {total_failed}")
    print("Registry ID form: R_<16 hex chars from normalized URL SHA256>")
    print("For GDB files, do not raw-patch the binary. Use the registry or a controlled converter/post-processor.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())