v4.0.1 Conversion
This commit is contained in:
262
landmarks_csv_to_osmand.py
Normal file
262
landmarks_csv_to_osmand.py
Normal file
@@ -0,0 +1,262 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import sys
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Iterable, Optional
|
||||
from xml.dom import minidom
|
||||
|
||||
OSMAND_NS = "https://osmand.net"
|
||||
ET.register_namespace("osmand", OSMAND_NS)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Landmark:
|
||||
lon: float
|
||||
lat: float
|
||||
name: str
|
||||
garmin_kind: str
|
||||
garmin_type: str
|
||||
garmin_subtype: str
|
||||
gpxsee_classes: list[str]
|
||||
semantic_tags: dict[str, str]
|
||||
source_file: str
|
||||
|
||||
|
||||
DEFAULT_GROUPS = {
|
||||
"water_sources": {"name": "Water sources", "color": "#1E88E5", "icon": "water_drop", "background": "circle"},
|
||||
"settlements": {"name": "Settlements", "color": "#43A047", "icon": "town", "background": "circle"},
|
||||
"summits": {"name": "Summits", "color": "#8E24AA", "icon": "special_star", "background": "circle"},
|
||||
"springs": {"name": "Springs", "color": "#00ACC1", "icon": "water_drop", "background": "circle"},
|
||||
"drinking_water": {"name": "Drinking water", "color": "#039BE5", "icon": "water_drop", "background": "circle"},
|
||||
"other": {"name": "Other landmarks", "color": "#FB8C00", "icon": "marker", "background": "circle"},
|
||||
}
|
||||
|
||||
|
||||
def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
|
||||
items: list[Landmark] = []
|
||||
for path in paths:
|
||||
with path.open("r", encoding="utf-8-sig", newline="") as f:
|
||||
reader = csv.DictReader(f)
|
||||
required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype", "gpxsee_classes_json", "semantic_tags_json"}
|
||||
missing = required - set(reader.fieldnames or [])
|
||||
if missing:
|
||||
raise ValueError(f"{path}: missing columns: {sorted(missing)}")
|
||||
for row in reader:
|
||||
try:
|
||||
lon = float(row["lon"])
|
||||
lat = float(row["lat"])
|
||||
except Exception:
|
||||
continue
|
||||
name = (row.get("name") or "").strip()
|
||||
try:
|
||||
gpxsee_classes = json.loads(row.get("gpxsee_classes_json") or "[]")
|
||||
except Exception:
|
||||
gpxsee_classes = []
|
||||
try:
|
||||
semantic_tags = json.loads(row.get("semantic_tags_json") or "{}")
|
||||
except Exception:
|
||||
semantic_tags = {}
|
||||
items.append(Landmark(
|
||||
lon=lon,
|
||||
lat=lat,
|
||||
name=name,
|
||||
garmin_kind=(row.get("garmin_kind") or "").strip(),
|
||||
garmin_type=(row.get("garmin_type") or "").strip().lower(),
|
||||
garmin_subtype=(row.get("garmin_subtype") or "").strip().lower(),
|
||||
gpxsee_classes=gpxsee_classes if isinstance(gpxsee_classes, list) else [],
|
||||
semantic_tags=semantic_tags if isinstance(semantic_tags, dict) else {},
|
||||
source_file=path.name,
|
||||
))
|
||||
return items
|
||||
|
||||
|
||||
def classify(item: Landmark) -> str:
|
||||
tags = item.semantic_tags
|
||||
if tags.get("amenity") == "drinking_water":
|
||||
return "drinking_water"
|
||||
if tags.get("natural") == "spring":
|
||||
return "springs"
|
||||
if tags.get("place"):
|
||||
return "settlements"
|
||||
if "summit" in item.gpxsee_classes or tags.get("natural") == "peak":
|
||||
return "summits"
|
||||
if tags.get("waterway") == "water_point" or tags.get("natural") == "water":
|
||||
return "water_sources"
|
||||
return "other"
|
||||
|
||||
|
||||
def dedupe(items: list[Landmark], precision: int = 6) -> list[Landmark]:
|
||||
seen: set[tuple] = set()
|
||||
out: list[Landmark] = []
|
||||
for it in items:
|
||||
key = (round(it.lon, precision), round(it.lat, precision), it.name, it.garmin_type, it.garmin_subtype)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(it)
|
||||
return out
|
||||
|
||||
|
||||
def write_gpx(items: list[Landmark], out_path: Path, by: str = "category") -> None:
|
||||
gpx = ET.Element("gpx", {
|
||||
"version": "1.1",
|
||||
"creator": "landmarks_csv_to_osmand.py",
|
||||
"xmlns": "http://www.topografix.com/GPX/1/1",
|
||||
})
|
||||
metadata = ET.SubElement(gpx, "metadata")
|
||||
ET.SubElement(metadata, "name").text = out_path.stem
|
||||
|
||||
groups_el = ET.SubElement(ET.SubElement(gpx, "extensions"), f"{{{OSMAND_NS}}}points_groups")
|
||||
|
||||
def group_key(it: Landmark) -> str:
|
||||
if by == "type":
|
||||
return f"{it.garmin_type}_{it.garmin_subtype}"
|
||||
return classify(it)
|
||||
|
||||
groups: dict[str, dict[str, str]] = {}
|
||||
for it in items:
|
||||
g = group_key(it)
|
||||
if g not in groups:
|
||||
if by == "type":
|
||||
groups[g] = {
|
||||
"name": g,
|
||||
"color": "#FB8C00",
|
||||
"icon": "marker",
|
||||
"background": "circle",
|
||||
}
|
||||
else:
|
||||
groups[g] = DEFAULT_GROUPS.get(g, DEFAULT_GROUPS["other"])
|
||||
|
||||
for key, cfg in sorted(groups.items()):
|
||||
ET.SubElement(groups_el, f"{{{OSMAND_NS}}}group", {
|
||||
"name": key,
|
||||
"color": cfg["color"],
|
||||
"icon": cfg["icon"],
|
||||
"background": cfg["background"],
|
||||
})
|
||||
|
||||
for it in items:
|
||||
wpt = ET.SubElement(gpx, "wpt", {"lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
|
||||
if it.name:
|
||||
ET.SubElement(wpt, "name").text = it.name
|
||||
ET.SubElement(wpt, "type").text = group_key(it)
|
||||
desc_parts = []
|
||||
if it.semantic_tags:
|
||||
desc_parts.append("semantic: " + json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True))
|
||||
desc_parts.append(f"garmin: kind={it.garmin_kind} type={it.garmin_type} subtype={it.garmin_subtype}")
|
||||
if it.source_file:
|
||||
desc_parts.append(f"source={it.source_file}")
|
||||
ET.SubElement(wpt, "desc").text = "\n".join(desc_parts)
|
||||
ext = ET.SubElement(wpt, "extensions")
|
||||
cfg = groups[group_key(it)]
|
||||
ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg["icon"]
|
||||
ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg["color"]
|
||||
ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg["background"]
|
||||
|
||||
xml_bytes = ET.tostring(gpx, encoding="utf-8")
|
||||
pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8")
|
||||
out_path.write_bytes(pretty)
|
||||
|
||||
|
||||
def write_osm(items: list[Landmark], out_path: Path) -> None:
|
||||
osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand.py"})
|
||||
nid = -1
|
||||
for it in items:
|
||||
node = ET.SubElement(osm, "node", {"id": str(nid), "lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
|
||||
nid -= 1
|
||||
if it.name:
|
||||
ET.SubElement(node, "tag", {"k": "name", "v": it.name})
|
||||
for k, v in sorted(it.semantic_tags.items()):
|
||||
if v is None:
|
||||
continue
|
||||
ET.SubElement(node, "tag", {"k": str(k), "v": str(v)})
|
||||
ET.SubElement(node, "tag", {"k": "garmin:kind", "v": it.garmin_kind})
|
||||
ET.SubElement(node, "tag", {"k": "garmin:type", "v": it.garmin_type})
|
||||
ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": it.garmin_subtype})
|
||||
ET.SubElement(node, "tag", {"k": "source:file", "v": it.source_file})
|
||||
xml_bytes = ET.tostring(osm, encoding="utf-8")
|
||||
pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8")
|
||||
out_path.write_bytes(pretty)
|
||||
|
||||
|
||||
def write_summary(items: list[Landmark], out_csv: Optional[Path], out_json: Optional[Path], by: str = "category") -> None:
|
||||
counts: dict[str, int] = defaultdict(int)
|
||||
for it in items:
|
||||
key = classify(it) if by == "category" else f"{it.garmin_type}_{it.garmin_subtype}"
|
||||
counts[key] += 1
|
||||
if out_csv:
|
||||
with out_csv.open("w", encoding="utf-8", newline="") as f:
|
||||
w = csv.writer(f)
|
||||
w.writerow(["group", "count"])
|
||||
for k, v in sorted(counts.items()):
|
||||
w.writerow([k, v])
|
||||
if out_json:
|
||||
out_json.write_text(json.dumps(dict(sorted(counts.items())), ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
|
||||
|
||||
def apply_filters(items: list[Landmark], category: Optional[str], filter_tag: list[str], named_only: bool) -> list[Landmark]:
|
||||
out = items
|
||||
if category:
|
||||
out = [it for it in out if classify(it) == category]
|
||||
for expr in filter_tag:
|
||||
if "=" not in expr:
|
||||
raise ValueError(f"invalid --filter-tag value: {expr!r}; expected key=value")
|
||||
k, v = expr.split("=", 1)
|
||||
out = [it for it in out if str(it.semantic_tags.get(k)) == v]
|
||||
if named_only:
|
||||
out = [it for it in out if it.name]
|
||||
return out
|
||||
|
||||
|
||||
def main(argv: Optional[list[str]] = None) -> int:
|
||||
ap = argparse.ArgumentParser(description="Convert landmark CSV exports into OsmAnd-friendly GPX overlay and/or OSM POI input.")
|
||||
ap.add_argument("csv", nargs="+", type=Path, help="Input landmark CSV files")
|
||||
ap.add_argument("--gpx", type=Path, help="Write OsmAnd-friendly GPX waypoint overlay")
|
||||
ap.add_argument("--osm", type=Path, help="Write OSM XML for OsmAndMapCreator generate-poi")
|
||||
ap.add_argument("--summary-csv", type=Path, help="Write category/type counts CSV")
|
||||
ap.add_argument("--summary-json", type=Path, help="Write category/type counts JSON")
|
||||
ap.add_argument("--group-by", choices=["category", "type"], default="category", help="How GPX waypoint groups should be organized")
|
||||
ap.add_argument("--summary-by", choices=["category", "type"], default="category")
|
||||
ap.add_argument("--category", help="Keep only one inferred category, e.g. water_sources, settlements, springs, drinking_water")
|
||||
ap.add_argument("--filter-tag", action="append", default=[], help="Keep only landmarks whose semantic tags contain key=value")
|
||||
ap.add_argument("--named-only", action="store_true", help="Keep only landmarks with non-empty names")
|
||||
ap.add_argument("--no-dedupe", action="store_true", help="Disable de-duplication")
|
||||
args = ap.parse_args(argv)
|
||||
|
||||
items = load_landmarks(args.csv)
|
||||
items = apply_filters(items, args.category, args.filter_tag, args.named_only)
|
||||
if not args.no_dedupe:
|
||||
items = dedupe(items)
|
||||
|
||||
if not any([args.gpx, args.osm, args.summary_csv, args.summary_json]):
|
||||
print(f"loaded {len(items)} landmarks", file=sys.stderr)
|
||||
return 0
|
||||
|
||||
if args.gpx:
|
||||
write_gpx(items, args.gpx, by=args.group_by)
|
||||
if args.osm:
|
||||
write_osm(items, args.osm)
|
||||
if args.summary_csv or args.summary_json:
|
||||
write_summary(items, args.summary_csv, args.summary_json, by=args.summary_by)
|
||||
|
||||
print(f"[info] kept {len(items)} landmarks")
|
||||
if args.gpx:
|
||||
print(f"[info] wrote GPX overlay: {args.gpx}")
|
||||
if args.osm:
|
||||
print(f"[info] wrote OSM XML: {args.osm}")
|
||||
if args.summary_csv:
|
||||
print(f"[info] wrote summary CSV: {args.summary_csv}")
|
||||
if args.summary_json:
|
||||
print(f"[info] wrote summary JSON: {args.summary_json}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user