1112 lines
47 KiB
Python
1112 lines
47 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
route_packager.py
|
|
|
|
Parse a scraped/downloaded route archive tree and build importable route bundles:
|
|
|
|
- OsmAnd: .osf package (zip container) containing one normalized GPX per route,
|
|
plus an aggregate GPX fallback and machine-readable manifests.
|
|
- Google: .kmz package (zip container) containing a KML document with one
|
|
toggleable Folder/Placemark per route, plus optional embedded media.
|
|
|
|
Designed for trees like:
|
|
v4/downloads_by_hash/R_*/<download>.rar|zip|7z + <download>.source.json
|
|
|
|
Important reality check:
|
|
- OsmAnd can import GPX/KML/KMZ and OSF backup/package containers. This script
|
|
creates an OSF (renamed zip) with tracks and metadata, and also writes a GPX
|
|
fallback because GPX import is the most predictable OsmAnd route workflow.
|
|
- Google Maps / My Maps / Earth do not accept arbitrary custom binary route
|
|
formats. The closest binary importable bundle is KMZ, a zipped KML package.
|
|
|
|
No network required. Python stdlib only for ZIP/TAR/KMZ/KML/GPX. Optional tools:
|
|
- RAR extraction: unrar / unar / 7z / 7zz / bsdtar, or python rarfile with tool.
|
|
- Garmin GDB conversion: gpsbabel, if present.
|
|
|
|
Examples:
|
|
python3 route_packager.py --input ./v4 --out ./out --target both
|
|
python3 route_packager.py --input ./v4-scrape-675.tar.gz --out ./out --target google --keep-temp
|
|
python3 route_packager.py --input ./v4 --out ./out --target osmand --skip-rar
|
|
|
|
Exit codes:
|
|
0 = completed, possibly with skipped unsupported files unless --strict was used
|
|
2 = validation / input error
|
|
3 = strict mode detected skipped route archives or unsupported route files
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import datetime as _dt
|
|
import hashlib
|
|
import html
|
|
import io
|
|
import json
|
|
import math
|
|
import os
|
|
import posixpath
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tarfile
|
|
import tempfile
|
|
import textwrap
|
|
import zipfile
|
|
from dataclasses import dataclass, field, asdict
|
|
from pathlib import Path, PurePosixPath
|
|
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple
|
|
from xml.etree import ElementTree as ET
|
|
|
|
# ----------------------------- constants ---------------------------------
|
|
|
|
# File classification tables, keyed by lowercase suffix including the dot.
ROUTE_EXTS = {".gpx", ".kml", ".kmz", ".gdb"}
ARCHIVE_EXTS = {".zip", ".rar", ".7z"}
IMAGE_EXTS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
    ".bmp",
    ".tif",
    ".tiff",
}
TEXT_EXTS = {
    ".txt",
    ".md",
    ".csv",
    ".json",
    ".html",
    ".htm",
    ".nfo",
    ".log",
}

# XML namespaces of the two document formats this script reads and writes.
GPX_NS = "http://www.topografix.com/GPX/1/1"
KML_NS = "http://www.opengis.net/kml/2.2"

# Serialize GPX elements in the default namespace and KML elements under
# the "kml" prefix instead of ElementTree's generated ns0/ns1 prefixes.
ET.register_namespace("", GPX_NS)
ET.register_namespace("kml", KML_NS)
|
|
|
|
# ----------------------------- models ------------------------------------
|
|
|
|
@dataclass
class Point:
    """One geographic position with optional elevation and timestamp."""

    # Latitude / longitude as floats parsed from the source document.
    lat: float
    lon: float
    # Elevation, when the source provided a parseable value.
    ele: Optional[float] = None
    # Timestamp text kept exactly as found in the source (never parsed here).
    time: Optional[str] = None
|
|
|
|
@dataclass
class Segment:
    """An ordered list of points forming one continuous stretch of a route."""

    # Each instance gets its own list (default_factory avoids sharing).
    points: List[Point] = field(default_factory=list)
|
|
|
|
@dataclass
class Route:
    """A parsed route plus the provenance of the file it came from.

    Geometry lives in ``segments`` (track/route lines) and ``waypoints``
    (stand-alone points). The ``source_*`` / ``download_sha256`` fields carry
    scrape metadata; ``media``, ``text_notes`` and ``warnings`` are
    free-form lists attached while scanning the archive.
    """

    id: str
    name: str
    source_archive: str
    source_archive_hash_dir: str
    inner_path: str
    source_title: str = ""
    source_license: str = ""
    source_url: str = ""
    source_created_at_utc: str = ""
    download_sha256: str = ""
    route_kind: str = "track"
    segments: List[Segment] = field(default_factory=list)
    waypoints: List[Point] = field(default_factory=list)
    media: List[str] = field(default_factory=list)
    text_notes: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def point_count(self) -> int:
        """Return the number of points across all segments (waypoints excluded)."""
        count = 0
        for seg in self.segments:
            count += len(seg.points)
        return count

    def bbox(self) -> Optional[Tuple[float, float, float, float]]:
        """Return ``(min_lat, min_lon, max_lat, max_lon)`` or None if empty.

        Both segment points and waypoints contribute to the box.
        """
        everything = [pt for seg in self.segments for pt in seg.points]
        everything.extend(self.waypoints)
        if not everything:
            return None
        lats = [pt.lat for pt in everything]
        lons = [pt.lon for pt in everything]
        return (min(lats), min(lons), max(lats), max(lons))

    def distance_km(self) -> float:
        """Return the summed great-circle length of every segment, in km.

        Gaps between segments are not counted; only consecutive points
        inside the same segment are joined.
        """
        km = 0.0
        for seg in self.segments:
            previous = None
            for current in seg.points:
                if previous is not None:
                    km += haversine_km(previous.lat, previous.lon, current.lat, current.lon)
                previous = current
        return km
|
|
|
|
@dataclass
class ScanReport:
    """Counters and diagnostics accumulated during one packaging run."""

    # Identification of the run.
    input: str
    generated_at_utc: str

    # Progress counters, all starting at zero.
    archives_seen: int = 0
    archives_extracted: int = 0
    route_files_seen: int = 0
    route_files_imported: int = 0
    media_files_seen: int = 0
    text_files_seen: int = 0
    routes_written: int = 0

    # Entries are {"path": ..., "reason": ...} dicts describing skipped inputs.
    skipped: List[Dict[str, str]] = field(default_factory=list)
    # Human-readable run-level warnings.
    warnings: List[str] = field(default_factory=list)
|
|
|
|
# ----------------------------- utility -----------------------------------
|
|
|
|
def now_utc() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    stamp = _dt.datetime.now(tz=_dt.timezone.utc)
    return stamp.isoformat(timespec="seconds")
|
|
|
|
|
|
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of ``data``."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
|
|
|
|
|
|
def sha1_text(s: str) -> str:
    """Return the hex SHA-1 digest of ``s`` encoded as UTF-8.

    Unencodable characters (e.g. lone surrogates) are replaced rather than
    raising, so any str yields a digest.
    """
    encoded = s.encode("utf-8", "replace")
    return hashlib.sha1(encoded).hexdigest()
|
|
|
|
|
|
def slugify(value: str, fallback: str = "route", max_len: int = 96) -> str:
    """Turn ``value`` into a filesystem-friendly name.

    Path separators and NULs become "-", whitespace runs become "_", and any
    other run of characters outside word chars, ``- . ( )`` and the Cyrillic
    block becomes "-". Leading/trailing ``. _ -`` and spaces are trimmed.
    ``fallback`` is used when nothing usable remains; the result is cut to
    ``max_len`` characters.
    """
    trim_chars = "._- "

    text = value.strip()
    if not text:
        text = fallback

    text = re.sub(r"[\\/\0]+", "-", text)
    text = re.sub(r"\s+", "_", text)
    # Keep Bulgarian/Cyrillic and most Unicode word characters; collapse
    # everything filesystem-hostile into a dash.
    text = re.sub(r"[^\w\-.()\u0400-\u04FF]+", "-", text, flags=re.UNICODE)

    text = text.strip(trim_chars)
    if not text:
        text = fallback

    if len(text) > max_len:
        text = text[:max_len].rstrip(trim_chars)
    return text
|
|
|
|
|
|
def decode_text(data: bytes) -> str:
    """Decode ``data`` to text, trying the encodings common in this corpus.

    Order: UTF-8 (a BOM is stripped if present), then Windows-1251, then
    CP866. The previous candidate list also contained ``utf-8`` (never
    succeeds when ``utf-8-sig`` failed), ``windows-1251`` (an alias of
    ``cp1251``) and ``latin-1`` (unreachable, because CP866 maps every byte
    value); those redundant entries are removed without changing results.
    """
    for encoding in ("utf-8-sig", "cp1251", "cp866"):
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Defensive net only: CP866 above accepts every byte sequence.
    return data.decode("utf-8", "replace")
|
|
|
|
|
|
def strip_ns(tag: str) -> str:
    """Return ``tag`` without its ``{namespace}`` prefix, if it has one."""
    # rpartition yields ("", "", tag) when no "}" is present, so the last
    # field is the local name in both cases.
    return tag.rpartition("}")[2]
|
|
|
|
|
|
def find_child_text(el: ET.Element, names: Sequence[str]) -> str:
    """Return the stripped text of the first direct child named in ``names``.

    Matching is case-insensitive and ignores XML namespaces. Returns "" when
    no child matches or the matching child has no text.
    """
    wanted = {name.lower() for name in names}
    hit = next(
        (child for child in el if strip_ns(child.tag).lower() in wanted),
        None,
    )
    # Compare against None explicitly: an Element without children is falsy.
    if hit is None:
        return ""
    return (hit.text or "").strip()
|
|
|
|
|
|
def safe_relpath(name: str) -> Optional[str]:
    """Normalize an archive member name to a safe relative POSIX path.

    Backslashes are treated as separators. Returns None (meaning "skip this
    member") when the name is empty, contains a NUL byte, is absolute,
    starts with a Windows drive prefix such as ``C:``, contains a ``..``
    component, or collapses to nothing (e.g. ``.``). Otherwise returns the
    normalized path string.
    """
    if not name or "\0" in name:
        return None
    normalized = name.replace("\\", "/")
    # A drive-qualified name is relative for PurePosixPath but escapes the
    # extraction root when later joined to a Windows path.
    if re.match(r"[A-Za-z]:", normalized):
        return None
    p = PurePosixPath(normalized)
    # PurePosixPath already drops "." and empty components, so only ".." and
    # a fully empty result need to be rejected.
    if p.is_absolute() or not p.parts or ".." in p.parts:
        return None
    return str(p)
|
|
|
|
|
|
def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in km between two lat/lon points.

    Inputs are in decimal degrees. Uses the haversine formula on a sphere
    of radius 6371.0088 km.
    """
    R = 6371.0088
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Rounding can push `a` a hair outside [0, 1] for (near-)antipodal
    # points; clamp so sqrt/asin never raise a math domain error.
    a = min(1.0, max(0.0, a))
    return 2 * R * math.asin(math.sqrt(a))
|
|
|
|
|
|
def which_any(names: Sequence[str]) -> Optional[str]:
    """Return the path of the first executable in ``names`` found on PATH.

    Candidates are probed in the given order; returns None if none exist.
    """
    located = (shutil.which(candidate) for candidate in names)
    return next((path for path in located if path), None)
|
|
|
|
# ----------------------------- metadata ----------------------------------
|
|
|
|
def load_source_meta(archive_path: Path) -> Dict[str, str]:
    """Load ``<archive>.source.json`` if present and normalize useful fields.

    Returns ``{}`` when the sidecar file does not exist, and
    ``{"source_meta_error": ...}`` when it cannot be read, is not valid
    JSON, or is not a JSON object. Otherwise returns a dict with the keys
    ``source_title``, ``source_license``, ``source_url``,
    ``source_created_at_utc``, ``download_sha256`` and
    ``download_original_filename`` (all strings, possibly empty).

    Unexpected shapes inside the JSON (non-dict summary items or links) are
    ignored instead of raising, so one malformed sidecar cannot abort a scan.
    """
    meta_path = archive_path.with_name(archive_path.name + ".source.json")
    if not meta_path.exists():
        return {}
    try:
        raw = json.loads(meta_path.read_text(encoding="utf-8", errors="replace"))
    except (OSError, ValueError, RecursionError) as e:
        return {"source_meta_error": str(e)}
    if not isinstance(raw, dict):
        return {"source_meta_error": f"expected a JSON object, got {type(raw).__name__}"}

    title = ""
    license_text = ""
    url = ""

    remote = raw.get("from_remote_metadata")
    summary_items = remote.get("jd_summary_items") if isinstance(remote, dict) else None
    first_item = summary_items[0] if isinstance(summary_items, list) and summary_items else None
    if isinstance(first_item, dict):
        title = str(first_item.get("text") or "").strip()
        # Common scrape text form: "Title License: Creative Commons Size: ..."
        title = re.sub(r"\s*Лиценз:\s*.*$", "", title).strip()
        title = re.sub(r"\s*License:\s*.*$", "", title).strip()
        links = first_item.get("links")
        for link in links if isinstance(links, list) else []:
            if not isinstance(link, dict):
                continue
            link_text = str(link.get("text") or "")
            if "creative" in link_text.lower():
                license_text = link_text
            # The first link that carries an href is taken as the source URL.
            if not url and link.get("href"):
                url = str(link["href"])

    # Try additional likely fields without assuming the exact scrape schema.
    for key in ("source_url", "url", "page_url", "download_url"):
        if raw.get(key) and not url:
            url = str(raw[key])

    return {
        "source_title": title,
        "source_license": license_text,
        "source_url": url,
        "source_created_at_utc": str(raw.get("created_at_utc") or ""),
        "download_sha256": str(raw.get("download_file_sha256") or ""),
        "download_original_filename": str(raw.get("download_original_filename") or archive_path.name),
    }
|
|
|
|
# ----------------------------- archive extraction -------------------------
|
|
|
|
@dataclass
class ExtractedFile:
    """One archive member held in memory."""

    # Member path inside the archive (already passed through safe_relpath
    # by the iterators in this file).
    relpath: str
    # Full, uncompressed content of the member.
    data: bytes
|
|
|
|
|
|
def iter_zip_files(path: Path) -> Iterator[ExtractedFile]:
    """Yield every regular member of the ZIP archive at ``path``.

    Directory entries and members whose names fail ``safe_relpath`` are
    skipped. Each member is read fully into memory.
    """
    with zipfile.ZipFile(path) as archive:
        for member in archive.infolist():
            if member.is_dir():
                continue
            rel = safe_relpath(member.filename)
            if rel:
                yield ExtractedFile(rel, archive.read(member))
|
|
|
|
|
|
def iter_kmz_files(data: bytes) -> Iterator[ExtractedFile]:
    """Yield every regular member of an in-memory KMZ (ZIP) container.

    Directory entries and members with unsafe names are skipped.
    """
    with zipfile.ZipFile(io.BytesIO(data)) as container:
        for member in container.infolist():
            if member.is_dir():
                continue
            rel = safe_relpath(member.filename)
            if not rel:
                continue
            yield ExtractedFile(rel, container.read(member))
|
|
|
|
|
|
def iter_rar_files(path: Path, skip_rar: bool = False) -> Iterator[ExtractedFile]:
    """Yield the regular members of the RAR archive at ``path``.

    Yields nothing when ``skip_rar`` is true. Extraction strategy:

    * If the best tool found on PATH is 7z/7zz, extract with it directly.
      This path does not need the ``rarfile`` package, so it is checked
      before the import (previously a missing ``rarfile`` made RAR support
      fail even when 7z was available).
    * Otherwise use ``rarfile`` configured with unrar/unar/bsdtar. If
      ``rarfile`` is not installed but 7z/7zz is, fall back to 7z.

    Raises:
        RuntimeError: when no usable extractor exists or extraction fails.
    """
    if skip_rar:
        return

    # Preference order: tools rarfile knows best first, 7z variants after.
    tool = which_any(["unrar", "unar", "bsdtar", "7z", "7zz", "unrar-free"])
    # Compare on the stem so a Windows ".exe" suffix does not hide the tool.
    tool_name = Path(tool).stem.lower() if tool else ""

    if tool_name in {"7z", "7zz"}:
        # For 7z/7zz the direct fallback is more reliable than rarfile.
        yield from iter_with_7z(path)
        return

    try:
        import rarfile  # type: ignore
    except ImportError as e:
        if which_any(["7zz", "7z"]):
            yield from iter_with_7z(path)
            return
        raise RuntimeError("RAR support needs python package 'rarfile' plus unrar/unar/7z/bsdtar") from e

    # rarfile expects a command name/path in these module globals.
    if tool_name in {"unrar", "unrar-free"}:
        rarfile.UNRAR_TOOL = tool
    elif tool_name == "unar":
        rarfile.UNAR_TOOL = tool
    elif tool_name == "bsdtar":
        rarfile.BSDTAR_TOOL = tool

    try:
        with rarfile.RarFile(path) as rf:
            for info in rf.infolist():
                if info.isdir():
                    continue
                rel = safe_relpath(info.filename)
                if not rel:
                    continue
                yield ExtractedFile(rel, rf.read(info))
    except Exception as e:
        raise RuntimeError(
            f"Could not extract RAR {path.name}. Install one of: unrar, unar, bsdtar, 7z/7zz. Original error: {e}"
        ) from e
|
|
|
|
|
|
def iter_with_7z(path: Path) -> Iterator[ExtractedFile]:
    """Extract ``path`` with 7zz/7z into a scratch directory and yield its files.

    The scratch directory lives only while the generator is being consumed.

    Raises:
        RuntimeError: if neither 7zz nor 7z is on PATH, or the tool exits
            with a non-zero status.
    """
    exe = which_any(["7zz", "7z"])
    if not exe:
        raise RuntimeError("7z/7zz not found")

    with tempfile.TemporaryDirectory(prefix="routepkg_7z_") as scratch:
        dest = Path(scratch) / "x"
        dest.mkdir()
        proc = subprocess.run(
            [exe, "x", "-y", f"-o{dest}", str(path)],
            capture_output=True,
            text=True,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"7z extraction failed for {path.name}: {proc.stderr[-1000:] or proc.stdout[-1000:]}")

        for entry in dest.rglob("*"):
            if not entry.is_file():
                continue
            rel = safe_relpath(str(entry.relative_to(dest).as_posix()))
            if rel:
                yield ExtractedFile(rel, entry.read_bytes())
|
|
|
|
|
|
def iter_archive_files(path: Path, skip_rar: bool = False) -> Iterator[ExtractedFile]:
    """Yield the members of ``path``, dispatching on its file extension.

    Supports ``.zip``, ``.rar`` (honouring ``skip_rar``) and ``.7z``.

    Raises:
        RuntimeError: for any other extension.
    """
    suffix = path.suffix.lower()
    if suffix == ".zip":
        members = iter_zip_files(path)
    elif suffix == ".rar":
        members = iter_rar_files(path, skip_rar=skip_rar)
    elif suffix == ".7z":
        members = iter_with_7z(path)
    else:
        raise RuntimeError(f"Unsupported archive extension: {path}")
    yield from members
|
|
|
|
# ----------------------------- route parsing -----------------------------
|
|
|
|
def _gpx_child_points(parent: ET.Element, tag: str) -> List[Point]:
    """Collect valid points from the direct children of ``parent`` named ``tag``."""
    points: List[Point] = []
    for child in parent:
        if strip_ns(child.tag).lower() != tag:
            continue
        point = parse_point_el(child)
        if point is not None:
            points.append(point)
    return points


def parse_gpx(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Parse GPX bytes into Route objects.

    One Route is produced per ``<trk>`` that has at least one non-empty
    segment and per ``<rte>`` with at least two points. If the document
    yields neither but has top-level ``<wpt>`` elements, a single
    waypoints-only Route is returned. Names fall back from the element's
    own ``<name>`` to the ``<metadata>`` name to
    ``route_base["fallback_name"]``.

    Args:
        data: raw file content in any encoding ``decode_text`` understands.
        route_base: provenance fields copied onto every Route.
        report_warnings: list that receives a message when nothing is found.

    Raises:
        ValueError: if the XML cannot be parsed even after stripping
            control characters.
    """
    text = decode_text(data)
    # Parse the decoded str directly. Re-encoding to UTF-8 bytes would make
    # expat honour a stale ``encoding="windows-1251"`` declaration and turn
    # every non-ASCII name into mojibake; for str input the declaration is
    # ignored.
    try:
        root = ET.fromstring(text)
    except Exception as first_error:
        # Some old GPX files include control characters. Try a sanitized pass.
        sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
        try:
            root = ET.fromstring(sanitized)
        except Exception as second_error:
            raise ValueError(
                f"GPX XML parse failed: {second_error}; first error: {first_error}"
            ) from second_error

    # Document-level name: first <metadata> element in document order.
    meta_name = ""
    for el in root.iter():
        if strip_ns(el.tag).lower() == "metadata":
            meta_name = find_child_text(el, ["name"])
            break

    routes: List[Route] = []
    provenance = route_base_to_route_kwargs(route_base)

    tracks = [e for e in root.iter() if strip_ns(e.tag).lower() == "trk"]
    for trk_index, trk in enumerate(tracks, 1):
        name = find_child_text(trk, ["name"]) or meta_name or route_base.get("fallback_name", "track")
        segs: List[Segment] = []
        for seg_el in trk:
            if strip_ns(seg_el.tag).lower() != "trkseg":
                continue
            pts = _gpx_child_points(seg_el, "trkpt")
            if pts:
                segs.append(Segment(pts))
        if segs:
            rid = build_route_id(route_base, name, f"trk{trk_index}")
            routes.append(Route(id=rid, name=name, route_kind="track", segments=segs, **provenance))

    planned = [e for e in root.iter() if strip_ns(e.tag).lower() == "rte"]
    for rte_index, rte in enumerate(planned, 1):
        name = find_child_text(rte, ["name"]) or meta_name or route_base.get("fallback_name", "route")
        pts = _gpx_child_points(rte, "rtept")
        # A single-point route has no geometry worth keeping.
        if len(pts) >= 2:
            rid = build_route_id(route_base, name, f"rte{rte_index}")
            routes.append(Route(id=rid, name=name, route_kind="route", segments=[Segment(pts)], **provenance))

    # If GPX only contains waypoints, preserve them as a waypoints-only route.
    wpts = _gpx_child_points(root, "wpt")
    if wpts and not routes:
        name = meta_name or route_base.get("fallback_name", "waypoints")
        rid = build_route_id(route_base, name, "wpt")
        routes.append(Route(id=rid, name=name, route_kind="waypoints", waypoints=wpts, **provenance))

    if not routes:
        report_warnings.append(f"No tracks/routes/waypoints found in {route_base.get('inner_path','?')}")
    return routes
|
|
|
|
|
|
def parse_point_el(el: ET.Element) -> Optional[Point]:
    """Build a Point from a GPX ``wpt``/``trkpt``/``rtept`` element.

    Reads the ``lat``/``lon`` attributes and the optional ``ele`` and
    ``time`` children (namespace-insensitive).

    Returns None when a coordinate is missing, unparseable, non-finite
    (``nan``/``inf`` are accepted by ``float`` but are not positions), or
    outside the valid latitude/longitude range. An unparseable or
    non-finite elevation is ignored rather than rejecting the point.
    """
    try:
        lat = float(el.attrib.get("lat", ""))
        lon = float(el.attrib.get("lon", ""))
    except ValueError:
        return None
    if not (math.isfinite(lat) and math.isfinite(lon)):
        return None
    if abs(lat) > 90.0 or abs(lon) > 180.0:
        return None

    ele = None
    time = None
    for ch in el:
        lname = strip_ns(ch.tag).lower()
        if lname == "ele":
            try:
                value = float((ch.text or "").strip())
            except ValueError:
                continue
            if math.isfinite(value):
                ele = value
        elif lname == "time":
            time = (ch.text or "").strip() or None
    return Point(lat=lat, lon=lon, ele=ele, time=time)
|
|
|
|
|
|
def parse_kml(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Parse KML bytes into one Route per usable ``<LineString>``.

    Every ``<Placemark>`` is searched (at any depth) for LineStrings; each
    one with at least two valid coordinates becomes a Route named after the
    placemark, or ``route_base["fallback_name"]`` when it has no name.

    Args:
        data: raw file content in any encoding ``decode_text`` understands.
        route_base: provenance fields copied onto every Route.
        report_warnings: list that receives a message when nothing is found.

    Raises:
        ValueError: if the XML cannot be parsed.
    """
    text = decode_text(data)
    # Parse the decoded str directly: feeding re-encoded UTF-8 bytes would
    # let a stale encoding declaration (e.g. windows-1251) garble names.
    try:
        root = ET.fromstring(text)
    except Exception as e:
        raise ValueError(f"KML XML parse failed: {e}") from e

    routes: List[Route] = []
    provenance = route_base_to_route_kwargs(route_base)
    idx = 0
    for pm in root.iter():
        if strip_ns(pm.tag).lower() != "placemark":
            continue
        name = find_child_text(pm, ["name"]) or route_base.get("fallback_name", "kml-route")
        for line in pm.iter():
            if strip_ns(line.tag).lower() != "linestring":
                continue
            # Only the first direct <coordinates> child is used.
            coord_text = ""
            for ch in line:
                if strip_ns(ch.tag).lower() == "coordinates":
                    coord_text = ch.text or ""
                    break
            pts = parse_kml_coordinates(coord_text)
            if len(pts) >= 2:
                idx += 1
                rid = build_route_id(route_base, name, f"kml{idx}")
                routes.append(
                    Route(id=rid, name=name, route_kind="kml-linestring", segments=[Segment(pts)], **provenance)
                )
    if not routes:
        report_warnings.append(f"No KML LineString routes found in {route_base.get('inner_path','?')}")
    return routes
|
|
|
|
|
|
def parse_kml_coordinates(coord_text: str) -> List[Point]:
    """Parse the text of a KML ``<coordinates>`` element into Points.

    Tuples are whitespace-separated ``lon,lat[,alt]``. Tokens with fewer
    than two fields or with unparseable numbers are silently skipped; an
    empty altitude field yields ``ele=None``.
    """
    parsed: List[Point] = []
    # str.split() with no argument already splits on newlines and tabs.
    for token in coord_text.split():
        fields = token.split(",")
        if len(fields) < 2:
            continue
        try:
            lon = float(fields[0])
            lat = float(fields[1])
            if len(fields) > 2 and fields[2] != "":
                ele = float(fields[2])
            else:
                ele = None
        except ValueError:
            continue
        parsed.append(Point(lat=lat, lon=lon, ele=ele))
    return parsed
|
|
|
|
|
|
def parse_kmz(data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> Tuple[List[Route], List[ExtractedFile]]:
    """Parse a KMZ container into routes plus its embedded image files.

    Every ``.kml`` member is parsed with ``parse_kml``; its ``inner_path``
    becomes ``<outer path>!<member path>`` and its fallback name is the
    member's file stem. Members with an image suffix are returned as media.
    """
    routes: List[Route] = []
    media: List[ExtractedFile] = []
    outer_path = route_base.get("inner_path", "")

    for member in iter_kmz_files(data):
        member_path = Path(member.relpath)
        suffix = member_path.suffix.lower()
        if suffix == ".kml":
            member_base = {
                **route_base,
                "inner_path": outer_path + "!" + member.relpath,
                "fallback_name": member_path.stem,
            }
            routes.extend(parse_kml(member.data, member_base, report_warnings))
        elif suffix in IMAGE_EXTS:
            media.append(member)
    return routes, media
|
|
|
|
|
|
def convert_gdb_to_gpx(gdb_data: bytes, route_base: Dict[str, str], report_warnings: List[str]) -> List[Route]:
    """Convert Garmin GDB bytes to GPX with gpsbabel and parse the result.

    The data is written to a scratch directory, converted by the external
    ``gpsbabel`` tool, and the produced GPX is handed to ``parse_gpx``.

    Raises:
        RuntimeError: if gpsbabel is not installed, exits non-zero, or
            produces no output file.
    """
    gpsbabel = shutil.which("gpsbabel")
    if not gpsbabel:
        raise RuntimeError("GDB file found but gpsbabel is not installed; install gpsbabel or keep the source GPX/KML files only")

    with tempfile.TemporaryDirectory(prefix="routepkg_gdb_") as scratch:
        workdir = Path(scratch)
        gdb_file = workdir / "in.gdb"
        gpx_file = workdir / "out.gpx"
        gdb_file.write_bytes(gdb_data)

        proc = subprocess.run(
            [gpsbabel, "-i", "gdb", "-f", str(gdb_file), "-o", "gpx", "-F", str(gpx_file)],
            capture_output=True,
            text=True,
        )
        if proc.returncode != 0 or not gpx_file.exists():
            raise RuntimeError(f"gpsbabel failed: {proc.stderr[-1000:] or proc.stdout[-1000:]}")
        return parse_gpx(gpx_file.read_bytes(), route_base, report_warnings)
|
|
|
|
|
|
def route_base_to_route_kwargs(base: Dict[str, str]) -> Dict[str, str]:
    """Pick the provenance keys of ``base`` that map onto Route fields.

    Missing keys default to ""; any other keys in ``base`` (such as
    ``fallback_name``) are dropped.
    """
    route_fields = (
        "source_archive",
        "source_archive_hash_dir",
        "inner_path",
        "source_title",
        "source_license",
        "source_url",
        "source_created_at_utc",
        "download_sha256",
    )
    return {key: base.get(key, "") for key in route_fields}
|
|
|
|
|
|
def build_route_id(base: Dict[str, str], name: str, suffix: str) -> str:
    """Return a stable 16-hex-character id for a route.

    The id is the truncated SHA-1 of the archive hash dir, archive name,
    inner path, route name and ``suffix`` joined with "|", so the same
    input always yields the same id.
    """
    provenance = [
        base.get(key, "")
        for key in ("source_archive_hash_dir", "source_archive", "inner_path")
    ]
    seed = "|".join(provenance + [name, suffix])
    digest = hashlib.sha1(seed.encode("utf-8", "replace")).hexdigest()
    return digest[:16]
|
|
|
|
# ----------------------------- scanning ----------------------------------
|
|
|
|
def materialize_input(input_path: Path, keep_temp: bool = False) -> Tuple[Path, Optional[tempfile.TemporaryDirectory]]:
    """Return a directory to scan for ``input_path`` plus its temp handle.

    A directory input is returned unchanged with a None handle. A tar
    archive (any compression tarfile can detect, regardless of extension)
    is unpacked into a fresh temporary directory, and that directory and
    its ``TemporaryDirectory`` handle are returned; the caller owns cleanup.

    Only regular files and directories are extracted, and members whose
    names fail ``safe_relpath`` are skipped, so links, devices and
    path-traversal entries never reach the filesystem.

    Args:
        input_path: directory or tar archive.
        keep_temp: not used by this function; accepted for interface
            compatibility with callers.

    Raises:
        ValueError: if the input is neither a directory nor a tar archive.
    """
    if input_path.is_dir():
        return input_path, None
    # Accept tar even when the extension lies (.tar.gz that is plain tar).
    if not tarfile.is_tarfile(input_path):
        raise ValueError(f"Input is neither directory nor tar archive: {input_path}")

    td = tempfile.TemporaryDirectory(prefix="routepkg_input_")
    root = Path(td.name)
    try:
        with tarfile.open(input_path, mode="r:*") as tf:
            for m in tf:
                rel = safe_relpath(m.name)
                if not rel:
                    continue
                target = root / rel
                if m.isdir():
                    target.mkdir(parents=True, exist_ok=True)
                elif m.isfile():
                    target.parent.mkdir(parents=True, exist_ok=True)
                    src = tf.extractfile(m)
                    if src is not None:
                        # Stream to disk instead of holding the member in memory.
                        with src, open(target, "wb") as dst:
                            shutil.copyfileobj(src, dst)
    except BaseException:
        # Do not leave a half-extracted tree behind when extraction fails.
        td.cleanup()
        raise
    return root, td
|
|
|
|
|
|
def find_route_archives(root: Path, scan_all: bool = False) -> List[Path]:
    """Return the sorted archive files (.zip/.rar/.7z) to process under ``root``.

    Unless ``scan_all`` is set, the search is narrowed to the canonical
    ``downloads_by_hash`` tree: either directly under ``root`` or, when
    exactly one immediate subdirectory contains it, under that
    subdirectory. Otherwise the whole of ``root`` is searched recursively.
    """
    search_root = root
    if not scan_all:
        # Scrape bundles often contain retry/failure snapshots that duplicate
        # downloads_by_hash, so prefer the canonical tree when it exists.
        canonical = root / "downloads_by_hash"
        if canonical.is_dir():
            search_root = canonical
        elif root.exists():
            # Tar inputs often extract to a single top-level folder,
            # e.g. ./v4/downloads_by_hash.
            nested = [
                child / "downloads_by_hash"
                for child in root.iterdir()
                if child.is_dir() and (child / "downloads_by_hash").is_dir()
            ]
            if len(nested) == 1:
                search_root = nested[0]

    return sorted(
        entry
        for entry in search_root.rglob("*")
        if entry.is_file() and entry.suffix.lower() in ARCHIVE_EXTS
    )
|
|
|
|
|
|
def scan_routes(root: Path, args: argparse.Namespace, report: ScanReport) -> Tuple[List[Route], Dict[str, bytes]]:
    """Extract every route archive under ``root`` and parse its route files.

    For each archive: load its sidecar metadata, extract all members,
    collect images and small text files, then parse every route file
    (.gpx/.kml/.kmz/.gdb) and attach the archive's media, notes and
    warnings to the resulting routes. Failures are recorded in
    ``report.skipped`` and never abort the scan. Finally routes with an
    identical geometry signature are dropped unless ``args.keep_duplicates``.

    Reads from ``args``: scan_all, limit_archives, skip_rar,
    max_text_note_bytes, max_text_note_chars, max_text_notes_per_route,
    verbose, keep_duplicates.

    Returns:
        ``(routes, media_store)`` where ``media_store`` maps a package-
        relative media key to the file's bytes.
    """
    routes: List[Route] = []
    media_store: Dict[str, bytes] = {}
    archives = find_route_archives(root, scan_all=args.scan_all)
    total = len(archives)
    report.archives_seen = total

    for position, archive_path in enumerate(archives, 1):
        if args.limit_archives and position > args.limit_archives:
            break
        if args.skip_rar and archive_path.suffix.lower() == ".rar":
            report.skipped.append({"path": str(archive_path), "reason": "RAR skipped by --skip-rar"})
            continue

        meta = load_source_meta(archive_path)
        hash_dir = archive_path.parent.name
        routes_before = len(routes)
        archive_media_paths: List[str] = []
        archive_text_notes: List[str] = []
        archive_warnings: List[str] = []

        try:
            extracted = list(iter_archive_files(archive_path, skip_rar=args.skip_rar))
            report.archives_extracted += 1
        except Exception as e:
            report.skipped.append({"path": str(archive_path), "reason": str(e)})
            continue

        # First pass: collect media/text so route descriptions can reference them.
        for member in extracted:
            suffix = Path(member.relpath).suffix.lower()
            if suffix in IMAGE_EXTS:
                report.media_files_seen += 1
                media_key = f"media/{slugify(hash_dir)}/{slugify(member.relpath, fallback='image')}"
                media_store[media_key] = member.data
                archive_media_paths.append(media_key)
            elif suffix in TEXT_EXTS and len(member.data) <= args.max_text_note_bytes:
                report.text_files_seen += 1
                note = decode_text(member.data).strip()
                if note:
                    archive_text_notes.append(f"[{member.relpath}]\n{note[:args.max_text_note_chars]}")

        # Second pass: parse the route files themselves.
        for member in extracted:
            suffix = Path(member.relpath).suffix.lower()
            if suffix not in ROUTE_EXTS:
                continue
            report.route_files_seen += 1
            base = {
                "source_archive": archive_path.name,
                "source_archive_hash_dir": hash_dir,
                "inner_path": member.relpath,
                "fallback_name": Path(member.relpath).stem or archive_path.stem,
                **meta,
            }
            try:
                parsed: List[Route]
                if suffix == ".gpx":
                    parsed = parse_gpx(member.data, base, archive_warnings)
                elif suffix == ".kml":
                    parsed = parse_kml(member.data, base, archive_warnings)
                elif suffix == ".kmz":
                    parsed, kmz_media = parse_kmz(member.data, base, archive_warnings)
                    for embedded in kmz_media:
                        media_key = f"media/{slugify(hash_dir)}/{slugify(member.relpath)}__{slugify(embedded.relpath, fallback='image')}"
                        media_store[media_key] = embedded.data
                        archive_media_paths.append(media_key)
                        report.media_files_seen += 1
                elif suffix == ".gdb":
                    parsed = convert_gdb_to_gpx(member.data, base, archive_warnings)
                else:
                    parsed = []

                for route in parsed:
                    route.media.extend(sorted(set(archive_media_paths)))
                    route.text_notes.extend(archive_text_notes[: args.max_text_notes_per_route])
                    route.warnings.extend(archive_warnings)
                    if route.point_count() or route.waypoints:
                        routes.append(route)
                report.route_files_imported += 1
            except Exception as e:
                report.skipped.append({"path": f"{archive_path}::{member.relpath}", "reason": str(e)})

        if args.verbose:
            added = len(routes) - routes_before
            print(f"[{position}/{total}] {archive_path.name}: +{added} routes", file=sys.stderr)

    # De-duplicate exact geometry+name-ish collisions; the first one wins.
    deduped: List[Route] = []
    seen: set[str] = set()
    for route in routes:
        signature = geometry_signature(route)
        if signature in seen and not args.keep_duplicates:
            continue
        seen.add(signature)
        deduped.append(route)

    dropped = len(routes) - len(deduped)
    if dropped:
        report.warnings.append(f"Deduplicated {dropped} duplicate route geometries")
    report.routes_written = len(deduped)
    return deduped, media_store
|
|
|
|
|
|
def geometry_signature(r: Route) -> str:
    """Return a SHA-1 hex digest identifying a route's name and geometry.

    The digest covers the lower-cased, stripped name, a sample of each
    segment (every k-th point, k = max(1, len // 200)) rounded to six
    decimals, and the first 200 waypoints.
    """
    digest = hashlib.sha1()
    digest.update(r.name.strip().lower().encode("utf-8", "replace"))

    for seg in r.segments:
        stride = max(1, len(seg.points) // 200)
        sampled = "".join(f"{p.lat:.6f},{p.lon:.6f};" for p in seg.points[::stride])
        # "|" separates segments so a split track hashes differently from a joined one.
        digest.update(b"|" + sampled.encode())

    waypoint_part = "".join(f"w{p.lat:.6f},{p.lon:.6f};" for p in r.waypoints[:200])
    digest.update(waypoint_part.encode())
    return digest.hexdigest()
|
|
|
|
# ----------------------------- output GPX --------------------------------
|
|
|
|
def route_to_gpx_tree(route: Route) -> ET.ElementTree:
    """Build a GPX 1.1 document containing just ``route``.

    Layout: ``<metadata>`` (name and description), one ``<wpt>`` per
    waypoint, then a single ``<trk>`` with one ``<trkseg>`` per segment.
    Elevation and time are written only when present on a point.
    """

    def q(tag: str) -> str:
        # Qualified tag name in the GPX namespace.
        return f"{{{GPX_NS}}}{tag}"

    def add_position(parent: ET.Element, tag: str, pt: Point) -> None:
        node = ET.SubElement(parent, q(tag), attrib={"lat": f"{pt.lat:.8f}", "lon": f"{pt.lon:.8f}"})
        if pt.ele is not None:
            ET.SubElement(node, q("ele")).text = f"{pt.ele:.2f}"
        if pt.time:
            ET.SubElement(node, q("time")).text = pt.time

    gpx = ET.Element(q("gpx"), attrib={"version": "1.1", "creator": "route_packager.py"})
    description = build_plain_description(route)

    metadata = ET.SubElement(gpx, q("metadata"))
    ET.SubElement(metadata, q("name")).text = route.name
    if description:
        ET.SubElement(metadata, q("desc")).text = description

    for waypoint in route.waypoints:
        add_position(gpx, "wpt", waypoint)

    track = ET.SubElement(gpx, q("trk"))
    ET.SubElement(track, q("name")).text = route.name
    ET.SubElement(track, q("desc")).text = description
    for segment in route.segments:
        seg_node = ET.SubElement(track, q("trkseg"))
        for pt in segment.points:
            add_position(seg_node, "trkpt", pt)

    return ET.ElementTree(gpx)
|
|
|
|
|
|
def write_xml_tree(tree: ET.ElementTree, path: Path) -> None:
    """Write ``tree`` to ``path`` as UTF-8 XML with a declaration.

    Missing parent directories are created first.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    tree.write(
        path,
        encoding="utf-8",
        xml_declaration=True,
        short_empty_elements=True,
    )
|
|
|
|
|
|
def build_aggregate_gpx(routes: Sequence[Route]) -> ET.ElementTree:
    """Build one GPX 1.1 document holding every route as its own ``<trk>``.

    The GPX 1.1 schema requires the children of ``<gpx>`` in the order
    metadata, wpt*, rte*, trk*. All waypoints are therefore written first
    (each named after its route), followed by one track per route; the
    previous implementation interleaved ``<wpt>`` after each ``<trk>``,
    producing schema-invalid output. Waypoint elevation and time are now
    preserved as well, matching the per-route GPX writer.
    """

    def q(tag: str) -> str:
        # Qualified tag name in the GPX namespace.
        return f"{{{GPX_NS}}}{tag}"

    def add_ele_time(node: ET.Element, pt: Point) -> None:
        # Schema order inside a point is ele, time, then name.
        if pt.ele is not None:
            ET.SubElement(node, q("ele")).text = f"{pt.ele:.2f}"
        if pt.time:
            ET.SubElement(node, q("time")).text = pt.time

    root = ET.Element(q("gpx"), attrib={"version": "1.1", "creator": "route_packager.py"})
    meta = ET.SubElement(root, q("metadata"))
    ET.SubElement(meta, q("name")).text = "Bulgarian mountain routes aggregate"
    ET.SubElement(meta, q("desc")).text = f"Generated {now_utc()} with {len(routes)} routes."

    # Pass 1: every waypoint of every route, ahead of any track.
    for r in routes:
        for wp in r.waypoints:
            w = ET.SubElement(root, q("wpt"), attrib={"lat": f"{wp.lat:.8f}", "lon": f"{wp.lon:.8f}"})
            add_ele_time(w, wp)
            ET.SubElement(w, q("name")).text = r.name

    # Pass 2: one track per route.
    for r in routes:
        trk = ET.SubElement(root, q("trk"))
        ET.SubElement(trk, q("name")).text = r.name
        ET.SubElement(trk, q("desc")).text = build_plain_description(r)
        for seg in r.segments:
            seg_el = ET.SubElement(trk, q("trkseg"))
            for p in seg.points:
                pt = ET.SubElement(seg_el, q("trkpt"), attrib={"lat": f"{p.lat:.8f}", "lon": f"{p.lon:.8f}"})
                add_ele_time(pt, p)
    return ET.ElementTree(root)
|
|
|
|
# ----------------------------- output KML/KMZ ----------------------------
|
|
|
|
def limit_points(points: Sequence[Point], max_points: int) -> List[Point]:
    """Down-sample ``points`` to at most ``max_points`` evenly spaced entries.

    ``max_points <= 0`` means "no limit". The first and last points are
    always kept; a limit of 1 or 2 returns exactly those two. Consecutive
    picks that are identical in lat/lon/ele/time are collapsed.
    """
    count = len(points)
    if max_points <= 0 or count <= max_points:
        return list(points)
    if max_points <= 2:
        return [points[0], points[-1]]

    stride = (count - 1) / float(max_points - 1)
    sampled = [points[round(slot * stride)] for slot in range(max_points)]
    sampled[0], sampled[-1] = points[0], points[-1]

    # Rounding can select the same point twice in a row; keep only the first.
    result: List[Point] = []
    previous = None
    for pt in sampled:
        signature = (pt.lat, pt.lon, pt.ele, pt.time)
        if signature == previous:
            continue
        result.append(pt)
        previous = signature
    return result
|
|
|
|
|
|
def kml_segments(route: Route, max_points_per_route: int = 0) -> List[List[Point]]:
    """Return the route's segments as point lists, optionally down-sampled.

    With ``max_points_per_route <= 0`` every point is kept. Otherwise the
    budget is split across segments in proportion to their length, with a
    floor of two points per segment (so the total may slightly exceed the
    budget).
    """
    if max_points_per_route > 0:
        total = max(1, route.point_count())
        return [
            limit_points(
                seg.points,
                max(2, int(max_points_per_route * (len(seg.points) / total))),
            )
            for seg in route.segments
        ]
    return [list(seg.points) for seg in route.segments]
|
|
|
|
|
|
def build_kml(routes: Sequence[Route], media_store: Dict[str, bytes], max_points_per_route: int = 0, lean_descriptions: bool = False) -> bytes:
    """Serialize ``routes`` as a KML document and return its UTF-8 bytes.

    The document holds one shared line style and a "Routes" folder with one
    Placemark per route. A route with several segments becomes a
    MultiGeometry of LineStrings, a single segment a plain LineString, and a
    waypoints-only route a Point at its first waypoint.

    Placemark children are written in the order the KML 2.2 schema defines
    for features (name, visibility, description, styleUrl, geometry); the
    previous order put styleUrl ahead of description, which strict
    validators reject.

    Args:
        routes: routes to emit, in output order.
        media_store: not read here; accepted so callers can pass the same
            arguments as for the other writers.
        max_points_per_route: per-route point budget, 0 for unlimited.
        lean_descriptions: forwarded to ``build_html_description``.
    """

    def q(tag: str) -> str:
        # Qualified tag name in the KML namespace.
        return f"{{{KML_NS}}}{tag}"

    kml = ET.Element(q("kml"))
    doc = ET.SubElement(kml, q("Document"))
    ET.SubElement(doc, q("name")).text = "Bulgarian mountain routes"
    ET.SubElement(doc, q("description")).text = f"Generated {now_utc()} from scraped route archives."

    style = ET.SubElement(doc, q("Style"), id="routeLine")
    line = ET.SubElement(style, q("LineStyle"))
    ET.SubElement(line, q("color")).text = "ff0066cc"
    ET.SubElement(line, q("width")).text = "4"

    folder = ET.SubElement(doc, q("Folder"))
    ET.SubElement(folder, q("name")).text = "Routes"
    ET.SubElement(folder, q("open")).text = "0"

    for r in routes:
        pm = ET.SubElement(folder, q("Placemark"))
        ET.SubElement(pm, q("name")).text = r.name
        ET.SubElement(pm, q("visibility")).text = "1"
        ET.SubElement(pm, q("description")).text = build_html_description(r, lean=lean_descriptions)
        ET.SubElement(pm, q("styleUrl")).text = "#routeLine"

        segs_for_google = kml_segments(r, max_points_per_route=max_points_per_route)
        if len(segs_for_google) > 1:
            multi = ET.SubElement(pm, q("MultiGeometry"))
            for pts in segs_for_google:
                append_kml_linestring(multi, pts)
        elif segs_for_google:
            append_kml_linestring(pm, segs_for_google[0])
        elif r.waypoints:
            # Waypoint-only route: only the first waypoint is emitted.
            p = r.waypoints[0]
            point = ET.SubElement(pm, q("Point"))
            ET.SubElement(point, q("coordinates")).text = f"{p.lon:.8f},{p.lat:.8f},{p.ele or 0:.2f}"

    buf = io.BytesIO()
    ET.ElementTree(kml).write(buf, encoding="utf-8", xml_declaration=True, short_empty_elements=True)
    return buf.getvalue()
|
|
|
|
|
|
def append_kml_linestring(parent: ET.Element, pts: Sequence[Point]) -> None:
    """Append a tessellated ``<LineString>`` for ``pts`` under ``parent``.

    Coordinates are written as space-separated ``lon,lat,alt`` tuples; a
    missing elevation is written as 0.
    """

    def as_tuple(p: Point) -> str:
        altitude = 0 if p.ele is None else p.ele
        return f"{p.lon:.8f},{p.lat:.8f},{altitude:.2f}"

    linestring = ET.SubElement(parent, f"{{{KML_NS}}}LineString")
    ET.SubElement(linestring, f"{{{KML_NS}}}tessellate").text = "1"
    coordinates = ET.SubElement(linestring, f"{{{KML_NS}}}coordinates")
    coordinates.text = " ".join(map(as_tuple, pts))
|
|
|
|
|
|
def build_plain_description(route: Route) -> str:
    """Return a newline-separated plain-text summary of ``route``.

    Always includes the source archive, inner file, point count and
    approximate distance; title, license, scrape date, SHA-256, media (first
    12), text notes (first 3) and warnings (first 5, sorted, unique) are
    added only when present.
    """

    def labelled(label: str, value: str) -> List[str]:
        # One "Label: value" row, or nothing when the value is empty.
        return [f"{label}: {value}"] if value else []

    lines: List[str] = []
    lines += labelled("Title", route.source_title)
    lines.append(f"Source archive: {route.source_archive}")
    lines.append(f"Inner file: {route.inner_path}")
    lines += labelled("License", route.source_license)
    lines += labelled("Scrape created", route.source_created_at_utc)
    lines += labelled("Download SHA256", route.download_sha256)
    lines.append(f"Points: {route.point_count()}")
    lines.append(f"Distance km approx: {route.distance_km():.2f}")

    if route.media:
        overflow = " ..." if len(route.media) > 12 else ""
        lines.append("Media: " + ", ".join(route.media[:12]) + overflow)
    if route.text_notes:
        lines.append("Text notes:\n" + "\n\n".join(route.text_notes[:3]))
    if route.warnings:
        unique_warnings = sorted(set(route.warnings))
        lines.append("Warnings: " + "; ".join(unique_warnings[:5]))
    return "\n".join(lines)
|
|
|
|
|
|
def build_html_description(route: Route, lean: bool = False) -> str:
    """Return an HTML fragment describing *route*.

    The fragment is a ``<table>`` of escaped label/value rows. Unless *lean*
    is true it is followed by ``<img>`` previews for those of the first 8
    media entries whose suffix is in ``IMAGE_EXTS``, and by up to 3 text notes
    wrapped in ``<pre>``. With ``lean=True`` only the table is returned.
    """

    def render(label: str, value: str) -> str:
        return f"<tr><th align='left'>{html.escape(label)}</th><td>{html.escape(value)}</td></tr>"

    # (label, value, always_shown) in output order.
    entries = [
        ("Source title", route.source_title, False),
        ("Source archive", route.source_archive, True),
        ("Inner file", route.inner_path, True),
        ("License", route.source_license, False),
        ("Scraped", route.source_created_at_utc, False),
        ("Points", str(route.point_count()), True),
        ("Approx distance", f"{route.distance_km():.2f} km", True),
        ("Download SHA256", route.download_sha256, False),
    ]
    table_body = "".join(render(label, value) for label, value, always in entries if always or value)

    previews = []
    if not lean:
        for item in route.media[:8]:
            if Path(item).suffix.lower() in IMAGE_EXTS:
                safe = html.escape(item)
                previews.append(f"<p><img src='{safe}' width='420'/><br/>{safe}</p>")

    notes_html = ""
    if not lean and route.text_notes:
        blocks = [f"<pre>{html.escape(note)}</pre>" for note in route.text_notes[:3]]
        notes_html = "<h3>Text notes</h3>" + "".join(blocks)

    return f"<table>{table_body}</table>{''.join(previews)}{notes_html}"

# ----------------------------- writers -----------------------------------

def write_osmand_outputs(routes: Sequence[Route], media_store: Dict[str, bytes], out_dir: Path, base_name: str, report: ScanReport) -> Dict[str, str]:
    """Stage per-route GPX files plus manifests and zip them into an ``.osf`` package.

    A fresh ``osmand_tracks`` staging directory is created under *out_dir*
    (any previous one is removed). It receives one GPX per route under
    ``tracks/``, a ``manifest.json``, an ``items.json`` and the files from
    *media_store*. An aggregate ``<base_name>.all-routes.gpx`` is written next
    to the package as a fallback.

    Args:
        routes: Routes to export.
        media_store: Package-relative path -> file bytes to embed.
        out_dir: Existing output directory.
        base_name: Prefix for the generated file names.
        report: Scan report; its timestamp goes into the manifest and skipped
            media entries are recorded in ``report.warnings``.

    Returns:
        ``{"osmand_gpx_fallback": ..., "osmand_osf": ...}`` with string paths.
    """
    paths: Dict[str, str] = {}
    tracks_dir = out_dir / "osmand_tracks"
    # Rebuild the staging tree from scratch so stale tracks from an earlier run
    # never leak into the new package.
    if tracks_dir.exists():
        shutil.rmtree(tracks_dir)
    tracks_dir.mkdir(parents=True)

    manifest_routes = []
    slug_counts: Dict[str, int] = {}
    taken_stems = set()
    for r in routes:
        slug = slugify(r.name, fallback=r.id)
        slug_counts[slug] = slug_counts.get(slug, 0) + 1
        preferred = slug if slug_counts[slug] == 1 else f"{slug}_{r.id}"
        # The id suffix alone does not guarantee uniqueness (another route may
        # already own that exact stem, or differ only by case on a
        # case-insensitive filesystem), so keep numbering until the name is free.
        stem = preferred
        serial = 1
        while stem.casefold() in taken_stems:
            serial += 1
            stem = f"{preferred}_{serial}"
        taken_stems.add(stem.casefold())

        rel = Path("tracks") / f"{stem}.gpx"
        write_xml_tree(route_to_gpx_tree(r), tracks_dir / rel)
        manifest_routes.append(route_manifest(r, rel.as_posix()))

    aggregate_gpx = out_dir / f"{base_name}.all-routes.gpx"
    write_xml_tree(build_aggregate_gpx(routes), aggregate_gpx)
    paths["osmand_gpx_fallback"] = str(aggregate_gpx)

    manifest = {
        "type": "route-packager-osmand",
        "generated_at_utc": report.generated_at_utc,
        "route_count": len(routes),
        "routes": manifest_routes,
        "notes": [
            "This .osf is a zip-style OsmAnd package containing GPX tracks.",
            "If your OsmAnd build refuses package import, import the .all-routes.gpx fallback or unzip tracks/*.gpx.",
        ],
    }
    (tracks_dir / "manifest.json").write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
    # A lightweight items.json helps plugin-style OSF importers identify contents, while not harming zip usage.
    items = {
        "items": [{"type": "tracks", "path": m["package_path"], "name": m["name"]} for m in manifest_routes]
    }
    (tracks_dir / "items.json").write_text(json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8")

    # Media names originate from downloaded archives. Refuse anything that
    # would resolve outside the staging directory (absolute paths, "..").
    staging_root = tracks_dir.resolve()
    for media_path, data in media_store.items():
        dest = (tracks_dir / media_path).resolve()
        if staging_root not in dest.parents:
            report.warnings.append(f"Skipped media with unsafe package path: {media_path}")
            continue
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(data)

    osf_path = out_dir / f"{base_name}.osmand-tracks.osf"
    zip_dir_as(tracks_dir, osf_path)
    paths["osmand_osf"] = str(osf_path)
    return paths
|
|
|
|
|
|
def write_google_outputs(routes: Sequence[Route], media_store: Dict[str, bytes], out_dir: Path, base_name: str, args: argparse.Namespace, report: ScanReport) -> Dict[str, str]:
    """Write the Google KML and KMZ for *routes* and return their paths.

    The KML is first built with the user's point cap and description mode.
    If it exceeds ``args.google_limit_mb``:

    * with ``args.google_my_maps_safe`` it is rebuilt with lean descriptions
      and progressively smaller per-route point budgets until it fits, or a
      warning is recorded when even the smallest budget is too large;
    * otherwise only a warning is recorded.

    A KML that already fits is kept untouched, so safe mode never throws away
    geometry or descriptions it does not need to.

    Args:
        routes: Routes to export.
        media_store: Package-relative path -> bytes, embedded into the KMZ.
        out_dir: Existing output directory.
        base_name: Prefix for the generated file names.
        args: Parsed CLI options (``google_*`` attributes are read).
        report: Scan report; messages are appended to ``report.warnings``.

    Returns:
        Mapping with ``google_kml``, ``google_kmz``,
        ``google_kml_uncompressed_mb``, ``google_max_points_per_route`` and
        ``google_lean_descriptions`` (all values are strings).
    """
    paths: Dict[str, str] = {}
    max_points = max(0, int(args.google_max_points_per_route or 0))
    lean = bool(args.google_lean_descriptions)
    limit_bytes = int(float(args.google_limit_mb) * 1024 * 1024)
    kml_bytes = build_kml(routes, media_store, max_points_per_route=max_points, lean_descriptions=lean)

    if len(kml_bytes) > limit_bytes:
        if args.google_my_maps_safe:
            # Google My Maps has a small KML/KMZ import ceiling. Try progressively
            # smaller per-route geometry budgets until the uncompressed KML fits.
            ladder = (1200, 800, 500, 300, 200, 120, 80, 50, 30)
            if max_points:
                # Never retry with more points than the user's own cap: those
                # builds cannot be smaller than the one that already failed.
                budgets = [max_points] + [b for b in ladder if b < max_points]
                if lean:
                    # (max_points, lean) is exactly the variant built above.
                    budgets = budgets[1:]
            else:
                budgets = list(ladder)
            for budget in budgets:
                candidate = build_kml(routes, media_store, max_points_per_route=budget, lean_descriptions=True)
                if len(candidate) <= limit_bytes:
                    kml_bytes = candidate
                    max_points = budget
                    lean = True
                    report.warnings.append(f"Google My Maps safe mode: simplified KML to <= {args.google_limit_mb} MB using max {budget} points/route and lean descriptions")
                    break
            else:
                report.warnings.append(f"Google My Maps safe mode could not reduce KML below {args.google_limit_mb} MB; output may import in Google Earth but fail in My Maps")
        else:
            # Warn when a user tries to feed a large one-file KMZ into My Maps.
            report.warnings.append(f"Google KML is {len(kml_bytes)/1024/1024:.2f} MB uncompressed; My Maps commonly rejects KML/KMZ above {args.google_limit_mb} MB. Re-run with --google-my-maps-safe for a simplified one-file KMZ.")

    kml_path = out_dir / f"{base_name}.google-earth-maps.kml"
    kml_path.write_bytes(kml_bytes)
    paths["google_kml"] = str(kml_path)

    kmz_path = out_dir / f"{base_name}.google-earth-maps.kmz"
    with zipfile.ZipFile(kmz_path, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
        zf.writestr("doc.kml", kml_bytes)
        for media_path, data in media_store.items():
            zf.writestr(media_path, data)
    paths["google_kmz"] = str(kmz_path)

    paths["google_kml_uncompressed_mb"] = f"{len(kml_bytes)/1024/1024:.2f}"
    paths["google_max_points_per_route"] = str(max_points)
    paths["google_lean_descriptions"] = str(lean)
    return paths
|
|
|
|
|
|
def route_manifest(r: Route, package_path: str = "") -> Dict[str, object]:
    """Return the manifest record for route *r* as a plain dict.

    The record carries the route's identity, size figures (point count,
    segment count, distance rounded to 3 decimals, bounding box), its source
    provenance fields, the number of media entries and *package_path*, which
    defaults to the empty string. Key order is fixed.
    """
    bounds = r.bbox()
    entry: Dict[str, object] = dict(
        id=r.id,
        name=r.name,
        kind=r.route_kind,
        points=r.point_count(),
        segments=len(r.segments),
        distance_km_approx=round(r.distance_km(), 3),
        bbox=bounds,
        source_archive=r.source_archive,
        source_hash_dir=r.source_archive_hash_dir,
        inner_path=r.inner_path,
        source_title=r.source_title,
        license=r.source_license,
        source_created_at_utc=r.source_created_at_utc,
        download_sha256=r.download_sha256,
        media_count=len(r.media),
        package_path=package_path,
    )
    return entry
|
|
|
|
|
|
def zip_dir_as(src_dir: Path, dest_zip: Path) -> None:
    """Pack every regular file below *src_dir* into a deflated zip at *dest_zip*.

    Members are stored under their POSIX-style path relative to *src_dir*, in
    sorted order. An existing *dest_zip* is replaced.

    The directory is listed after removing any old archive but before the new
    one is created, so the archive can never contain itself when *dest_zip*
    lives inside *src_dir*.
    """
    if dest_zip.exists():
        dest_zip.unlink()
    members = [fp for fp in sorted(src_dir.rglob("*")) if fp.is_file()]
    with zipfile.ZipFile(dest_zip, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
        for fp in members:
            zf.write(fp, fp.relative_to(src_dir).as_posix())
|
|
|
|
|
|
def write_reports(routes: Sequence[Route], report: ScanReport, out_dir: Path, base_name: str, output_paths: Dict[str, str]) -> None:
    """Write ``<base_name>.report.json`` and ``<base_name>.routes.csv`` into *out_dir*.

    The JSON file holds the scan report (via ``asdict``), *output_paths* and
    one manifest record per route. The CSV holds the same records, one row per
    route, with the ``bbox`` value JSON-encoded. *out_dir* is created if it is
    missing.

    Each route's manifest is computed once and shared by both files.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    manifests = [route_manifest(r) for r in routes]

    full = {
        "report": asdict(report),
        "outputs": output_paths,
        "routes": manifests,
    }
    (out_dir / f"{base_name}.report.json").write_text(json.dumps(full, ensure_ascii=False, indent=2), encoding="utf-8")

    fieldnames = [
        "id", "name", "kind", "points", "segments", "distance_km_approx", "bbox",
        "source_archive", "source_hash_dir", "inner_path", "source_title", "license",
        "source_created_at_utc", "download_sha256", "media_count", "package_path",
    ]
    with (out_dir / f"{base_name}.routes.csv").open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        # Build a per-row copy so the records already serialised above keep
        # their structured bbox.
        w.writerows({**m, "bbox": json.dumps(m["bbox"], ensure_ascii=False)} for m in manifests)

# ----------------------------- CLI ---------------------------------------

def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command line for the route packager.

    *argv* is handed to ``ArgumentParser.parse_args``; ``None`` makes argparse
    read ``sys.argv``. ``--input`` and ``--out`` are required, every other
    option has a default.
    """
    usage_notes = textwrap.dedent("""
        Output reality:
          --target osmand writes <name>.osmand-tracks.osf and <name>.all-routes.gpx
          --target google writes <name>.google-earth-maps.kmz and <name>.google-earth-maps.kml

        RAR extraction needs an installed backend. On Debian/Ubuntu, try:
          sudo apt update && sudo apt install unrar-free p7zip-full gpsbabel
        or install non-free unrar / unar if available.
    """)
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Parse downloaded route archives and create OsmAnd OSF/GPX and Google KMZ/KML outputs.",
        epilog=usage_notes,
    )

    # (flags, add_argument keyword arguments), registered in this order so the
    # --help listing stays stable.
    option_table = (
        (("--input",), dict(required=True, help="Input directory or tar archive containing downloads_by_hash")),
        (("--out",), dict(required=True, help="Output directory")),
        (("--target",), dict(choices=["osmand", "google", "both"], default="both")),
        (("--name",), dict(default="bg-mountain-routes", help="Base name for output files")),
        (("--skip-rar",), dict(action="store_true", help="Skip RAR archives instead of trying to extract them")),
        (("--scan-all",), dict(action="store_true", help="Scan every archive below input, including retry/failure duplicate folders. Default uses downloads_by_hash when present.")),
        (("--strict",), dict(action="store_true", help="Exit non-zero if anything is skipped/unsupported")),
        (("--keep-duplicates",), dict(action="store_true", help="Do not de-duplicate identical route geometries")),
        (("--limit-archives",), dict(type=int, default=0, help="Debug/test: process only N archives")),
        (("--max-text-note-bytes",), dict(type=int, default=128_000, help="Max text file size to include in metadata notes")),
        (("--max-text-note-chars",), dict(type=int, default=8_000, help="Max chars per text note copied into descriptions")),
        (("--max-text-notes-per-route",), dict(type=int, default=4, help="Max text notes attached to each route")),
        (("--google-my-maps-safe",), dict(action="store_true", help="Try to keep the Google KMZ importable by My Maps by simplifying geometry and using lean descriptions")),
        (("--google-limit-mb",), dict(type=float, default=4.8, help="Uncompressed KML size target for --google-my-maps-safe; Google documents 5 MB for KML/KMZ imports")),
        (("--google-max-points-per-route",), dict(type=int, default=0, help="Limit points per route in Google KML/KMZ only; 0 keeps full geometry")),
        (("--google-lean-descriptions",), dict(action="store_true", help="Do not include image previews/text note bodies in Google KML descriptions")),
        (("--keep-temp",), dict(action="store_true", help="Keep temp input extraction only when manually debugging; normal cleanup still applies")),
        (("--verbose", "-v"), dict(action="store_true")),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the packager end to end and return the process exit code.

    Returns:
        0 on success; 2 when the input cannot be materialised or no route was
        imported; 3 when ``--strict`` is set and either no route was imported
        or the report lists skipped items.
    """
    args = parse_args(argv)
    source = Path(args.input).expanduser().resolve()
    destination = Path(args.out).expanduser().resolve()
    destination.mkdir(parents=True, exist_ok=True)

    report = ScanReport(input=str(source), generated_at_utc=now_utc())
    try:
        root, temp_handle = materialize_input(source, keep_temp=args.keep_temp)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2

    try:
        routes, media_store = scan_routes(root, args, report)
        if not routes:
            print("ERROR: no usable GPX/KML/KMZ/GDB route geometries were imported", file=sys.stderr)
            # Still leave a report behind so the skipped inputs can be inspected.
            write_reports([], report, destination, args.name, {})
            return 3 if args.strict else 2

        output_paths: Dict[str, str] = {}
        if args.target in ("osmand", "both"):
            osmand_paths = write_osmand_outputs(routes, media_store, destination, args.name, report)
            output_paths.update(osmand_paths)
        if args.target in ("google", "both"):
            google_paths = write_google_outputs(routes, media_store, destination, args.name, args, report)
            output_paths.update(google_paths)
        write_reports(routes, report, destination, args.name, output_paths)

        summary = {
            "ok": True,
            "routes": len(routes),
            "archives_seen": report.archives_seen,
            "archives_extracted": report.archives_extracted,
            "skipped": len(report.skipped),
            "warnings": report.warnings,
            "outputs": output_paths,
            "report_json": str(destination / f"{args.name}.report.json"),
            "routes_csv": str(destination / f"{args.name}.routes.csv"),
        }
        print(json.dumps(summary, ensure_ascii=False, indent=2))

        return 3 if (args.strict and report.skipped) else 0
    finally:
        # Runs on every exit path above, including the early returns.
        if not (temp_handle is None or args.keep_temp):
            temp_handle.cleanup()
|
|
|
|
|
|
# Script entry point: hand main()'s return code to the interpreter as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|