Files
bg-mountains-scraping-routes/packaging/route_packager.py
2026-06-04 01:35:55 +03:00

1112 lines
47 KiB
Python

#!/usr/bin/env python3
"""
route_packager.py
Parse a scraped/downloaded route archive tree and build importable route bundles:
- OsmAnd: .osf package (zip container) containing one normalized GPX per route,
plus an aggregate GPX fallback and machine-readable manifests.
- Google: .kmz package (zip container) containing a KML document with one
toggleable Folder/Placemark per route, plus optional embedded media.
Designed for trees like:
v4/downloads_by_hash/R_*/<download>.rar|zip|7z + <download>.source.json
Important reality check:
- OsmAnd can import GPX/KML/KMZ and OSF backup/package containers. This script
creates an OSF (renamed zip) with tracks and metadata, and also writes a GPX
fallback because GPX import is the most predictable OsmAnd route workflow.
- Google Maps / My Maps / Earth do not accept arbitrary custom binary route
formats. The closest binary importable bundle is KMZ, a zipped KML package.
No network required. Python stdlib only for ZIP/TAR/KMZ/KML/GPX. Optional tools:
- RAR extraction: unrar / unar / 7z / 7zz / bsdtar, or python rarfile with tool.
- Garmin GDB conversion: gpsbabel, if present.
Examples:
python3 route_packager.py --input ./v4 --out ./out --target both
python3 route_packager.py --input ./v4-scrape-675.tar.gz --out ./out --target google --keep-temp
python3 route_packager.py --input ./v4 --out ./out --target osmand --skip-rar
Exit codes:
0 = completed, possibly with skipped unsupported files unless --strict was used
2 = validation / input error
3 = strict mode detected skipped route archives or unsupported route files
"""
from __future__ import annotations
import argparse
import csv
import datetime as _dt
import hashlib
import html
import io
import json
import math
import os
import posixpath
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import textwrap
import zipfile
from dataclasses import dataclass, field, asdict
from pathlib import Path, PurePosixPath
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple
from xml.etree import ElementTree as ET
# ----------------------------- constants ---------------------------------
# File suffixes (lower-case, including the dot) that are parsed as route data.
ROUTE_EXTS = {".gpx", ".kml", ".kmz", ".gdb"}
# Container suffixes that are collected as downloadable route archives.
ARCHIVE_EXTS = {".zip", ".rar", ".7z"}
# Suffixes treated as media that can be embedded next to the routes.
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tif", ".tiff"}
# Suffixes whose decoded content may be attached to routes as text notes.
TEXT_EXTS = {".txt", ".md", ".csv", ".json", ".html", ".htm", ".nfo", ".log"}
# XML namespace URIs used when building GPX 1.1 and KML 2.2 element names.
GPX_NS = "http://www.topografix.com/GPX/1/1"
KML_NS = "http://www.opengis.net/kml/2.2"
# Register serialization prefixes: GPX becomes the default (unprefixed)
# namespace, KML elements are written with the "kml" prefix. A prefix can map
# to only one namespace, so both cannot be the default at the same time.
ET.register_namespace("", GPX_NS)
ET.register_namespace("kml", KML_NS)
# ----------------------------- models ------------------------------------
@dataclass
class Point:
    """One geographic position with optional elevation and timestamp."""

    # Latitude / longitude in decimal degrees (fed to math.radians downstream).
    lat: float
    lon: float
    # Elevation as parsed from the source file; None when absent or unparsable.
    ele: Optional[float] = None
    # Timestamp text copied verbatim from the source; None when absent.
    time: Optional[str] = None
@dataclass
class Segment:
    """An ordered run of points forming one continuous piece of a route."""

    # Each instance gets its own list (default_factory avoids a shared default).
    points: List[Point] = field(default_factory=list)
@dataclass
class Route:
    """A single importable route together with its provenance metadata."""

    # Identity and provenance (required).
    id: str
    name: str
    source_archive: str
    source_archive_hash_dir: str
    inner_path: str
    # Optional metadata taken from the archive's sidecar file.
    source_title: str = ""
    source_license: str = ""
    source_url: str = ""
    source_created_at_utc: str = ""
    download_sha256: str = ""
    # Kind label assigned by the parser that produced the route.
    route_kind: str = "track"
    # Geometry and attachments.
    segments: List[Segment] = field(default_factory=list)
    waypoints: List[Point] = field(default_factory=list)
    media: List[str] = field(default_factory=list)
    text_notes: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def point_count(self) -> int:
        """Return the number of segment points (waypoints are not counted)."""
        count = 0
        for segment in self.segments:
            count += len(segment.points)
        return count

    def bbox(self) -> Optional[Tuple[float, float, float, float]]:
        """Return (min_lat, min_lon, max_lat, max_lon) or None without points."""
        everything = []
        for segment in self.segments:
            everything.extend(segment.points)
        everything.extend(self.waypoints)
        if not everything:
            return None
        lats = [pt.lat for pt in everything]
        lons = [pt.lon for pt in everything]
        return (min(lats), min(lons), max(lats), max(lons))

    def distance_km(self) -> float:
        """Return the summed great-circle length of all segments in km."""
        length = 0.0
        for segment in self.segments:
            pts = segment.points
            for idx in range(1, len(pts)):
                prev, cur = pts[idx - 1], pts[idx]
                length += haversine_km(prev.lat, prev.lon, cur.lat, cur.lon)
        return length
@dataclass
class ScanReport:
    """Counters and diagnostics accumulated while scanning an input tree."""

    input: str
    generated_at_utc: str
    # Archive-level counters.
    archives_seen: int = 0
    archives_extracted: int = 0
    # File-level counters for members found inside the archives.
    route_files_seen: int = 0
    route_files_imported: int = 0
    media_files_seen: int = 0
    text_files_seen: int = 0
    # Number of routes remaining after de-duplication.
    routes_written: int = 0
    # Entries shaped like {"path": ..., "reason": ...} for everything skipped.
    skipped: List[Dict[str, str]] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
# ----------------------------- utility -----------------------------------
def now_utc() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    current = _dt.datetime.now(_dt.timezone.utc)
    return current.isoformat(timespec="seconds")
def sha256_bytes(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def sha1_text(s: str) -> str:
    """Return the hexadecimal SHA-1 digest of *s* encoded as UTF-8.

    Characters that cannot be encoded are substituted rather than raising.
    """
    encoded = s.encode("utf-8", "replace")
    return hashlib.sha1(encoded).hexdigest()
def slugify(value: str, fallback: str = "route", max_len: int = 96) -> str:
    """Turn *value* into a file-system friendly name.

    Path separators and NULs become "-", whitespace runs become "_", and any
    other character that is not a word character, "-", ".", "(" or ")" becomes
    "-". Cyrillic and other Unicode word characters are kept. *fallback* is
    used when the input is blank or nothing survives trimming; the result is
    cut to at most *max_len* characters.
    """
    trim_chars = "._- "
    text = value.strip()
    if not text:
        text = fallback
    substitutions = (
        (r"[\\/\0]+", "-"),
        (r"\s+", "_"),
        # Keep Bulgarian/Cyrillic and most Unicode word chars; drop the rest.
        (r"[^\w\-.()\u0400-\u04FF]+", "-"),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text, flags=re.UNICODE)
    text = text.strip(trim_chars)
    if not text:
        text = fallback
    if len(text) > max_len:
        text = text[:max_len].rstrip(trim_chars)
    return text
def decode_text(data: bytes) -> str:
    """Decode *data* by trying a fixed list of encodings in order.

    The first encoding that decodes without error wins; if every attempt
    fails the bytes are decoded as UTF-8 with replacement characters.
    """
    candidates = ("utf-8-sig", "utf-8", "cp1251", "windows-1251", "cp866", "latin-1")
    for codec in candidates:
        try:
            decoded = data.decode(codec)
        except UnicodeDecodeError:
            continue
        else:
            return decoded
    return data.decode("utf-8", "replace")
def strip_ns(tag: str) -> str:
    """Return *tag* without a leading "{namespace}" qualifier, if any."""
    # rpartition yields the whole string as the tail when "}" is absent.
    _, _, local_name = tag.rpartition("}")
    return local_name
def find_child_text(el: ET.Element, names: Sequence[str]) -> str:
    """Return the stripped text of the first direct child named in *names*.

    Names are compared case-insensitively against the namespace-less tag.
    An empty string is returned when no child matches.
    """
    wanted = {candidate.lower() for candidate in names}
    match = next((child for child in el if strip_ns(child.tag).lower() in wanted), None)
    if match is None:
        return ""
    return (match.text or "").strip()
def safe_relpath(name: str) -> Optional[str]:
    """Normalize an archive member name to a safe relative POSIX path.

    Backslashes are treated as separators. Returns None for empty names,
    absolute paths and anything containing a ".." component.
    """
    if not name:
        return None
    candidate = PurePosixPath(name.replace("\\", "/"))
    if candidate.is_absolute():
        return None
    for component in candidate.parts:
        if component == ".." or component == "":
            return None
    return str(candidate)
def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in km between two lat/lon points.

    Inputs are decimal degrees; the haversine formula is evaluated on a
    sphere of radius 6371.0088 km.
    """
    earth_radius_km = 6371.0088
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)
    chord = (
        math.sin(delta_lat / 2) ** 2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2
    )
    return 2 * earth_radius_km * math.asin(math.sqrt(chord))
def which_any(names: Sequence[str]) -> Optional[str]:
    """Return the resolved path of the first name in *names* found on PATH.

    Names are probed in the given order; None is returned if none resolve.
    """
    resolved = (shutil.which(candidate) for candidate in names)
    return next((hit for hit in resolved if hit), None)
# ----------------------------- metadata ----------------------------------
def load_source_meta(archive_path: Path) -> Dict[str, str]:
    """Load <archive>.source.json if present and normalize useful fields.

    Returns an empty dict when there is no sidecar file. When the sidecar
    cannot be read as a JSON object the result is ``{"source_meta_error": ...}``
    instead of raising, so one bad sidecar never aborts a scan. Nested values
    of an unexpected type are ignored rather than dereferenced.
    """
    meta_path = archive_path.with_name(archive_path.name + ".source.json")
    if not meta_path.exists():
        return {}
    try:
        raw = json.loads(meta_path.read_text(encoding="utf-8", errors="replace"))
    except Exception as e:
        return {"source_meta_error": str(e)}
    if not isinstance(raw, dict):
        # Valid JSON but not an object (e.g. a list): nothing to look up.
        return {"source_meta_error": f"expected a JSON object, got {type(raw).__name__}"}

    title = ""
    license_text = ""
    url = ""

    remote = raw.get("from_remote_metadata")
    summary_items = remote.get("jd_summary_items") if isinstance(remote, dict) else None
    first_item = summary_items[0] if isinstance(summary_items, list) and summary_items else None
    if isinstance(first_item, dict):
        title = str(first_item.get("text") or "").strip()
        # Common scrape text form: "Title License: Creative Commons Size: ..."
        title = re.sub(r"\s*Лиценз:\s*.*$", "", title).strip()
        title = re.sub(r"\s*License:\s*.*$", "", title).strip()
        for link in first_item.get("links") or []:
            if not isinstance(link, dict):
                continue
            link_text = str(link.get("text") or "")
            if "creative" in link_text.lower():
                license_text = link_text or "Creative Commons"
            # The first link that carries an href becomes the source URL.
            if not url and link.get("href"):
                url = str(link.get("href"))

    # Try additional likely fields without assuming exact scrape schema.
    for key in ("source_url", "url", "page_url", "download_url"):
        if raw.get(key) and not url:
            url = str(raw[key])

    return {
        "source_title": title,
        "source_license": license_text,
        "source_url": url,
        "source_created_at_utc": str(raw.get("created_at_utc") or ""),
        "download_sha256": str(raw.get("download_file_sha256") or ""),
        "download_original_filename": str(raw.get("download_original_filename") or archive_path.name),
    }
# ----------------------------- archive extraction -------------------------
@dataclass
class ExtractedFile:
    """An archive member held in memory."""

    # Relative POSIX-style path of the member inside its archive.
    relpath: str
    # Raw, undecoded member content.
    data: bytes
def iter_zip_files(path: Path) -> Iterator[ExtractedFile]:
    """Yield every regular member of the ZIP file at *path*.

    Directory entries and members whose name is rejected by safe_relpath()
    are skipped.
    """
    with zipfile.ZipFile(path) as archive:
        for member in archive.infolist():
            rel = None if member.is_dir() else safe_relpath(member.filename)
            if rel:
                yield ExtractedFile(rel, archive.read(member))
def iter_kmz_files(data: bytes) -> Iterator[ExtractedFile]:
    """Yield every regular member of an in-memory KMZ (ZIP) payload.

    Directory entries and members whose name is rejected by safe_relpath()
    are skipped.
    """
    with zipfile.ZipFile(io.BytesIO(data)) as container:
        members = (entry for entry in container.infolist() if not entry.is_dir())
        for entry in members:
            rel = safe_relpath(entry.filename)
            if not rel:
                continue
            yield ExtractedFile(rel, container.read(entry))
def iter_rar_files(path: Path, skip_rar: bool = False) -> Iterator[ExtractedFile]:
    """Yield the regular members of the RAR archive at *path*.

    Nothing is yielded when *skip_rar* is true. Extraction goes through the
    7z/7zz command line when that is the best available tool (or when the
    optional ``rarfile`` package is missing but 7z exists); otherwise the
    ``rarfile`` package is used with whichever helper tool was found.

    Raises:
        RuntimeError: when neither route is usable or extraction fails.
    """
    if skip_rar:
        return

    tool = which_any(["unrar", "unar", "bsdtar", "7z", "7zz", "unrar-free"])
    # Compare on the lower-cased stem so "7z.exe" / "unrar.exe" are recognised too.
    tool_name = Path(tool).stem.lower() if tool else ""

    # For 7z/7zz the direct command-line path is more reliable than rarfile,
    # and it does not need the rarfile package at all.
    if tool_name in {"7z", "7zz"}:
        yield from iter_with_7z(path)
        return

    try:
        import rarfile  # type: ignore
    except ImportError as e:
        # Without rarfile the other helper tools cannot be driven from here,
        # but an installed 7z can still extract the archive on its own.
        if which_any(["7zz", "7z"]):
            yield from iter_with_7z(path)
            return
        raise RuntimeError("RAR support needs python package 'rarfile' plus unrar/unar/7z/bsdtar") from e

    # rarfile expects a command name/path in these module globals.
    if tool_name in {"unrar", "unrar-free"}:
        rarfile.UNRAR_TOOL = tool
    elif tool_name == "unar":
        rarfile.UNAR_TOOL = tool
    elif tool_name == "bsdtar":
        rarfile.BSDTAR_TOOL = tool

    try:
        with rarfile.RarFile(path) as rf:
            for info in rf.infolist():
                if info.isdir():
                    continue
                rel = safe_relpath(info.filename)
                if not rel:
                    continue
                yield ExtractedFile(rel, rf.read(info))
    except Exception as e:
        raise RuntimeError(
            f"Could not extract RAR {path.name}. Install one of: unrar, unar, bsdtar, 7z/7zz. Original error: {e}"
        ) from e
def iter_with_7z(path: Path) -> Iterator[ExtractedFile]:
    """Extract *path* with the 7z/7zz command line and yield its files.

    The archive is unpacked into a temporary directory that lives only while
    the generator is being consumed; each regular file is read fully into
    memory before being yielded.

    Raises:
        RuntimeError: when no 7z binary is available or extraction fails.
    """
    tool = which_any(["7zz", "7z"])
    if not tool:
        raise RuntimeError("7z/7zz not found")
    with tempfile.TemporaryDirectory(prefix="routepkg_7z_") as td:
        out = Path(td) / "x"
        out.mkdir()
        cmd = [tool, "x", "-y", f"-o{out}", str(path)]
        p = subprocess.run(
            cmd,
            # No terminal input: an interactive prompt (e.g. for a password)
            # must fail immediately instead of blocking the whole scan.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Tool output may list file names that are not valid in the locale
            # encoding; never let a log-decoding error mask a good extraction.
            errors="replace",
        )
        if p.returncode != 0:
            raise RuntimeError(f"7z extraction failed for {path.name}: {p.stderr[-1000:] or p.stdout[-1000:]}")
        for fp in out.rglob("*"):
            # Archives are untrusted: never read through an extracted symlink,
            # which could point at a file outside the extraction directory.
            if fp.is_symlink() or not fp.is_file():
                continue
            rel = safe_relpath(str(fp.relative_to(out).as_posix()))
            if rel:
                yield ExtractedFile(rel, fp.read_bytes())
def iter_archive_files(path: Path, skip_rar: bool = False) -> Iterator[ExtractedFile]:
    """Yield the members of *path*, choosing the extractor from its suffix.

    Raises:
        RuntimeError: for suffixes other than .zip, .rar and .7z (raised when
            the generator is first advanced).
    """
    suffix = path.suffix.lower()
    if suffix not in (".zip", ".rar", ".7z"):
        raise RuntimeError(f"Unsupported archive extension: {path}")
    if suffix == ".zip":
        source = iter_zip_files(path)
    elif suffix == ".rar":
        source = iter_rar_files(path, skip_rar=skip_rar)
    else:
        source = iter_with_7z(path)
    yield from source
# ----------------------------- route parsing -----------------------------
def parse_gpx(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Parse GPX bytes into Route objects.

    Every <trk> with at least one non-empty <trkseg> becomes a "track" route
    and every <rte> with two or more points becomes a "route" route. If
    neither exists, top-level <wpt> elements are bundled into a single
    "waypoints" route. A warning is appended to *report_warnings* when
    nothing usable is found.

    Raises:
        ValueError: when the XML cannot be parsed even after control
            characters have been stripped.
    """
    text = decode_text(data)
    # Parse the decoded str, not re-encoded bytes: for bytes input expat obeys
    # the encoding named in the XML declaration (e.g. windows-1251), which no
    # longer matches text that has already been decoded. For str input the
    # declaration is ignored.
    try:
        root = ET.fromstring(text)
    except Exception as first_err:
        # Some old GPX files include control chars. Try a sanitized pass.
        sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
        try:
            root = ET.fromstring(sanitized)
        except Exception as second_err:
            raise ValueError(f"GPX XML parse failed: {second_err}; first error: {first_err}") from second_err

    def local(el: ET.Element) -> str:
        return strip_ns(el.tag).lower()

    def child_points(parent: ET.Element, tag: str) -> List[Point]:
        # Direct children only; points with unparsable coordinates are dropped.
        found = []
        for child in parent:
            if local(child) == tag:
                point = parse_point_el(child)
                if point is not None:
                    found.append(point)
        return found

    # The first <metadata> element's name is the fallback for unnamed tracks.
    meta_name = ""
    for el in root.iter():
        if local(el) == "metadata":
            meta_name = find_child_text(el, ["name"])
            break

    kwargs = route_base_to_route_kwargs(route_base)
    routes: List[Route] = []

    tracks = [el for el in root.iter() if local(el) == "trk"]
    for trk_index, trk in enumerate(tracks, 1):
        name = find_child_text(trk, ["name"]) or meta_name or route_base.get("fallback_name", "track")
        segs: List[Segment] = []
        for seg_el in trk:
            if local(seg_el) != "trkseg":
                continue
            pts = child_points(seg_el, "trkpt")
            if pts:
                segs.append(Segment(pts))
        if segs:
            rid = build_route_id(route_base, name, f"trk{trk_index}")
            routes.append(Route(id=rid, name=name, route_kind="track", segments=segs, **kwargs))

    rtes = [el for el in root.iter() if local(el) == "rte"]
    for rte_index, rte in enumerate(rtes, 1):
        name = find_child_text(rte, ["name"]) or meta_name or route_base.get("fallback_name", "route")
        pts = child_points(rte, "rtept")
        if len(pts) >= 2:
            rid = build_route_id(route_base, name, f"rte{rte_index}")
            routes.append(Route(id=rid, name=name, route_kind="route", segments=[Segment(pts)], **kwargs))

    # If GPX only contains waypoints, preserve as a route object with waypoints.
    wpts = child_points(root, "wpt")
    if wpts and not routes:
        name = meta_name or route_base.get("fallback_name", "waypoints")
        rid = build_route_id(route_base, name, "wpt")
        routes.append(Route(id=rid, name=name, route_kind="waypoints", waypoints=wpts, **kwargs))

    if not routes:
        report_warnings.append(f"No tracks/routes/waypoints found in {route_base.get('inner_path','?')}")
    return routes
def parse_point_el(el: ET.Element) -> Optional[Point]:
    """Build a Point from a GPX point element (trkpt / rtept / wpt).

    Returns None when the ``lat``/``lon`` attributes are missing, are not
    numbers, are not finite, or fall outside the valid -90..90 / -180..180
    degree ranges. Elevation is optional; a missing, unparsable or non-finite
    <ele> leaves ``ele`` as None. <time> text is copied verbatim.
    """
    try:
        lat = float(el.attrib.get("lat", ""))
        lon = float(el.attrib.get("lon", ""))
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; such values would poison bounding boxes
    # and distances and produce unusable GPX/KML output.
    if not (math.isfinite(lat) and math.isfinite(lon)):
        return None
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None

    ele = None
    time = None
    for ch in el:
        lname = strip_ns(ch.tag).lower()
        if lname == "ele":
            try:
                value = float((ch.text or "").strip())
            except ValueError:
                continue
            if math.isfinite(value):
                ele = value
        elif lname == "time":
            time = (ch.text or "").strip() or None
    return Point(lat=lat, lon=lon, ele=ele, time=time)
def parse_kml(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Parse KML bytes into one Route per usable LineString.

    Every <LineString> found anywhere below a <Placemark> with at least two
    parsable coordinates becomes a "kml-linestring" route named after the
    placemark. A warning is appended to *report_warnings* when none is found.

    Raises:
        ValueError: when the XML cannot be parsed.
    """
    text = decode_text(data)
    # Parse the decoded str, not re-encoded bytes: for bytes input expat obeys
    # the encoding named in the XML declaration, which no longer matches text
    # that has already been decoded. For str input the declaration is ignored.
    try:
        root = ET.fromstring(text)
    except Exception as e:
        raise ValueError(f"KML XML parse failed: {e}") from e

    def local(el: ET.Element) -> str:
        return strip_ns(el.tag).lower()

    kwargs = route_base_to_route_kwargs(route_base)
    routes: List[Route] = []
    idx = 0
    for pm in [el for el in root.iter() if local(el) == "placemark"]:
        name = find_child_text(pm, ["name"]) or route_base.get("fallback_name", "kml-route")
        for line in [el for el in pm.iter() if local(el) == "linestring"]:
            # Only the first direct <coordinates> child of the LineString is used.
            coord_text = ""
            for ch in line:
                if local(ch) == "coordinates":
                    coord_text = ch.text or ""
                    break
            pts = parse_kml_coordinates(coord_text)
            if len(pts) >= 2:
                idx += 1
                rid = build_route_id(route_base, name, f"kml{idx}")
                routes.append(Route(id=rid, name=name, route_kind="kml-linestring", segments=[Segment(pts)], **kwargs))
    if not routes:
        report_warnings.append(f"No KML LineString routes found in {route_base.get('inner_path','?')}")
    return routes
def parse_kml_coordinates(coord_text: str) -> List[Point]:
    """Parse the text of a KML <coordinates> element into Points.

    Tuples are whitespace-separated "lon,lat[,ele]" strings. Tuples with
    fewer than two fields or with any unparsable number are skipped.
    """
    points: List[Point] = []
    # str.split() with no argument already splits on newlines and tabs.
    for tuple_text in coord_text.split():
        fields = tuple_text.split(",")
        if len(fields) < 2:
            continue
        try:
            lon_val = float(fields[0])
            lat_val = float(fields[1])
            has_ele = len(fields) > 2 and fields[2] != ""
            ele_val = float(fields[2]) if has_ele else None
        except ValueError:
            continue
        points.append(Point(lat=lat_val, lon=lon_val, ele=ele_val))
    return points
def parse_kmz(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> Tuple[List[Route], List[ExtractedFile]]:
    """Parse a KMZ payload, returning its routes and embedded image files.

    Each .kml member is parsed with parse_kml(); its inner path is recorded as
    "<kmz inner path>!<member path>" and its file stem is the fallback name.
    Members with an image suffix are returned untouched as media.
    """
    found_routes: List[Route] = []
    images: List[ExtractedFile] = []
    outer_path = route_base.get("inner_path", "")
    for member in iter_kmz_files(data):
        suffix = Path(member.relpath).suffix.lower()
        if suffix in IMAGE_EXTS:
            images.append(member)
            continue
        if suffix != ".kml":
            continue
        member_base = dict(route_base)
        member_base["inner_path"] = outer_path + "!" + member.relpath
        member_base["fallback_name"] = Path(member.relpath).stem
        found_routes.extend(parse_kml(member.data, member_base, report_warnings))
    return found_routes, images
def convert_gdb_to_gpx(gdb_data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Convert Garmin GDB bytes to GPX with gpsbabel and parse the result.

    Raises:
        RuntimeError: when gpsbabel is not installed, exits non-zero or
            produces no output file.
    """
    converter = shutil.which("gpsbabel")
    if not converter:
        raise RuntimeError("GDB file found but gpsbabel is not installed; install gpsbabel or keep the source GPX/KML files only")
    with tempfile.TemporaryDirectory(prefix="routepkg_gdb_") as workdir:
        gdb_file = Path(workdir) / "in.gdb"
        gpx_file = Path(workdir) / "out.gpx"
        gdb_file.write_bytes(gdb_data)
        result = subprocess.run(
            [converter, "-i", "gdb", "-f", str(gdb_file), "-o", "gpx", "-F", str(gpx_file)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        succeeded = result.returncode == 0 and gpx_file.exists()
        if not succeeded:
            raise RuntimeError(f"gpsbabel failed: {result.stderr[-1000:] or result.stdout[-1000:]}")
        # Parse before the temporary directory is removed.
        return parse_gpx(gpx_file.read_bytes(), route_base, report_warnings)
def route_base_to_route_kwargs(base: Dict[str, str]) -> Dict[str, str]:
    """Pick the provenance fields of *base* that Route accepts as keywords.

    Missing keys default to an empty string; all other keys are dropped.
    """
    wanted = (
        "source_archive",
        "source_archive_hash_dir",
        "inner_path",
        "source_title",
        "source_license",
        "source_url",
        "source_created_at_utc",
        "download_sha256",
    )
    return {key: base.get(key, "") for key in wanted}
def build_route_id(base: Dict[str, str], name: str, suffix: str) -> str:
    """Return a 16-hex-character id derived from a route's provenance.

    The id is the truncated SHA-1 of the hash directory, archive name, inner
    path, route name and *suffix* joined with "|".
    """
    provenance_keys = ("source_archive_hash_dir", "source_archive", "inner_path")
    parts = [base.get(key, "") for key in provenance_keys]
    parts += [name, suffix]
    digest = sha1_text("|".join(parts))
    return digest[:16]
# ----------------------------- scanning ----------------------------------
def materialize_input(input_path: Path, keep_temp: bool = False) -> Tuple[Path, Optional[tempfile.TemporaryDirectory]]:
    """Return a directory to scan for *input_path*.

    A directory is returned as-is with None. A tar archive (detected by
    content, so a misleading suffix does not matter) is unpacked into a new
    temporary directory, which is returned together with its
    TemporaryDirectory handle. Only directories and regular files with a safe
    relative name are extracted. *keep_temp* is not consulted here.

    Raises:
        ValueError: when the input is neither a directory nor a tar archive.
    """
    if input_path.is_dir():
        return input_path, None
    if not tarfile.is_tarfile(input_path):
        raise ValueError(f"Input is neither directory nor tar archive: {input_path}")

    holder = tempfile.TemporaryDirectory(prefix="routepkg_input_")
    dest_root = Path(holder.name)
    with tarfile.open(input_path, mode="r:*") as bundle:
        for member in bundle.getmembers():
            rel = safe_relpath(member.name)
            if not rel:
                continue
            destination = dest_root / rel
            if member.isdir():
                destination.mkdir(parents=True, exist_ok=True)
                continue
            if not member.isfile():
                # Links, devices and other special members are not extracted.
                continue
            destination.parent.mkdir(parents=True, exist_ok=True)
            stream = bundle.extractfile(member)
            if stream is not None:
                destination.write_bytes(stream.read())
    return dest_root, holder
def find_route_archives(root: Path, scan_all: bool = False) -> List[Path]:
    """Return the sorted archive files found below *root*.

    Unless *scan_all* is set, the search is narrowed to the canonical
    ``downloads_by_hash`` tree: either directly under *root*, or under exactly
    one first-level sub-directory of it. Otherwise all of *root* is searched.
    """
    search_root = root
    if not scan_all:
        # Scrape bundles often contain retry/failure snapshots that duplicate
        # downloads_by_hash, so prefer the canonical tree when it exists.
        canonical = root / "downloads_by_hash"
        if canonical.exists() and canonical.is_dir():
            search_root = canonical
        else:
            # Tar inputs often extract to a single top-level folder,
            # e.g. ./v4/downloads_by_hash.
            nested = []
            if root.exists():
                nested = [child / "downloads_by_hash" for child in root.iterdir() if child.is_dir()]
            nested = [cand for cand in nested if cand.exists() and cand.is_dir()]
            if len(nested) == 1:
                search_root = nested[0]
    found = [
        entry
        for entry in search_root.rglob("*")
        if entry.is_file() and entry.suffix.lower() in ARCHIVE_EXTS
    ]
    found.sort()
    return found
def scan_routes(root: Path, args: argparse.Namespace, report: ScanReport) -> Tuple[List[Route], Dict[str, bytes]]:
    """Extract every archive below *root* and collect its routes and media.

    Reads these attributes of *args*: ``scan_all``, ``limit_archives``,
    ``skip_rar``, ``max_text_note_bytes``, ``max_text_note_chars``,
    ``max_text_notes_per_route``, ``verbose`` and ``keep_duplicates``.

    Counters, skipped entries and warnings are recorded on *report*. Archives
    or files that fail are listed in ``report.skipped`` and never abort the
    scan. Parser warnings and sidecar-metadata errors are also copied to
    ``report.warnings`` so they stay visible when a file yields no route.

    Returns:
        The de-duplicated routes and a mapping of package-relative media
        paths to their bytes.
    """
    routes: List[Route] = []
    media_store: Dict[str, bytes] = {}
    archives = find_route_archives(root, scan_all=args.scan_all)
    report.archives_seen = len(archives)

    for i, ap in enumerate(archives, 1):
        if args.limit_archives and i > args.limit_archives:
            break
        if args.skip_rar and ap.suffix.lower() == ".rar":
            report.skipped.append({"path": str(ap), "reason": "RAR skipped by --skip-rar"})
            continue

        meta = load_source_meta(ap)
        if meta.get("source_meta_error"):
            # A broken sidecar only costs metadata, but it should not go unnoticed.
            report.warnings.append(f"{ap.name}: source metadata unreadable: {meta['source_meta_error']}")
        hash_dir = ap.parent.name
        archive_route_count_before = len(routes)
        archive_media_paths: List[str] = []
        archive_text_notes: List[str] = []
        archive_warnings: List[str] = []

        try:
            extracted = list(iter_archive_files(ap, skip_rar=args.skip_rar))
        except Exception as e:
            report.skipped.append({"path": str(ap), "reason": str(e)})
            continue
        report.archives_extracted += 1

        # First collect media/text so route descriptions can reference them.
        for f in extracted:
            ext = Path(f.relpath).suffix.lower()
            if ext in IMAGE_EXTS:
                report.media_files_seen += 1
                media_key = f"media/{slugify(hash_dir)}/{slugify(f.relpath, fallback='image')}"
                media_store[media_key] = f.data
                archive_media_paths.append(media_key)
            elif ext in TEXT_EXTS and len(f.data) <= args.max_text_note_bytes:
                report.text_files_seen += 1
                txt = decode_text(f.data).strip()
                if txt:
                    archive_text_notes.append(f"[{f.relpath}]\n{txt[:args.max_text_note_chars]}")

        for f in extracted:
            ext = Path(f.relpath).suffix.lower()
            if ext not in ROUTE_EXTS:
                continue
            report.route_files_seen += 1
            base = {
                "source_archive": ap.name,
                "source_archive_hash_dir": hash_dir,
                "inner_path": f.relpath,
                "fallback_name": Path(f.relpath).stem or ap.stem,
                **meta,
            }
            try:
                parsed: List[Route]
                if ext == ".gpx":
                    parsed = parse_gpx(f.data, base, archive_warnings)
                elif ext == ".kml":
                    parsed = parse_kml(f.data, base, archive_warnings)
                elif ext == ".kmz":
                    parsed, kmz_media = parse_kmz(f.data, base, archive_warnings)
                    for mf in kmz_media:
                        media_key = f"media/{slugify(hash_dir)}/{slugify(f.relpath)}__{slugify(mf.relpath, fallback='image')}"
                        media_store[media_key] = mf.data
                        archive_media_paths.append(media_key)
                        report.media_files_seen += 1
                elif ext == ".gdb":
                    parsed = convert_gdb_to_gpx(f.data, base, archive_warnings)
                else:
                    parsed = []
                kept = 0
                for r in parsed:
                    r.media.extend(sorted(set(archive_media_paths)))
                    r.text_notes.extend(archive_text_notes[: args.max_text_notes_per_route])
                    r.warnings.extend(archive_warnings)
                    if r.point_count() or r.waypoints:
                        routes.append(r)
                        kept += 1
                # Count route *files*, not routes: one file may hold many tracks.
                if kept:
                    report.route_files_imported += 1
            except Exception as e:
                report.skipped.append({"path": f"{ap}::{f.relpath}", "reason": str(e)})

        # Parser warnings are otherwise only attached to routes, so warnings
        # about files that produced no route at all would be lost.
        report.warnings.extend(f"{ap.name}: {w}" for w in archive_warnings)

        if args.verbose:
            added = len(routes) - archive_route_count_before
            print(f"[{i}/{len(archives)}] {ap.name}: +{added} routes", file=sys.stderr)

    # De-duplicate exact geometry+name-ish collisions, keeping the first seen.
    deduped: List[Route] = []
    seen = set()
    for r in routes:
        geom_sig = geometry_signature(r)
        if geom_sig in seen and not args.keep_duplicates:
            continue
        seen.add(geom_sig)
        deduped.append(r)
    if len(deduped) != len(routes):
        report.warnings.append(f"Deduplicated {len(routes) - len(deduped)} duplicate route geometries")
    report.routes_written = len(deduped)
    return deduped, media_store
def geometry_signature(r: Route) -> str:
    """Return a SHA-1 fingerprint of a route's name and sampled geometry.

    The lower-cased, stripped name is hashed first. Each segment then
    contributes a "|" separator followed by every k-th point, where
    k = max(1, len(points) // 200), rounded to six decimals. At most the
    first 200 waypoints are added with a "w" prefix.
    """
    digest = hashlib.sha1(r.name.strip().lower().encode("utf-8", "replace"))
    for segment in r.segments:
        digest.update(b"|")
        stride = max(1, len(segment.points) // 200)
        for idx in range(0, len(segment.points), stride):
            pt = segment.points[idx]
            digest.update(f"{pt.lat:.6f},{pt.lon:.6f};".encode())
    for wp in r.waypoints[:200]:
        digest.update(f"w{wp.lat:.6f},{wp.lon:.6f};".encode())
    return digest.hexdigest()
# ----------------------------- output GPX --------------------------------
def route_to_gpx_tree(route: Route) -> ET.ElementTree:
    """Build a standalone GPX 1.1 document for one route.

    The document holds a <metadata> block, one <wpt> per waypoint and a single
    <trk> with one <trkseg> per segment. The <trk> is always emitted, even
    when the route has no segments.
    """
    def gpx(tag: str) -> str:
        return f"{{{GPX_NS}}}{tag}"

    def add_point(parent: ET.Element, tag: str, pt: Point) -> None:
        # Coordinates go into attributes; elevation and time become children.
        node = ET.SubElement(parent, gpx(tag), attrib={"lat": f"{pt.lat:.8f}", "lon": f"{pt.lon:.8f}"})
        if pt.ele is not None:
            ET.SubElement(node, gpx("ele")).text = f"{pt.ele:.2f}"
        if pt.time:
            ET.SubElement(node, gpx("time")).text = pt.time

    gpx_root = ET.Element(gpx("gpx"), attrib={
        "version": "1.1",
        "creator": "route_packager.py",
    })
    description = build_plain_description(route)

    metadata = ET.SubElement(gpx_root, gpx("metadata"))
    ET.SubElement(metadata, gpx("name")).text = route.name
    if description:
        ET.SubElement(metadata, gpx("desc")).text = description

    for waypoint in route.waypoints:
        add_point(gpx_root, "wpt", waypoint)

    track = ET.SubElement(gpx_root, gpx("trk"))
    ET.SubElement(track, gpx("name")).text = route.name
    ET.SubElement(track, gpx("desc")).text = description
    for segment in route.segments:
        seg_node = ET.SubElement(track, gpx("trkseg"))
        for pt in segment.points:
            add_point(seg_node, "trkpt", pt)
    return ET.ElementTree(gpx_root)
def write_xml_tree(tree: ET.ElementTree, path: Path) -> None:
    """Serialize *tree* to *path* as UTF-8 XML with an XML declaration.

    Missing parent directories are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    write_options = {
        "encoding": "utf-8",
        "xml_declaration": True,
        "short_empty_elements": True,
    }
    tree.write(path, **write_options)
def build_aggregate_gpx(routes: Sequence[Route]) -> ET.ElementTree:
    """Build one GPX 1.1 document containing every route as a <trk>.

    Waypoints of all routes are emitted first, each named after its route,
    followed by one <trk> per route. The GPX 1.1 schema defines the children
    of <gpx> as a fixed sequence (metadata, wpt, rte, trk), so waypoints must
    not follow tracks.
    """
    def gpx(tag: str) -> str:
        return f"{{{GPX_NS}}}{tag}"

    root = ET.Element(gpx("gpx"), attrib={"version": "1.1", "creator": "route_packager.py"})
    meta = ET.SubElement(root, gpx("metadata"))
    ET.SubElement(meta, gpx("name")).text = "Bulgarian mountain routes aggregate"
    ET.SubElement(meta, gpx("desc")).text = f"Generated {now_utc()} with {len(routes)} routes."

    # Pass 1: all waypoints, so they precede every <trk> as the schema requires.
    for r in routes:
        for wp in r.waypoints:
            w = ET.SubElement(root, gpx("wpt"), attrib={"lat": f"{wp.lat:.8f}", "lon": f"{wp.lon:.8f}"})
            ET.SubElement(w, gpx("name")).text = r.name

    # Pass 2: one track per route, in input order.
    for r in routes:
        trk = ET.SubElement(root, gpx("trk"))
        ET.SubElement(trk, gpx("name")).text = r.name
        ET.SubElement(trk, gpx("desc")).text = build_plain_description(r)
        for seg in r.segments:
            seg_el = ET.SubElement(trk, gpx("trkseg"))
            for p in seg.points:
                pt = ET.SubElement(seg_el, gpx("trkpt"), attrib={"lat": f"{p.lat:.8f}", "lon": f"{p.lon:.8f}"})
                if p.ele is not None:
                    ET.SubElement(pt, gpx("ele")).text = f"{p.ele:.2f}"
                if p.time:
                    ET.SubElement(pt, gpx("time")).text = p.time
    return ET.ElementTree(root)
# ----------------------------- output KML/KMZ ----------------------------
def limit_points(points: Sequence[Point], max_points: int) -> List[Point]:
    """Return at most *max_points* evenly spaced points, keeping both ends.

    A non-positive limit, or a limit the input already satisfies, returns a
    copy of the input. A limit of 1 or 2 returns the first and last point.
    Consecutive samples with identical lat/lon/ele/time are collapsed.
    """
    count = len(points)
    if max_points <= 0 or count <= max_points:
        return list(points)
    if max_points <= 2:
        return [points[0], points[-1]]

    stride = (count - 1) / float(max_points - 1)
    sampled = [points[round(k * stride)] for k in range(max_points)]
    sampled[0], sampled[-1] = points[0], points[-1]

    # Remove accidental duplicates from rounding while preserving order.
    result: List[Point] = []
    previous = None
    for pt in sampled:
        signature = (pt.lat, pt.lon, pt.ele, pt.time)
        if signature == previous:
            continue
        result.append(pt)
        previous = signature
    return result
def kml_segments(route: Route, max_points_per_route: int = 0) -> List[List[Point]]:
    """Return the route's segments as point lists, optionally thinned.

    With a non-positive budget the segments are copied unchanged. Otherwise
    each segment gets a share of the budget proportional to its size, but
    never fewer than two points.
    """
    if max_points_per_route <= 0:
        return [list(segment.points) for segment in route.segments]
    denominator = max(1, route.point_count())
    thinned: List[List[Point]] = []
    for segment in route.segments:
        budget = int(max_points_per_route * (len(segment.points) / denominator))
        thinned.append(limit_points(segment.points, max(2, budget)))
    return thinned
def build_kml(routes: Sequence[Route], media_store: Dict[str, bytes], max_points_per_route: int = 0, lean_descriptions: bool = False) -> bytes:
    """Render *routes* as a single KML document and return it as UTF-8 bytes.

    All routes become Placemarks inside one "Routes" folder sharing one line
    style. Multi-segment routes use a MultiGeometry of LineStrings. A route
    without segments but with waypoints is represented by its first waypoint
    only. *media_store* is accepted but not read here.
    """
    def kml(tag: str) -> str:
        return f"{{{KML_NS}}}{tag}"

    def add_text(parent: ET.Element, tag: str, value: str) -> ET.Element:
        node = ET.SubElement(parent, kml(tag))
        node.text = value
        return node

    kml_root = ET.Element(kml("kml"))
    document = ET.SubElement(kml_root, kml("Document"))
    add_text(document, "name", "Bulgarian mountain routes")
    add_text(document, "description", f"Generated {now_utc()} from scraped route archives.")

    shared_style = ET.SubElement(document, kml("Style"), id="routeLine")
    line_style = ET.SubElement(shared_style, kml("LineStyle"))
    add_text(line_style, "color", "ff0066cc")
    add_text(line_style, "width", "4")

    routes_folder = ET.SubElement(document, kml("Folder"))
    add_text(routes_folder, "name", "Routes")
    add_text(routes_folder, "open", "0")

    for route in routes:
        placemark = ET.SubElement(routes_folder, kml("Placemark"))
        add_text(placemark, "name", route.name)
        add_text(placemark, "visibility", "1")
        add_text(placemark, "styleUrl", "#routeLine")
        add_text(placemark, "description", build_html_description(route, lean=lean_descriptions))

        line_parts = kml_segments(route, max_points_per_route=max_points_per_route)
        if len(line_parts) > 1:
            container = ET.SubElement(placemark, kml("MultiGeometry"))
            for part in line_parts:
                append_kml_linestring(container, part)
        elif line_parts:
            append_kml_linestring(placemark, line_parts[0])
        elif route.waypoints:
            # Waypoint-only route: only the first waypoint is emitted as geometry.
            first = route.waypoints[0]
            marker = ET.SubElement(placemark, kml("Point"))
            add_text(marker, "coordinates", f"{first.lon:.8f},{first.lat:.8f},{first.ele or 0:.2f}")

    sink = io.BytesIO()
    ET.ElementTree(kml_root).write(sink, encoding="utf-8", xml_declaration=True, short_empty_elements=True)
    return sink.getvalue()
def append_kml_linestring(parent: ET.Element, pts: Sequence[Point]) -> None:
    """Append a tessellated <LineString> for *pts* to *parent*.

    Coordinates are written as space-separated "lon,lat,ele" tuples; a missing
    elevation is written as 0.
    """
    line = ET.SubElement(parent, f"{{{KML_NS}}}LineString")
    ET.SubElement(line, f"{{{KML_NS}}}tessellate").text = "1"
    tuples = []
    for pt in pts:
        altitude = pt.ele if pt.ele is not None else 0
        tuples.append(f"{pt.lon:.8f},{pt.lat:.8f},{altitude:.2f}")
    ET.SubElement(line, f"{{{KML_NS}}}coordinates").text = " ".join(tuples)
def build_plain_description(route: Route) -> str:
    """Return a multi-line plain-text summary of a route for GPX <desc>.

    Lists provenance, point count, approximate distance and, when present,
    up to 12 media paths, 3 text notes and 5 distinct warnings. Control
    characters that are illegal in XML 1.0 are removed, because text notes
    come from arbitrary files and the serializer writes such characters
    verbatim, which would make the generated document unparsable.
    """
    rows = []
    if route.source_title:
        rows.append(f"Title: {route.source_title}")
    rows.append(f"Source archive: {route.source_archive}")
    rows.append(f"Inner file: {route.inner_path}")
    if route.source_license:
        rows.append(f"License: {route.source_license}")
    if route.source_created_at_utc:
        rows.append(f"Scrape created: {route.source_created_at_utc}")
    if route.download_sha256:
        rows.append(f"Download SHA256: {route.download_sha256}")
    rows.append(f"Points: {route.point_count()}")
    rows.append(f"Distance km approx: {route.distance_km():.2f}")
    if route.media:
        rows.append("Media: " + ", ".join(route.media[:12]) + (" ..." if len(route.media) > 12 else ""))
    if route.text_notes:
        rows.append("Text notes:\n" + "\n\n".join(route.text_notes[:3]))
    if route.warnings:
        rows.append("Warnings: " + "; ".join(sorted(set(route.warnings))[:5]))
    # Tab, LF and CR are the only C0 controls XML 1.0 permits; drop the rest.
    return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", "\n".join(rows))
def build_html_description(route: Route, lean: bool = False) -> str:
    """Return an HTML fragment describing a route for a KML <description>.

    The fragment is a table of provenance facts. Unless *lean* is set it is
    followed by up to 8 embedded images and up to 3 text notes. All values
    are HTML-escaped, and control characters that are illegal in XML 1.0 are
    removed first, because text notes come from arbitrary files and would
    otherwise make the generated KML unparsable.
    """
    def clean(value: str) -> str:
        # Tab, LF and CR are the only C0 controls XML 1.0 permits; drop the rest.
        return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", value)

    def esc(value: str) -> str:
        return html.escape(clean(value))

    def row(k: str, v: str) -> str:
        return f"<tr><th align='left'>{esc(k)}</th><td>{esc(v)}</td></tr>"

    rows = []
    if route.source_title:
        rows.append(row("Source title", route.source_title))
    rows.append(row("Source archive", route.source_archive))
    rows.append(row("Inner file", route.inner_path))
    if route.source_license:
        rows.append(row("License", route.source_license))
    if route.source_created_at_utc:
        rows.append(row("Scraped", route.source_created_at_utc))
    rows.append(row("Points", str(route.point_count())))
    rows.append(row("Approx distance", f"{route.distance_km():.2f} km"))
    if route.download_sha256:
        rows.append(row("Download SHA256", route.download_sha256))

    media_parts = []
    if not lean:
        for m in route.media[:8]:
            if Path(m).suffix.lower() in IMAGE_EXTS:
                media_parts.append(f"<p><img src='{esc(m)}' width='420'/><br/>{esc(m)}</p>")
    media_html = "".join(media_parts)

    notes = ""
    if route.text_notes and not lean:
        notes = "<h3>Text notes</h3>" + "".join(f"<pre>{esc(n)}</pre>" for n in route.text_notes[:3])
    return f"<table>{''.join(rows)}</table>{media_html}{notes}"
# ----------------------------- writers -----------------------------------
def write_osmand_outputs(routes: Sequence[Route], media_store: Dict[str, bytes], out_dir: Path, base_name: str, report: ScanReport) -> Dict[str, str]:
    """Write the OsmAnd deliverables and return their paths.

    Produces, below *out_dir*:
    - ``osmand_tracks/`` (recreated from scratch) with one GPX per route,
      ``manifest.json``, ``items.json`` and all media files;
    - ``<base_name>.all-routes.gpx``, the aggregate GPX fallback;
    - ``<base_name>.osmand-tracks.osf``, the zipped ``osmand_tracks`` tree.

    Returns:
        A dict with the keys ``osmand_gpx_fallback`` and ``osmand_osf``.
    """
    paths: Dict[str, str] = {}
    tracks_dir = out_dir / "osmand_tracks"
    # Start from a clean staging tree so stale tracks never leak into the package.
    if tracks_dir.exists():
        shutil.rmtree(tracks_dir)
    tracks_dir.mkdir(parents=True)

    manifest_routes = []
    used_names: Dict[str, int] = {}
    for r in routes:
        stem = slugify(r.name, fallback=r.id)
        # Compare names case-insensitively: on case-insensitive file systems
        # "Vihren.gpx" and "vihren.gpx" are the same file, and the second
        # route would silently overwrite the first.
        key = stem.casefold()
        used_names[key] = used_names.get(key, 0) + 1
        if used_names[key] > 1:
            stem = f"{stem}_{r.id}"
        rel = Path("tracks") / f"{stem}.gpx"
        write_xml_tree(route_to_gpx_tree(r), tracks_dir / rel)
        manifest_routes.append(route_manifest(r, str(rel).replace(os.sep, "/")))

    aggregate_gpx = out_dir / f"{base_name}.all-routes.gpx"
    write_xml_tree(build_aggregate_gpx(routes), aggregate_gpx)
    paths["osmand_gpx_fallback"] = str(aggregate_gpx)

    manifest = {
        "type": "route-packager-osmand",
        "generated_at_utc": report.generated_at_utc,
        "route_count": len(routes),
        "routes": manifest_routes,
        "notes": [
            "This .osf is a zip-style OsmAnd package containing GPX tracks.",
            "If your OsmAnd build refuses package import, import the .all-routes.gpx fallback or unzip tracks/*.gpx.",
        ],
    }
    (tracks_dir / "manifest.json").write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    # A lightweight items.json helps plugin-style OSF importers identify contents, while not harming zip usage.
    items = {
        "items": [{"type": "tracks", "path": m["package_path"], "name": m["name"]} for m in manifest_routes]
    }
    (tracks_dir / "items.json").write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")

    for media_path, data in media_store.items():
        dest = tracks_dir / media_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(data)

    osf_path = out_dir / f"{base_name}.osmand-tracks.osf"
    zip_dir_as(tracks_dir, osf_path)
    paths["osmand_osf"] = str(osf_path)
    return paths
def write_google_outputs(routes: Sequence[Route], media_store: Dict[str, bytes], out_dir: Path, base_name: str, args: argparse.Namespace, report: ScanReport) -> Dict[str, str]:
    """Write the Google deliverables: a standalone KML and a KMZ with embedded media.

    The KML is first built with the user's point budget and description mode.
    With ``args.google_my_maps_safe`` set, it is rebuilt with lean descriptions
    at progressively smaller per-route point budgets until its uncompressed
    size is at most ``args.google_limit_mb``; if no budget fits, the initial
    KML is kept and a warning is recorded. Without safe mode, a warning is
    recorded when the KML exceeds the limit.

    Args:
        routes: Routes to export.
        media_store: Package-relative path -> file content, stored in the KMZ.
        out_dir: Directory receiving the ``.kml`` and ``.kmz`` files.
        base_name: Prefix for the output file names.
        args: Parsed CLI options; reads ``google_max_points_per_route``,
            ``google_lean_descriptions``, ``google_my_maps_safe`` and
            ``google_limit_mb``.
        report: Receives warnings about simplification or oversize output.

    Returns:
        Mapping with ``google_kml`` and ``google_kmz`` (file paths) plus
        ``google_kml_uncompressed_mb``, ``google_max_points_per_route`` and
        ``google_lean_descriptions`` describing the settings actually used.
    """
    paths: Dict[str, str] = {}
    max_points = max(0, int(args.google_max_points_per_route or 0))
    lean = bool(args.google_lean_descriptions)
    # Single source of truth for the size ceiling used by both branches below.
    limit_bytes = int(float(args.google_limit_mb) * 1024 * 1024)
    kml_bytes = build_kml(routes, media_store, max_points_per_route=max_points, lean_descriptions=lean)

    if args.google_my_maps_safe:
        # Google My Maps has a small KML/KMZ import ceiling. Try progressively
        # smaller per-route geometry budgets until the uncompressed KML fits.
        ladder = (1200, 800, 500, 300, 200, 120, 80, 50, 30)
        if max_points:
            # Honour an explicit user cap: start there and only ever go lower.
            # Retrying with a larger budget would waste a full KML build and
            # could exceed the limit the user asked for.
            candidates = [max_points] + [c for c in ladder if c < max_points]
        else:
            candidates = list(ladder)
        for candidate in candidates:
            kb = build_kml(routes, media_store, max_points_per_route=candidate, lean_descriptions=True)
            if len(kb) <= limit_bytes:
                kml_bytes = kb
                max_points = candidate
                lean = True
                report.warnings.append(f"Google My Maps safe mode: simplified KML to <= {args.google_limit_mb} MB using max {candidate} points/route and lean descriptions")
                break
        else:
            # No budget fit: keep the initial KML built above.
            report.warnings.append(f"Google My Maps safe mode could not reduce KML below {args.google_limit_mb} MB; output may import in Google Earth but fail in My Maps")
    elif len(kml_bytes) > limit_bytes:
        # Warn when a user tries to feed a large one-file KMZ into My Maps.
        report.warnings.append(f"Google KML is {len(kml_bytes)/1024/1024:.2f} MB uncompressed; My Maps commonly rejects KML/KMZ above {args.google_limit_mb} MB. Re-run with --google-my-maps-safe for a simplified one-file KMZ.")

    kml_path = out_dir / f"{base_name}.google-earth-maps.kml"
    kml_path.write_bytes(kml_bytes)
    paths["google_kml"] = str(kml_path)

    kmz_path = out_dir / f"{base_name}.google-earth-maps.kmz"
    with zipfile.ZipFile(kmz_path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
        zf.writestr("doc.kml", kml_bytes)
        for media_path, data in media_store.items():
            zf.writestr(media_path, data)
    paths["google_kmz"] = str(kmz_path)

    paths["google_kml_uncompressed_mb"] = f"{len(kml_bytes)/1024/1024:.2f}"
    paths["google_max_points_per_route"] = str(max_points)
    paths["google_lean_descriptions"] = str(lean)
    return paths
def route_manifest(r: Route, package_path: str = "") -> Dict[str, object]:
    """Return a flat summary dict for one route, as used in manifests and reports.

    Args:
        r: Route to describe.
        package_path: Path of the route's file inside an output package;
            stored verbatim under ``package_path`` (empty string by default).

    Returns:
        Dict with identity, geometry statistics (point/segment counts,
        distance rounded to 3 decimals, bounding box), source provenance,
        media count and ``package_path``, in that key order.
    """
    bounds = r.bbox()

    # Identity and geometry statistics.
    entry: Dict[str, object] = dict(
        id=r.id,
        name=r.name,
        kind=r.route_kind,
        points=r.point_count(),
        segments=len(r.segments),
        distance_km_approx=round(r.distance_km(), 3),
        bbox=bounds,
    )
    # Provenance of the source download.
    entry.update(
        source_archive=r.source_archive,
        source_hash_dir=r.source_archive_hash_dir,
        inner_path=r.inner_path,
        source_title=r.source_title,
        license=r.source_license,
        source_created_at_utc=r.source_created_at_utc,
        download_sha256=r.download_sha256,
    )
    # Packaging details.
    entry["media_count"] = len(r.media)
    entry["package_path"] = package_path
    return entry
def zip_dir_as(src_dir: Path, dest_zip: Path) -> None:
    """Zip every regular file below ``src_dir`` into ``dest_zip``.

    Entries are stored with POSIX-style paths relative to ``src_dir`` and are
    added in sorted path order. An existing ``dest_zip`` is removed first.
    Directories themselves are not stored, so empty directories are dropped.

    Args:
        src_dir: Directory whose files are archived recursively.
        dest_zip: Archive file to create (DEFLATE, compression level 9).
    """
    if dest_zip.exists():
        dest_zip.unlink()
    # Enumerate members BEFORE the archive file is created. Otherwise, when
    # dest_zip lives inside src_dir, the half-written archive would be picked
    # up by rglob and stored inside itself.
    members = sorted(fp for fp in src_dir.rglob("*") if fp.is_file())
    with zipfile.ZipFile(dest_zip, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
        for fp in members:
            zf.write(fp, fp.relative_to(src_dir).as_posix())
def write_reports(routes: Sequence[Route], report: ScanReport, out_dir: Path, base_name: str, output_paths: Dict[str, str]) -> None:
    """Write the machine-readable run summary: ``<base_name>.report.json`` and ``<base_name>.routes.csv``.

    The JSON file holds the scan report (via ``asdict``), the output path
    mapping and one manifest per route. The CSV holds the same per-route
    manifests, one row per route, with ``bbox`` serialized as a JSON string.
    ``package_path`` is always empty in both files because the manifests are
    built here without a package path.

    Args:
        routes: Routes to list; may be empty.
        report: Scan report dataclass instance to embed.
        out_dir: Target directory; created if missing.
        base_name: Prefix for both file names.
        output_paths: Output mapping copied into the JSON under ``outputs``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Build each manifest once and reuse it for both files; building it
    # involves per-route geometry statistics that need not be computed twice.
    manifests = [route_manifest(r) for r in routes]

    full = {
        "report": asdict(report),
        "outputs": output_paths,
        "routes": manifests,
    }
    (out_dir / f"{base_name}.report.json").write_text(json.dumps(full, ensure_ascii=False, indent=2), encoding="utf-8")

    fieldnames = [
        "id", "name", "kind", "points", "segments", "distance_km_approx", "bbox",
        "source_archive", "source_hash_dir", "inner_path", "source_title", "license",
        "source_created_at_utc", "download_sha256", "media_count", "package_path",
    ]
    with (out_dir / f"{base_name}.routes.csv").open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        # Copy each manifest so the bbox replacement never touches the dicts
        # already serialized into the JSON report.
        w.writerows(
            {**m, "bbox": json.dumps(m["bbox"], ensure_ascii=False)}
            for m in manifests
        )
# ----------------------------- CLI ---------------------------------------
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Build the command-line parser and parse ``argv``.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read ``sys.argv``.

    Returns:
        Namespace with one attribute per option defined below.
    """
    epilog_text = textwrap.dedent("""
        Output reality:
        --target osmand writes <name>.osmand-tracks.osf and <name>.all-routes.gpx
        --target google writes <name>.google-earth-maps.kmz and <name>.google-earth-maps.kml
        RAR extraction needs an installed backend. On Debian/Ubuntu, try:
        sudo apt update && sudo apt install unrar-free p7zip-full gpsbabel
        or install non-free unrar / unar if available.
        """)
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Parse downloaded route archives and create OsmAnd OSF/GPX and Google KMZ/KML outputs.",
        epilog=epilog_text,
    )
    add = parser.add_argument

    # Input / output selection.
    add("--input", required=True, help="Input directory or tar archive containing downloads_by_hash")
    add("--out", required=True, help="Output directory")
    add("--target", choices=["osmand", "google", "both"], default="both")
    add("--name", default="bg-mountain-routes", help="Base name for output files")

    # Scanning behaviour.
    add("--skip-rar", action="store_true", help="Skip RAR archives instead of trying to extract them")
    add(
        "--scan-all",
        action="store_true",
        help="Scan every archive below input, including retry/failure duplicate folders. Default uses downloads_by_hash when present.",
    )
    add("--strict", action="store_true", help="Exit non-zero if anything is skipped/unsupported")
    add("--keep-duplicates", action="store_true", help="Do not de-duplicate identical route geometries")
    add("--limit-archives", type=int, default=0, help="Debug/test: process only N archives")

    # Text-note limits.
    add("--max-text-note-bytes", type=int, default=128_000, help="Max text file size to include in metadata notes")
    add("--max-text-note-chars", type=int, default=8_000, help="Max chars per text note copied into descriptions")
    add("--max-text-notes-per-route", type=int, default=4, help="Max text notes attached to each route")

    # Google-specific output tuning.
    add(
        "--google-my-maps-safe",
        action="store_true",
        help="Try to keep the Google KMZ importable by My Maps by simplifying geometry and using lean descriptions",
    )
    add(
        "--google-limit-mb",
        type=float,
        default=4.8,
        help="Uncompressed KML size target for --google-my-maps-safe; Google documents 5 MB for KML/KMZ imports",
    )
    add(
        "--google-max-points-per-route",
        type=int,
        default=0,
        help="Limit points per route in Google KML/KMZ only; 0 keeps full geometry",
    )
    add(
        "--google-lean-descriptions",
        action="store_true",
        help="Do not include image previews/text note bodies in Google KML descriptions",
    )

    # Debugging aids.
    add(
        "--keep-temp",
        action="store_true",
        help="Keep temp input extraction only when manually debugging; normal cleanup still applies",
    )
    add("--verbose", "-v", action="store_true")

    return parser.parse_args(argv)
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the packager end to end and return the process exit code.

    Steps: parse options, materialize the input, scan it for routes, write the
    outputs selected by ``--target``, write the reports, and print a JSON
    summary to stdout.

    Returns:
        0 on success; 2 when the input cannot be materialized or no routes
        were imported (3 instead of 2 for the latter under ``--strict``);
        3 when ``--strict`` is set and the report lists skipped items.
    """
    args = parse_args(argv)
    input_path = Path(args.input).expanduser().resolve()
    out_dir = Path(args.out).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    report = ScanReport(input=str(input_path), generated_at_utc=now_utc())

    # Top-level boundary: any failure to open the input is reported as exit code 2.
    try:
        root, temp_handle = materialize_input(input_path, keep_temp=args.keep_temp)
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 2

    try:
        routes, media_store = scan_routes(root, args, report)
        if not routes:
            print("ERROR: no usable GPX/KML/KMZ/GDB route geometries were imported", file=sys.stderr)
            # Still emit the (empty) reports so the skip reasons are inspectable.
            write_reports([], report, out_dir, args.name, {})
            if args.strict:
                return 3
            return 2

        wants_osmand = args.target in ("osmand", "both")
        wants_google = args.target in ("google", "both")
        output_paths: Dict[str, str] = {}
        if wants_osmand:
            output_paths.update(write_osmand_outputs(routes, media_store, out_dir, args.name, report))
        if wants_google:
            output_paths.update(write_google_outputs(routes, media_store, out_dir, args.name, args, report))
        write_reports(routes, report, out_dir, args.name, output_paths)

        summary = {
            "ok": True,
            "routes": len(routes),
            "archives_seen": report.archives_seen,
            "archives_extracted": report.archives_extracted,
            "skipped": len(report.skipped),
            "warnings": report.warnings,
            "outputs": output_paths,
            "report_json": str(out_dir / f"{args.name}.report.json"),
            "routes_csv": str(out_dir / f"{args.name}.routes.csv"),
        }
        print(json.dumps(summary, ensure_ascii=False, indent=2))

        return 3 if (args.strict and report.skipped) else 0
    finally:
        # Explicit cleanup is skipped when there is no handle or --keep-temp was given.
        if not (temp_handle is None or args.keep_temp):
            temp_handle.cleanup()
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())