Save this as **`jd_parallel_downloader.py`**.

It runs several (typically 3–4) separate Firefox instances in parallel, one per worker (`--workers`, default 4). For each URL it captures `div.jd_summary_list`, downloads the file, and writes a crash-safe registry with these fields:

`url`, `downloaded_filepath`, `from_remote_metadata`, `internal_hash`

It also stores the SHA-256 of each downloaded file separately, as `download_file_sha256`.
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
Parallel Selenium Firefox downloader with source registry mapping.
|
||
|
||
Main outputs:
|
||
output/source_registry.jsonl # crash-safe full registry
|
||
output/source_registry.csv # combined table
|
||
output/source_registry.md # markdown mapping table
|
||
downloads_by_hash/R_<HASH>/... # downloaded files grouped by internal hash
|
||
|
||
Run example:
|
||
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080
|
||
|
||
Headless:
|
||
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless
|
||
|
||
Resume behavior:
|
||
- Successful URLs already present in output/source_registry.jsonl are skipped.
|
||
- Failed URLs are retried.
|
||
"""
|
||
|
||
from __future__ import annotations

import argparse
import csv
import hashlib
import json
import os
import re
import shutil
import sys
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from urllib.parse import urldefrag, urlsplit, urlunsplit

from selenium import webdriver
from selenium.common.exceptions import (
    ElementClickInterceptedException,
    JavascriptException,
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
|
||
|
||
|
||
# ============================================================
# Defaults
# ============================================================

DEFAULT_OUTPUT_DIR = "output"                  # worker JSONL files + combined registry
DEFAULT_DOWNLOAD_ROOT = "downloads_by_hash"    # final files, one folder per internal hash
DEFAULT_WORKDIR = "worker_runtime"             # per-worker temporary download dirs
DEFAULT_WORKERS = 4                            # number of parallel Firefox instances

# File-name endings that mark a download as still in progress.
PARTIAL_SUFFIXES = (".part", ".crdownload", ".tmp")

# MIME types handed to Firefox's ``browser.helperApps.neverAsk.*`` preferences,
# which take a single comma-separated string.
DOWNLOAD_MIME_TYPES = ",".join(
    (
        "application/octet-stream",
        "application/x-garmin-gdb",
        "application/gdb",
        "application/gpx+xml",
        "application/xml",
        "text/xml",
        "text/plain",
        "application/zip",
        "application/x-zip-compressed",
        "application/x-gzip",
        "application/gzip",
        "application/vnd.google-earth.kml+xml",
        "application/vnd.google-earth.kmz",
        "application/x-msdownload",
        "binary/octet-stream",
    )
)


# ============================================================
# Small helpers
# ============================================================

def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with whole seconds (``+00:00`` offset)."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat(timespec="seconds")


def ensure_dir(path: Path) -> Path:
    """Create directory *path* (including missing parents) if absent and return *path* itself."""
    os.makedirs(path, exist_ok=True)
    return path


def normalize_url(url: str) -> str:
    """
    Canonicalise *url* into a stable identity key.

    - surrounding whitespace is stripped;
    - the ``#fragment`` is dropped (it normally does not affect which file the
      server delivers);
    - scheme and network location are lower-cased;
    - path and query string are kept verbatim, because download pages often
      identify the resource through query parameters.
    """
    without_fragment = urldefrag(url.strip())[0]
    scheme, netloc, path, query, _fragment = urlsplit(without_fragment)
    return urlunsplit((scheme.lower(), netloc.lower(), path, query, ""))


def make_internal_hash(url: str) -> str:
    """
    Return the stable registry ID ``R_<16 upper-case hex chars>`` for *url*.

    The ID is derived from SHA-256 of the *normalized* URL only, never from
    file contents. It can therefore be embedded into a downloaded GPX file
    later without invalidating the mapping ID itself.
    """
    key = normalize_url(url).encode("utf-8")
    return "R_" + hashlib.sha256(key).hexdigest().upper()[:16]


def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 digest of the file at *path*, reading *chunk_size* bytes at a time."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter(callable, sentinel) stops when read() returns b"" at EOF.
        for piece in iter(lambda: stream.read(chunk_size), b""):
            digest.update(piece)
    return digest.hexdigest()


def safe_filename(name: str, fallback: str = "download.bin") -> str:
    """
    Turn an arbitrary (possibly server-supplied) file name into one safe path component.

    Path separators, ASCII control characters and the characters ``:*?"<>|``
    are replaced by ``_``; whitespace runs collapse to a single space.

    Returns *fallback* when nothing usable remains. This includes ``None``,
    empty or blank input, and names made only of dots (``.``, ``..``). A
    dot-only name would otherwise refer to the current or parent directory
    once joined onto a folder.
    """
    name = (name or "").strip()
    if not name:
        return fallback

    name = name.replace("\\", "_").replace("/", "_")
    name = re.sub(r"[\x00-\x1f\x7f]+", "_", name)
    name = re.sub(r'[:*?"<>|]+', "_", name)
    name = re.sub(r"\s+", " ", name).strip()

    # Reject "", ".", "..", "..." etc. so the result can never escape the target dir.
    if not name.strip("."):
        return fallback
    return name


def json_dumps_compact(obj) -> str:
    """Serialize *obj* as deterministic JSON: sorted keys, raw (non-escaped) Unicode, no spaces."""
    encoder_options = dict(ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return json.dumps(obj, **encoder_options)


def write_jsonl_line(path: Path, record: dict) -> None:
    """
    Append *record* as one JSON line to *path* and force it to disk.

    The parent directory is created if missing. Flushing and fsync-ing after
    every line means a crash can lose at most the line being written (which
    may be left truncated). This is what makes the per-worker files usable
    for resume.
    """
    ensure_dir(path.parent)
    payload = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
    with path.open("a", encoding="utf-8", newline="\n") as handle:
        handle.write(payload)
        handle.flush()
        os.fsync(handle.fileno())


def read_jsonl(path: Path) -> List[dict]:
    """
    Load every JSON-object line from *path*; return ``[]`` if the file does not exist.

    Blank lines are ignored. The following lines are skipped with a warning
    on stderr instead of aborting the whole read:

    - lines that are not valid UTF-8 (e.g. a multi-byte sequence cut off by
      a crash mid-write);
    - lines that are not valid JSON;
    - lines that decode to something other than a JSON object.

    Callers rely on every returned item being a dict (they call ``.get`` on it).
    """
    if not path.exists():
        return []

    records: List[dict] = []
    # Binary mode + per-line decode: one damaged line cannot raise
    # UnicodeDecodeError for the whole file.
    with path.open("rb") as f:
        for line_no, raw in enumerate(f, 1):
            try:
                line = raw.decode("utf-8").strip()
            except UnicodeDecodeError:
                print(f"[WARN] Broken JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError
                print(f"[WARN] Broken JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            if not isinstance(obj, dict):
                print(f"[WARN] Non-object JSONL line ignored: {path}:{line_no}", file=sys.stderr)
                continue
            records.append(obj)
    return records


def chunk_evenly(items: Sequence[dict], workers: int) -> List[List[dict]]:
    """
    Deal *items* round-robin into at most *workers* lists; empty lists are dropped.

    Item ``i`` goes to list ``i % workers``, so sizes differ by at most one
    and each list keeps the input order.

    Raises:
        ValueError: if *workers* is less than 1. Previously this surfaced as a
            ZeroDivisionError or IndexError.
    """
    if workers < 1:
        raise ValueError(f"workers must be >= 1, got {workers}")

    # items[k::workers] is exactly the round-robin share of bucket k.
    buckets = [list(items[k::workers]) for k in range(workers)]
    return [bucket for bucket in buckets if bucket]


def truncate_text(text: str, max_len: int = 350) -> str:
    """
    Collapse whitespace runs to single spaces and cap the length at *max_len*.

    Over-long text is cut and suffixed with ``...`` so the result is exactly
    *max_len* characters. When *max_len* is too small to hold the ellipsis
    (``< 3``), the text is simply cut to *max_len* characters; negative
    values yield ``""``. ``None`` is treated as an empty string.
    """
    text = re.sub(r"\s+", " ", text or "").strip()
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # text[: max_len - 3] + "..." would exceed max_len here.
        return text[: max(max_len, 0)]
    return text[: max_len - 3] + "..."


# ============================================================
# Firefox setup
# ============================================================

@dataclass
class WorkerConfig:
    """
    Settings for one worker process and its Firefox instance.

    ``main`` builds one per worker and ships it as a plain dict;
    ``worker_main`` rebuilds it with ``WorkerConfig(**cfg_dict)``. For that
    reason every field is a simple builtin value.
    """

    # Worker index; used in log prefixes and per-worker file/dir names.
    worker_id: int
    # SOCKS5 proxy; only configured when both host and port are set.
    proxy_host: Optional[str]
    proxy_port: Optional[int]
    # Start Firefox with --headless.
    headless: bool
    # Timeouts in seconds.
    page_load_timeout: int    # driver page-load timeout and readyState wait
    element_timeout: int      # each wait for the license checkbox / a download control
    download_timeout: int     # waiting for a completed file in the temp download dir
    # Pause in seconds after each URL.
    delay_between_urls: float
    # Directory roots, stored as strings.
    workdir: str              # per-worker runtime dirs (temp downloads)
    output_dir: str           # worker_NN.jsonl files and the combined registry
    download_root: str        # final files grouped by internal hash
    # User-Agent override; an empty string keeps Firefox's own UA.
    user_agent: str


def build_firefox_driver(cfg: WorkerConfig, temp_download_dir: Path) -> webdriver.Firefox:
    """
    Launch and return a Firefox WebDriver session set up for unattended downloads.

    *temp_download_dir* is created if needed and configured as Firefox's
    download directory, with the MIME types in ``DOWNLOAD_MIME_TYPES`` listed
    in the ``neverAsk`` download preferences.

    From *cfg* it applies:

    - headless mode;
    - an optional User-Agent override;
    - an optional SOCKS5 proxy;
    - the page-load timeout.

    Further preferences aim for lighter pages and less obvious automation
    (no stealth is guaranteed).
    """
    ensure_dir(temp_download_dir)

    options = FirefoxOptions()
    options.page_load_strategy = "eager"
    if cfg.headless:
        options.add_argument("--headless")

    # (preference, value) pairs, applied in this order below.
    prefs: List[Tuple[str, object]] = []

    if cfg.user_agent:
        prefs.append(("general.useragent.override", cfg.user_agent))

    if cfg.proxy_host and cfg.proxy_port:
        prefs += [
            ("network.proxy.type", 1),
            ("network.proxy.socks", cfg.proxy_host),
            ("network.proxy.socks_port", int(cfg.proxy_port)),
            ("network.proxy.socks_version", 5),
            ("network.proxy.socks_remote_dns", True),
        ]

    # Downloads
    prefs += [
        ("browser.download.folderList", 2),
        ("browser.download.dir", str(temp_download_dir.resolve())),
        ("browser.download.useDownloadDir", True),
        ("browser.download.manager.showWhenStarting", False),
        ("browser.download.alwaysOpenPanel", False),
        ("browser.helperApps.neverAsk.saveToDisk", DOWNLOAD_MIME_TYPES),
        ("browser.helperApps.neverAsk.openFile", DOWNLOAD_MIME_TYPES),
        ("pdfjs.disabled", True),
    ]

    # Speed / lighter pages
    prefs += [
        ("permissions.default.image", 2),
        ("gfx.downloadable_fonts.enabled", False),
        ("media.autoplay.default", 5),
        ("dom.ipc.plugins.enabled.libflashplayer.so", False),
    ]

    # Reduce automation noise; not guaranteed stealth, just less obvious.
    prefs += [
        ("dom.webdriver.enabled", False),
        ("useAutomationExtension", False),
    ]

    for pref_name, pref_value in prefs:
        options.set_preference(pref_name, pref_value)

    driver = webdriver.Firefox(options=options)
    driver.set_page_load_timeout(cfg.page_load_timeout)
    return driver


# ============================================================
# Selenium page logic
# ============================================================

def wait_for_page_load(driver: webdriver.Firefox, timeout: int = 20) -> None:
    """
    Block until ``document.readyState`` is ``interactive`` or ``complete``.

    Gives up silently after *timeout* seconds; the caller simply continues
    with whatever has loaded by then.
    """
    def _dom_ready(drv) -> bool:
        state = drv.execute_script("return document.readyState")
        return state in ("interactive", "complete")

    try:
        WebDriverWait(driver, timeout).until(_dom_ready)
    except TimeoutException:
        return


def click_element(driver, element) -> bool:
    """
    Click *element*, falling back to a JavaScript ``click()`` when the native click fails.

    A native WebDriver click can fail for many reasons: an overlay
    intercepting it, a stale element reference, the element being off-screen,
    and so on. Whatever the reason, the JavaScript click is tried next.
    Errors from either attempt are swallowed (``except Exception``), so
    callers only see the boolean result.

    Returns:
        True if either click was dispatched without raising, False otherwise.
    """
    # The old code only caught JavascriptException from the JS fallback after an
    # intercepted click; other WebDriver errors escaped, unlike the generic path.
    try:
        element.click()
        return True
    except Exception:
        pass

    try:
        driver.execute_script("arguments[0].click();", element)
        return True
    except Exception:
        return False


def extract_jd_summary_metadata(driver: webdriver.Firefox) -> Dict:
    """
    Capture the page title and every ``div.jd_summary_list`` on the current page.

    Returns::

        {
            "page_title": str,
            "jd_summary_count": int,
            "jd_summary_items": [
                {"index": int, "text": str, "html": str,
                 "links": [{"text": str, "href": str}, ...]},
                ...
            ],
        }

    Extraction is best-effort. Any lookup that raises leaves its field at the
    empty default. A link whose text or href cannot be read is left out.
    Usually there is one summary block, but all matches are recorded.
    """

    def _or_default(getter, default):
        # Evaluate *getter*; on any error fall back to *default*.
        try:
            return getter()
        except Exception:
            return default

    def _link_info(anchor) -> dict:
        return {
            "text": (anchor.text or "").strip(),
            "href": anchor.get_attribute("href") or "",
        }

    title = _or_default(lambda: driver.title or "", "")
    summaries = _or_default(
        lambda: driver.find_elements(By.CSS_SELECTOR, "div.jd_summary_list"), []
    )

    items = []
    for position, block in enumerate(summaries):
        block_text = _or_default(lambda: (block.text or "").strip(), "")
        block_html = _or_default(lambda: block.get_attribute("innerHTML") or "", "")

        anchors = _or_default(lambda: block.find_elements(By.CSS_SELECTOR, "a"), [])
        links = []
        for anchor in anchors:
            info = _or_default(lambda: _link_info(anchor), None)
            if info is not None:
                links.append(info)

        items.append(
            {
                "index": position,
                "text": block_text,
                "html": block_html,
                "links": links,
            }
        )

    return {
        "page_title": title,
        "jd_summary_count": len(summaries),
        "jd_summary_items": items,
    }


def accept_license_if_present(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Tick the ``license_agree`` checkbox if the page has one.

    Waits up to *timeout* seconds for an element named ``license_agree``.
    Returns True only if this call clicked it successfully. Returns False if:

    - the element never appears;
    - it is disabled or hidden;
    - it is already ticked;
    - the click or any element query fails.
    """
    locator = (By.NAME, "license_agree")
    try:
        checkbox = WebDriverWait(driver, timeout).until(EC.presence_of_element_located(locator))
    except TimeoutException:
        return False

    try:
        usable = checkbox.is_enabled() and checkbox.is_displayed()
        if not usable or checkbox.is_selected():
            return False
        return click_element(driver, checkbox)
    except Exception:
        return False


def click_download_button(driver: webdriver.Firefox, timeout: int) -> bool:
    """
    Click the first clickable download/submit control on the current page.

    Candidates are tried in priority order:

    1. the site-specific ``#jd_license_submit`` button;
    2. generic ``<input type=submit>`` / ``<button type=submit>``;
    3. any link whose href contains ``download``.

    Each candidate gets its own wait of up to *timeout* seconds, so a page
    without any of them costs ``len(selectors) * timeout`` seconds.

    Returns:
        True as soon as one click succeeds, otherwise False.
    """
    wait = WebDriverWait(driver, timeout)

    # The ID is looked up once only. The previous extra CSS selector
    # "#jd_license_submit" matched the very same element and merely added
    # another full *timeout* wait on pages without that button.
    selectors = [
        (By.ID, "jd_license_submit"),
        (By.CSS_SELECTOR, "input[type='submit']"),
        (By.CSS_SELECTOR, "button[type='submit']"),
        (By.CSS_SELECTOR, "a[href*='download']"),
    ]

    for by, selector in selectors:
        try:
            button = wait.until(EC.element_to_be_clickable((by, selector)))
            if click_element(driver, button):
                return True
        except Exception:
            # TimeoutException (not present/clickable in time) or any other
            # lookup/click error: move on to the next candidate.
            continue

    return False


# ============================================================
# Download detection
# ============================================================

def clear_partial_downloads(download_dir: Path) -> None:
    """
    Best-effort removal of leftover in-progress download files from *download_dir*.

    Only regular files whose names end with one of ``PARTIAL_SUFFIXES`` are
    deleted; deletion errors are ignored. A missing directory is a no-op.
    """
    if not download_dir.exists():
        return

    leftovers = [
        entry
        for entry in download_dir.iterdir()
        if entry.is_file() and entry.name.endswith(PARTIAL_SUFFIXES)
    ]
    for entry in leftovers:
        try:
            entry.unlink()
        except Exception:
            pass


def list_completed_files(download_dir: Path) -> List[Path]:
    """
    Return the regular files in *download_dir* that are not in-progress downloads.

    A file counts as in progress when its name ends with one of
    ``PARTIAL_SUFFIXES``. Subdirectories are ignored; a missing directory
    yields ``[]``. Order follows ``Path.iterdir()``.
    """
    if not download_dir.exists():
        return []
    return [
        entry
        for entry in download_dir.iterdir()
        if entry.is_file() and not entry.name.endswith(PARTIAL_SUFFIXES)
    ]


def has_partial_files(download_dir: Path) -> bool:
    """Return True if *download_dir* exists and holds a regular file ending in one of ``PARTIAL_SUFFIXES``."""
    return download_dir.exists() and any(
        entry.is_file() and entry.name.endswith(PARTIAL_SUFFIXES)
        for entry in download_dir.iterdir()
    )


def wait_for_new_download(
    download_dir: Path,
    before_names: set,
    timeout: int,
    check_interval: float = 1.0,
    stable_polls_required: int = 2,
) -> Optional[Path]:
    """
    Wait for a newly completed download in *download_dir* and return its path.

    A candidate is any completed file (per ``list_completed_files``) whose
    name is not in *before_names*. It is accepted when both hold:

    - its size is non-zero and unchanged for *stable_polls_required*
      consecutive polls after it was first seen;
    - no in-progress file (``has_partial_files``) is left in the directory.

    Newer candidates (by modification time) are checked first.

    Args:
        download_dir: Directory the browser saves into.
        before_names: File names present before the download was triggered.
        timeout: Maximum number of seconds to wait.
        check_interval: Seconds to sleep between polls.
        stable_polls_required: Consecutive equal-size polls needed.

    Returns:
        The completed file, or None if nothing qualified within *timeout*.
    """
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    size_state: Dict[str, Tuple[int, int]] = {}

    while time.monotonic() < deadline:
        # Stat each candidate exactly once. The browser can rename or delete
        # files between listing and stat (the old sort key called stat() and
        # could raise FileNotFoundError); such entries are skipped this round.
        candidates = []
        for p in list_completed_files(download_dir):
            if p.name in before_names:
                continue
            try:
                st = p.stat()
            except OSError:
                continue
            candidates.append((st.st_mtime, st.st_size, p))

        # Prefer most recently modified file.
        candidates.sort(key=lambda c: c[0], reverse=True)

        for _mtime, current_size, p in candidates:
            key = str(p)
            prev_size, stable_count = size_state.get(key, (-1, 0))
            if current_size == prev_size and current_size > 0:
                stable_count += 1
            else:
                stable_count = 0
            size_state[key] = (current_size, stable_count)

            if stable_count >= stable_polls_required and not has_partial_files(download_dir):
                return p

        time.sleep(check_interval)

    return None


def move_to_hash_folder(
    temp_file: Path,
    internal_hash: str,
    final_download_root: Path,
) -> Path:
    """
    Move *temp_file* into ``<final_download_root>/<internal_hash>/`` and return its absolute path.

    The file keeps its name after ``safe_filename`` sanitising. If that name
    is taken, ``__2``, ``__3``, ... is inserted before the extension, so
    earlier files in the folder are never overwritten.
    """
    target_dir = ensure_dir(final_download_root / internal_hash)
    dest = target_dir / safe_filename(temp_file.name)

    base, ext = dest.stem, dest.suffix
    counter = 2
    while dest.exists():
        dest = target_dir / f"{base}__{counter}{ext}"
        counter += 1

    shutil.move(str(temp_file), str(dest))
    return dest.resolve()


# ============================================================
# Optional GPX tagging helper
# ============================================================

def add_internal_hash_to_gpx_file(gpx_path: Path, internal_hash: str) -> bool:
    """
    Store *internal_hash* in ``<metadata><extensions><source_registry_hash>`` of a GPX file.

    Intended only for plain XML ``.gpx`` files. It does NOT handle ``.gdb``:
    Garmin GDB is binary and must not be patched by raw string insertion.

    The file is parsed and rewritten in place with ElementTree:

    - a missing ``<metadata>`` is inserted as the root's first child;
    - a missing ``<extensions>`` is appended to ``<metadata>``;
    - an existing ``source_registry_hash`` element is updated rather than a
      second one being appended, so repeated calls are idempotent.

    ElementTree re-serialises the whole document, so namespace prefixes
    (e.g. ``ns0:``), comments and formatting may differ from the original.

    The downloader does not call this automatically. Keep the original file
    clean and tag in a later, controlled post-processing stage.

    Returns:
        True on success, False if the file could not be read, parsed or written.
    """
    import xml.etree.ElementTree as ET

    try:
        tree = ET.parse(str(gpx_path))
        root = tree.getroot()

        # Reuse the root element's namespace (if any) for the standard GPX tags.
        ns_match = re.match(r"\{(.+?)\}", root.tag)
        prefix = "{%s}" % ns_match.group(1) if ns_match else ""

        metadata = root.find(f"{prefix}metadata")
        if metadata is None:
            metadata = ET.Element(f"{prefix}metadata")
            root.insert(0, metadata)

        extensions = metadata.find(f"{prefix}extensions")
        if extensions is None:
            extensions = ET.SubElement(metadata, f"{prefix}extensions")

        tag = extensions.find("source_registry_hash")
        if tag is None:
            tag = ET.SubElement(extensions, "source_registry_hash")
        tag.text = internal_hash

        tree.write(str(gpx_path), encoding="utf-8", xml_declaration=True)
        return True
    except Exception:
        # Best-effort helper: report failure to the caller instead of raising.
        return False


# ============================================================
# One URL processing
# ============================================================

def process_one_url(
    driver: webdriver.Firefox,
    url_item: dict,
    cfg: WorkerConfig,
    temp_download_dir: Path,
    final_download_root: Path,
) -> dict:
    """
    Handle one input URL end to end and return its registry record.

    Steps:

    1. Drop leftover partial files and snapshot the completed files.
    2. Load the page and capture ``div.jd_summary_list`` metadata.
    3. Tick the license checkbox if present and click the download control.
    4. Wait for a new completed file and compute its SHA-256.
    5. Move it into ``<final_download_root>/<internal_hash>/`` and write a
       ``<file>.source.json`` sidecar next to it.

    The outcome is encoded in the returned record (``success``, ``status``,
    ``error``, timestamps). WebDriver errors and other ``Exception``\\s are
    caught and reported there rather than raised. Status values:

    - ``downloaded``
    - ``download_button_not_found_or_not_clickable``
    - ``download_timeout``
    - ``webdriver_error``
    - ``unexpected_error``
    """
    url = url_item["url"]
    normalized = normalize_url(url)
    internal_hash = make_internal_hash(url)

    record = {
        "success": False,
        "status": "started",
        "error": "",
        "worker_id": cfg.worker_id,
        "input_index": url_item["input_index"],
        "url": url,
        "normalized_url": normalized,
        "internal_hash": internal_hash,
        "downloaded_filepath": None,
        "download_original_filename": None,
        "download_file_sha256": None,
        "from_remote_metadata": None,
        "started_at_utc": utc_now_iso(),
        "finished_at_utc": None,
    }

    def finish(status: str, error: str = "") -> dict:
        # Stamp the terminal status, error text and finish time; return the record.
        record["status"] = status
        record["error"] = error
        record["finished_at_utc"] = utc_now_iso()
        return record

    try:
        clear_partial_downloads(temp_download_dir)
        before_names = {p.name for p in list_completed_files(temp_download_dir)}

        driver.get(url)
        wait_for_page_load(driver, timeout=cfg.page_load_timeout)

        metadata = extract_jd_summary_metadata(driver)
        record["from_remote_metadata"] = metadata

        accept_license_if_present(driver, timeout=cfg.element_timeout)

        if not click_download_button(driver, timeout=cfg.element_timeout):
            return finish("download_button_not_found_or_not_clickable")

        downloaded = wait_for_new_download(
            temp_download_dir,
            before_names=before_names,
            timeout=cfg.download_timeout,
        )
        if downloaded is None:
            return finish("download_timeout")

        file_sha256 = sha256_file(downloaded)
        original_filename = downloaded.name

        final_path = move_to_hash_folder(
            temp_file=downloaded,
            internal_hash=internal_hash,
            final_download_root=final_download_root,
        )

        # Sidecar metadata next to file for direct local browsing.
        sidecar_path = final_path.with_suffix(final_path.suffix + ".source.json")
        sidecar_doc = {
            "url": url,
            "normalized_url": normalized,
            "internal_hash": internal_hash,
            "downloaded_filepath": str(final_path),
            "download_original_filename": original_filename,
            "download_file_sha256": file_sha256,
            "from_remote_metadata": metadata,
            "created_at_utc": utc_now_iso(),
        }
        sidecar_path.write_text(
            json.dumps(sidecar_doc, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )

        record.update(
            success=True,
            downloaded_filepath=str(final_path),
            download_original_filename=original_filename,
            download_file_sha256=file_sha256,
        )
        return finish("downloaded")

    except WebDriverException as exc:
        return finish("webdriver_error", str(exc))

    except Exception as exc:
        return finish("unexpected_error", str(exc) + "\n" + traceback.format_exc())


# ============================================================
# Worker
# ============================================================

def _driver_is_alive(driver) -> bool:
    """Return True if the WebDriver session still answers a trivial command."""
    try:
        _ = driver.current_url
    except Exception:
        return False
    return True


def _quit_driver_quietly(driver) -> None:
    """Call ``driver.quit()``, ignoring any error (the session may already be gone)."""
    try:
        driver.quit()
    except Exception:
        pass


def worker_main(url_items: List[dict], cfg_dict: dict) -> dict:
    """
    Process *url_items* sequentially in one dedicated Firefox instance.

    Runs inside a worker process; *cfg_dict* holds the ``WorkerConfig``
    fields. Each result record is appended and fsynced to
    ``<output_dir>/worker_NN.jsonl`` right away, so finished URLs survive a
    crash of this process.

    If a URL ends with ``webdriver_error`` and the browser session no longer
    responds (e.g. Firefox crashed), Firefox is restarted. Otherwise every
    remaining URL would fail against the dead session. A failure to restart
    propagates, like a failure to start.

    Returns:
        Summary dict with ``worker_id``, ``total``, ``success``, ``failed``
        and ``jsonl`` (this worker's JSONL path).
    """
    cfg = WorkerConfig(**cfg_dict)

    workdir = Path(cfg.workdir)
    output_dir = Path(cfg.output_dir)
    final_download_root = Path(cfg.download_root)

    temp_download_dir = ensure_dir(workdir / f"worker_{cfg.worker_id:02d}" / "temp_downloads")
    worker_jsonl = output_dir / f"worker_{cfg.worker_id:02d}.jsonl"
    tag = f"[worker {cfg.worker_id:02d}]"

    result_summary = {
        "worker_id": cfg.worker_id,
        "total": len(url_items),
        "success": 0,
        "failed": 0,
        "jsonl": str(worker_jsonl),
    }

    driver = None

    try:
        driver = build_firefox_driver(cfg, temp_download_dir)

        for n, url_item in enumerate(url_items, 1):
            print(
                f"{tag} [{n}/{len(url_items)}] "
                f"input_index={url_item['input_index']} {url_item['url']}",
                flush=True,
            )

            record = process_one_url(
                driver=driver,
                url_item=url_item,
                cfg=cfg,
                temp_download_dir=temp_download_dir,
                final_download_root=final_download_root,
            )

            # Persist immediately: a crash can lose at most the current record.
            write_jsonl_line(worker_jsonl, record)

            if record.get("success"):
                result_summary["success"] += 1
                print(
                    f"{tag} OK {record['internal_hash']} "
                    f"{record.get('downloaded_filepath')}",
                    flush=True,
                )
            else:
                result_summary["failed"] += 1
                print(
                    f"{tag} FAIL {record['internal_hash']} "
                    f"{record.get('status')}",
                    flush=True,
                )

            if record.get("status") == "webdriver_error" and not _driver_is_alive(driver):
                print(f"{tag} browser session lost; restarting Firefox", flush=True)
                _quit_driver_quietly(driver)
                driver = None  # so `finally` does not touch the dead driver if restart fails
                driver = build_firefox_driver(cfg, temp_download_dir)

            if cfg.delay_between_urls > 0:
                time.sleep(cfg.delay_between_urls)

    finally:
        if driver is not None:
            _quit_driver_quietly(driver)

    return result_summary


# ============================================================
# Registry combining
# ============================================================

def load_success_seen(output_dir: Path) -> set:
    """
    Return the normalized URLs that already have a successful record (used for resume).

    Scans ``source_registry.jsonl`` (if present) and every ``worker_*.jsonl``
    in *output_dir*. Only records with a truthy ``success`` and a non-empty
    ``url`` count, so failed URLs are retried.
    """
    sources = [p for p in (output_dir / "source_registry.jsonl",) if p.exists()]
    sources += sorted(output_dir.glob("worker_*.jsonl"))

    return {
        normalize_url(rec["url"])
        for src in sources
        for rec in read_jsonl(src)
        if rec.get("success") and rec.get("url")
    }


def collect_worker_records(output_dir: Path) -> List[dict]:
    """
    Merge all ``worker_*.jsonl`` files into one record per normalized URL.

    Winner per URL:

    - a successful record beats a failed one;
    - between records with the same outcome, the one with the greater (or
      equal) ``finished_at_utc`` string wins, so ties go to the record read
      later.

    Records without a ``url`` are dropped. The result is sorted by
    ``input_index`` (a missing index counts as 0).
    """

    def _beats(candidate: dict, current: dict) -> bool:
        cand_ok = bool(candidate.get("success"))
        curr_ok = bool(current.get("success"))
        if cand_ok != curr_ok:
            return cand_ok
        cand_done = candidate.get("finished_at_utc") or ""
        curr_done = current.get("finished_at_utc") or ""
        return cand_done >= curr_done

    all_records = [
        rec
        for jsonl_file in sorted(output_dir.glob("worker_*.jsonl"))
        for rec in read_jsonl(jsonl_file)
    ]

    winners: Dict[str, dict] = {}
    for rec in all_records:
        if not rec.get("url"):
            continue
        key = normalize_url(rec["url"])
        current = winners.get(key)
        if current is None or _beats(rec, current):
            winners[key] = rec

    return sorted(winners.values(), key=lambda rec: int(rec.get("input_index", 0)))


def _replace_atomically(path: Path, write_body, newline: str) -> None:
    """
    Write *path* through a temporary sibling file and ``os.replace`` it into place.

    *write_body* is called with the open UTF-8 text handle. If anything
    fails, the temporary file is removed and the previous *path* (if any) is
    left untouched.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8", newline=newline) as handle:
            write_body(handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a half-written temp file behind; re-raise the original error.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise


def write_combined_outputs(output_dir: Path) -> None:
    """
    Rebuild the combined registry files in *output_dir* from the worker JSONL files.

    For the de-duplicated records from ``collect_worker_records`` it writes:

    * ``source_registry.jsonl``: one full JSON record per line;
    * ``source_registry.csv``: selected columns, with ``from_remote_metadata``
      flattened to compact JSON in one cell;
    * ``source_registry.md``: a Markdown table whose "details" column holds
      the page title and the start of the first jd_summary_list text.

    Each file is first written to a ``.tmp`` sibling and then swapped in with
    ``os.replace``. An interruption mid-write therefore leaves the previous
    complete file instead of a truncated one.
    """
    ensure_dir(output_dir)

    records = collect_worker_records(output_dir)

    jsonl_path = output_dir / "source_registry.jsonl"
    csv_path = output_dir / "source_registry.csv"
    md_path = output_dir / "source_registry.md"

    csv_fields = [
        "input_index",
        "success",
        "status",
        "internal_hash",
        "url",
        "downloaded_filepath",
        "download_original_filename",
        "download_file_sha256",
        "from_remote_metadata",
        "error",
        "started_at_utc",
        "finished_at_utc",
    ]

    def write_jsonl(f) -> None:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False, sort_keys=True) + "\n")

    def write_csv(f) -> None:
        writer = csv.DictWriter(f, fieldnames=csv_fields)
        writer.writeheader()
        for r in records:
            row = {field: r.get(field) for field in csv_fields}
            # Nested metadata does not fit a CSV cell; store it as compact JSON.
            row["from_remote_metadata"] = json_dumps_compact(row["from_remote_metadata"])
            writer.writerow(row)

    def md_cell(value) -> str:
        # A literal "|" would split the Markdown table cell.
        return str(value).replace("|", "\\|")

    def write_md(f) -> None:
        f.write("| internal_hash | success | status | url | downloaded_filepath | details |\n")
        f.write("|---|---:|---|---|---|---|\n")

        for r in records:
            meta = r.get("from_remote_metadata") or {}
            details_parts = []

            title = truncate_text(meta.get("page_title", ""), 100)
            if title:
                details_parts.append(f"title: {title}")

            items = meta.get("jd_summary_items") or []
            if items:
                first_text = truncate_text(items[0].get("text", ""), 180)
                if first_text:
                    details_parts.append(first_text)

            f.write(
                "| {h} | {ok} | {status} | {url} | {path} | {details} |\n".format(
                    h=md_cell(r.get("internal_hash", "")),
                    ok=bool(r.get("success")),
                    status=md_cell(r.get("status", "")),
                    url=md_cell(r.get("url", "")),
                    path=md_cell(r.get("downloaded_filepath") or ""),
                    details=md_cell("<br>".join(details_parts)),
                )
            )

    _replace_atomically(jsonl_path, write_jsonl, "\n")
    _replace_atomically(csv_path, write_csv, "")  # csv module handles line endings
    _replace_atomically(md_path, write_md, "\n")

    print("\nCombined registry written:")
    print(f" JSONL: {jsonl_path}")
    print(f" CSV: {csv_path}")
    print(f" MD: {md_path}")


# ============================================================
# Input / args
# ============================================================

def parse_proxy(proxy: Optional[str]) -> Tuple[Optional[str], Optional[int]]:
    """
    Parse a SOCKS proxy spec into ``(host, port)``.

    Accepts ``host:port``, optionally with a scheme prefix such as
    ``socks5://`` and a trailing ``/``. ``None`` or an empty/blank string
    means "no proxy" and yields ``(None, None)``.

    Raises:
        ValueError: if there is no ``:``, the host is empty, or the port is
            not a number in 1-65535.
    """
    if not proxy or not proxy.strip():
        return None, None

    spec = proxy.strip()
    if "://" in spec:
        spec = spec.split("://", 1)[1]
    spec = spec.rstrip("/")  # tolerate URL-style "socks5://host:port/"

    if ":" not in spec:
        raise ValueError("Proxy must be in host:port format, example: 192.168.0.38:1080")

    host, port_s = spec.rsplit(":", 1)
    host = host.strip()
    port_s = port_s.strip()

    if not host:
        raise ValueError(f"Proxy host is empty in {proxy!r}")
    try:
        port = int(port_s)
    except ValueError:
        raise ValueError(f"Proxy port must be a number, got {port_s!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Proxy port out of range 1-65535: {port}")

    return host, port


def load_urls(input_path: Path, start: int, limit: Optional[int]) -> List[dict]:
    """
    Read URLs (one per line) from *input_path* and return the selected slice.

    Each item is ``{"input_index": <0-based line number>, "url": <stripped line>}``.
    Blank lines and lines starting with ``#`` are skipped but still counted
    in ``input_index``.

    The file is decoded as UTF-8, and a leading byte-order mark (as written
    by some Windows editors) is removed. ``str.strip`` does not remove
    U+FEFF, so it would otherwise stay glued to the first URL.

    *start* and *limit* slice the list of URLs, not raw line numbers:
    ``urls[start:]`` and then at most *limit* items.
    """
    urls = []
    with input_path.open("r", encoding="utf-8-sig") as f:
        for idx, line in enumerate(f):
            url = line.strip()
            if not url or url.startswith("#"):
                continue
            urls.append({"input_index": idx, "url": url})

    selected = urls[start:]
    if limit is not None:
        selected = selected[:limit]
    return selected


def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser: input selection, parallelism/proxy, paths, timeouts, UA and run modes."""
    parser = argparse.ArgumentParser(
        description="Parallel Firefox downloader with jd_summary_list registry capture."
    )
    add = parser.add_argument

    # Input selection
    add("--input", default="url-final.txt", help="Input URL file, one URL per line.")
    add("--start", type=int, default=0, help="Start offset in input file. Example: 780")
    add("--limit", type=int, default=None, help="Optional max URLs to process.")

    # Parallelism / browser
    add("--workers", type=int, default=DEFAULT_WORKERS, help="Firefox worker count.")
    add("--proxy", default=None, help="SOCKS5 proxy as host:port, example 192.168.0.38:1080")
    add("--headless", action="store_true", help="Run Firefox headless.")

    # Paths
    add("--output-dir", default=DEFAULT_OUTPUT_DIR)
    add("--download-root", default=DEFAULT_DOWNLOAD_ROOT)
    add("--workdir", default=DEFAULT_WORKDIR)

    # Timeouts (seconds) and pacing
    add("--page-load-timeout", type=int, default=25)
    add("--element-timeout", type=int, default=7)
    add("--download-timeout", type=int, default=60)
    add("--delay", type=float, default=2.0, help="Delay between URLs per worker.")

    default_ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0"
    add("--user-agent", default=default_ua)

    # Run modes
    add(
        "--no-resume",
        action="store_true",
        help="Do not skip URLs already successful in previous registry files.",
    )
    add(
        "--combine-only",
        action="store_true",
        help="Only combine existing worker_*.jsonl files into source_registry.* outputs.",
    )

    return parser


# ============================================================
# Main
# ============================================================

def main() -> int:
    """
    Command-line entry point; returns the process exit code.

    Flow:

    1. Parse arguments; with ``--combine-only``, just rebuild the combined
       registry and stop.
    2. Load and slice the input URLs, and drop those that already succeeded
       unless ``--no-resume`` is given.
    3. Split the remaining URLs round-robin across worker processes (one
       Firefox each).
    4. Rebuild the combined registry files.

    Exit codes: 0 when the run completes (per-URL failures are recorded in
    the registry, not in the exit code); 2 when the input file is missing or
    ``--proxy`` is malformed.
    """
    args = build_arg_parser().parse_args()

    input_path = Path(args.input)
    output_dir = ensure_dir(Path(args.output_dir))
    download_root = ensure_dir(Path(args.download_root))
    workdir = ensure_dir(Path(args.workdir))

    if args.combine_only:
        write_combined_outputs(output_dir)
        return 0

    if not input_path.exists():
        print(f"Input file not found: {input_path}", file=sys.stderr)
        return 2

    # Report a malformed --proxy like other input errors, not as a raw traceback.
    try:
        proxy_host, proxy_port = parse_proxy(args.proxy)
    except ValueError as exc:
        print(f"Invalid --proxy value {args.proxy!r}: {exc}", file=sys.stderr)
        return 2

    url_items = load_urls(input_path=input_path, start=args.start, limit=args.limit)

    skipped = 0
    if not args.no_resume:
        seen_success = load_success_seen(output_dir)
        before_count = len(url_items)
        url_items = [
            item for item in url_items
            if normalize_url(item["url"]) not in seen_success
        ]
        skipped = before_count - len(url_items)

    if not url_items:
        print("No URLs to process.")
        if skipped:
            print(f"Skipped already-successful URLs: {skipped}")
        write_combined_outputs(output_dir)
        return 0

    workers = max(1, min(int(args.workers), len(url_items)))
    chunks = chunk_evenly(url_items, workers)

    print("=== CONFIG ===")
    print(f"Input: {input_path}")
    print(f"Start offset: {args.start}")
    print(f"URLs to run: {len(url_items)}")
    print(f"Skipped resume: {skipped}")
    print(f"Workers: {workers}")
    print(f"Proxy: {proxy_host}:{proxy_port}" if proxy_host else "Proxy: none")
    print(f"Headless: {args.headless}")
    print(f"Output dir: {output_dir.resolve()}")
    print(f"Download root: {download_root.resolve()}")
    print("==============\n")

    cfgs = [
        WorkerConfig(
            worker_id=worker_id,
            proxy_host=proxy_host,
            proxy_port=proxy_port,
            headless=bool(args.headless),
            page_load_timeout=int(args.page_load_timeout),
            element_timeout=int(args.element_timeout),
            download_timeout=int(args.download_timeout),
            delay_between_urls=float(args.delay),
            workdir=str(workdir),
            output_dir=str(output_dir),
            download_root=str(download_root),
            user_agent=str(args.user_agent),
        )
        for worker_id in range(len(chunks))
    ]

    total_success = 0
    total_failed = 0

    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Map each future to its worker id so a crash can be attributed.
        # asdict() yields a plain, picklable dict of the dataclass fields.
        futures = {
            executor.submit(worker_main, chunk, asdict(cfg)): cfg.worker_id
            for chunk, cfg in zip(chunks, cfgs)
        }

        for fut in as_completed(futures):
            try:
                summary = fut.result()
            except Exception as e:
                # URLs the worker finished before crashing are already in its
                # JSONL file; the crash itself is counted as one failure here.
                total_failed += 1
                print(f"[FATAL] Worker {futures[fut]} crashed: {e}", file=sys.stderr)
                traceback.print_exc()
                continue

            total_success += int(summary.get("success", 0))
            total_failed += int(summary.get("failed", 0))
            print(
                f"\nWorker {summary.get('worker_id')} finished: "
                f"success={summary.get('success')} failed={summary.get('failed')}"
            )

    write_combined_outputs(output_dir)

    print("\n=== FINISHED ===")
    print(f"Success this run: {total_success}")
    print(f"Failed this run: {total_failed}")
    print("Registry ID form: R_<16 hex chars from normalized URL SHA256>")
    print("For GDB files, do not raw-patch the binary. Use the registry or a controlled converter/post-processor.")

    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
```

Run it like this:

```bash
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080
```

Headless:

```bash
python jd_parallel_downloader.py --input url-final.txt --start 780 --workers 4 --proxy 192.168.0.38:1080 --headless
```

Re-running the same command resumes the job. URLs already recorded as successful are skipped, and failed ones are retried; pass `--no-resume` to process everything again.

Important behavior:

* Each worker has its own Firefox instance.
* Each worker downloads into its own temporary directory (by default `worker_runtime/worker_NN/temp_downloads/`).
* Finished files are moved into a folder named after the URL's internal hash. Next to each file is a `.source.json` sidecar with the source URL, hash, SHA-256 and captured page metadata:

  ```text
  downloads_by_hash/
    R_ABCDEF1234567890/
      downloaded_file.gpx
      downloaded_file.gpx.source.json
  ```

* The main registry files are:

  ```text
  output/source_registry.jsonl
  output/source_registry.csv
  output/source_registry.md
  ```
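
For injecting the hash into GPX/GDB files later:

* A GPX file can safely carry the hash in a metadata extension (inside `<metadata><extensions>`), for example:

  ```xml
  <source_registry_hash>R_ABCDEF1234567890</source_registry_hash>
  ```

  The script's `add_internal_hash_to_gpx_file()` helper does this, but the downloader never calls it automatically.

* A GDB file is binary and must not be patched as raw bytes. Instead, do one of the following:
  * keep the registry and sidecar as the mapping; or
  * convert the GDB to GPX with GPSBabel and inject the hash into the GPX metadata, keeping the original GDB mapped through `source_registry.csv` / `source_registry.jsonl`.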