diff --git a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py index b94b05a..d85e572 100644 --- a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py +++ b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py @@ -3,18 +3,23 @@ from __future__ import annotations import argparse import csv +import gzip import json +import math import sys import xml.etree.ElementTree as ET from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path -from typing import Iterable, Optional +from typing import Callable, Iterable, Optional from xml.dom import minidom OSMAND_NS = "https://osmand.net" +GPX_NS = "http://www.topografix.com/GPX/1/1" ET.register_namespace("osmand", OSMAND_NS) +EARTH_M_PER_DEG_LAT = 111_320.0 + @dataclass class Landmark: @@ -26,25 +31,79 @@ class Landmark: garmin_subtype: str gpxsee_classes: list[str] semantic_tags: dict[str, str] - source_file: str + source_files: list[str] = field(default_factory=list) + duplicate_count: int = 1 + duplicate_names: list[str] = field(default_factory=list) + duplicate_types: list[str] = field(default_factory=list) + duplicate_subtypes: list[str] = field(default_factory=list) + + def clone(self) -> "Landmark": + return Landmark( + lon=self.lon, + lat=self.lat, + name=self.name, + garmin_kind=self.garmin_kind, + garmin_type=self.garmin_type, + garmin_subtype=self.garmin_subtype, + gpxsee_classes=list(self.gpxsee_classes), + semantic_tags=dict(self.semantic_tags), + source_files=list(self.source_files), + duplicate_count=self.duplicate_count, + duplicate_names=list(self.duplicate_names), + duplicate_types=list(self.duplicate_types), + duplicate_subtypes=list(self.duplicate_subtypes), + ) + + +@dataclass +class GroupDefinition: + key: str + name: str + color: str + icon: str + background: str = "circle" + examples: list[str] = field(default_factory=list) + count: int = 0 DEFAULT_GROUPS = { - "water_sources": {"name": "Water sources", "color": "#1E88E5", "icon": "water_drop", "background": "circle"}, - "settlements": {"name": "Settlements", "color": "#43A047", "icon": "town", "background": "circle"}, - "summits": {"name": "Summits", "color": "#8E24AA", "icon": "special_star", "background": "circle"}, - "springs": {"name": "Springs", "color": "#00ACC1", "icon": "water_drop", "background": "circle"}, "drinking_water": {"name": "Drinking water", "color": "#039BE5", "icon": "water_drop", "background": "circle"}, + "springs": {"name": "Springs", "color": "#00ACC1", "icon": "water_drop", "background": "circle"}, + "water_sources": {"name": "Water sources", "color": "#1E88E5", "icon": "water_drop", "background": "circle"}, + "settlements_village": {"name": "Villages", "color": "#43A047", "icon": "town", "background": "circle"}, + "settlements_town": {"name": "Towns", "color": "#2E7D32", "icon": "town", "background": "circle"}, + "settlements_city": {"name": "Cities", "color": "#1B5E20", "icon": "city", "background": "circle"}, + "settlements_other": {"name": "Settlements", "color": "#43A047", "icon": "town", "background": "circle"}, + "summits": {"name": "Summits", "color": "#8E24AA", "icon": "special_star", "background": "circle"}, + "spots": {"name": "Elevation spots", "color": "#AB47BC", "icon": "special_star", "background": "circle"}, + "water_line": {"name": "Water lines", "color": "#1E88E5", "icon": "water_drop", "background": "circle"}, + "water_area": {"name": "Water areas", "color": "#42A5F5", "icon": "water_drop", "background": "circle"}, "other": {"name": "Other landmarks", "color": "#FB8C00", "icon": "marker", "background": "circle"}, } +def open_text_out(path: Path): + if str(path).lower().endswith(".gz"): + return gzip.open(path, "wt", encoding="utf-8", newline="") + return path.open("w", encoding="utf-8", newline="") + + +def write_xml(path: Path, root: ET.Element) -> None: + xml_bytes = ET.tostring(root, encoding="utf-8") + pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8") + if str(path).lower().endswith(".gz"): + with gzip.open(path, "wb") as f: + f.write(pretty) + else: + path.write_bytes(pretty) + + def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: items: list[Landmark] = [] for path in paths: with path.open("r", encoding="utf-8-sig", newline="") as f: reader = csv.DictReader(f) - required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype", "gpxsee_classes_json", "semantic_tags_json"} + required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype"} missing = required - set(reader.fieldnames or []) if missing: raise ValueError(f"{path}: missing columns: {sorted(missing)}") @@ -54,7 +113,6 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: lat = float(row["lat"]) except Exception: continue - name = (row.get("name") or "").strip() try: gpxsee_classes = json.loads(row.get("gpxsee_classes_json") or "[]") except Exception: @@ -66,107 +124,369 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: items.append(Landmark( lon=lon, lat=lat, - name=name, + name=(row.get("name") or "").strip(), garmin_kind=(row.get("garmin_kind") or "").strip(), garmin_type=(row.get("garmin_type") or "").strip().lower(), garmin_subtype=(row.get("garmin_subtype") or "").strip().lower(), gpxsee_classes=gpxsee_classes if isinstance(gpxsee_classes, list) else [], semantic_tags=semantic_tags if isinstance(semantic_tags, dict) else {}, - source_file=path.name, + source_files=[path.name], + duplicate_names=[(row.get("name") or "").strip()] if (row.get("name") or "").strip() else [], + duplicate_types=[(row.get("garmin_type") or "").strip().lower()], + duplicate_subtypes=[(row.get("garmin_subtype") or "").strip().lower()], )) return items -def classify(item: Landmark) -> str: +# ---------------------------- +# Semantic inference / groups +# ---------------------------- + +def gpxsee_class_flags(item: Landmark) -> set[str]: + return {str(v).strip().lower() for v in item.gpxsee_classes if str(v).strip()} + + +def inferred_group_key(item: Landmark) -> str: tags = item.semantic_tags + place = str(tags.get("place") or "").strip().lower() + classes = gpxsee_class_flags(item) + if tags.get("amenity") == "drinking_water": return "drinking_water" if tags.get("natural") == "spring": return "springs" - if tags.get("place"): - return "settlements" - if "summit" in item.gpxsee_classes or tags.get("natural") == "peak": + if place == "village": + return "settlements_village" + if place in {"town", "municipality", "suburb"}: + return "settlements_town" + if place == "city": + return "settlements_city" + if place: + return "settlements_other" + if "summit" in classes or tags.get("natural") == "peak": return "summits" - if tags.get("waterway") == "water_point" or tags.get("natural") == "water": + if "spot" in classes: + return "spots" + if "water_line" in classes or tags.get("waterway"): + return "water_line" + if "water_area" in classes or tags.get("natural") == "water": + return "water_area" + if tags.get("waterway") == "water_point": return "water_sources" + if item.garmin_type == "0x64" and item.garmin_subtype == "0x14": + return "drinking_water" + if item.garmin_type == "0x65" and item.garmin_subtype == "0x11": + return "springs" return "other" -def dedupe(items: list[Landmark], precision: int = 6) -> list[Landmark]: - seen: set[tuple] = set() - out: list[Landmark] = [] +def auto_group_key(item: Landmark) -> str: + tags = item.semantic_tags + place = str(tags.get("place") or "").strip().lower() + classes = sorted(gpxsee_class_flags(item)) + + if tags.get("amenity") == "drinking_water": + return "amenity_drinking_water" + if tags.get("natural") == "spring": + return "natural_spring" + if tags.get("natural") == "peak": + return "natural_peak" + if place: + return f"place_{place}" + if tags.get("waterway"): + return f"waterway_{tags['waterway']}" + if tags.get("natural"): + return f"natural_{tags['natural']}" + if classes: + return f"gpxsee_{classes[0]}" + return f"garmin_{item.garmin_kind}_{item.garmin_type}_{item.garmin_subtype}" + + +def humanize_group_name(key: str) -> str: + if key in DEFAULT_GROUPS: + return DEFAULT_GROUPS[key]["name"] + raw = key.replace("garmin_", "Garmin ").replace("gpxsee_", "GPXSee ") + raw = raw.replace("_", " ").strip() + if not raw: + return "Landmarks" + return raw[:1].upper() + raw[1:] + + +def infer_group_style(key: str) -> dict[str, str]: + if key in DEFAULT_GROUPS: + return dict(DEFAULT_GROUPS[key]) + if "water" in key or "spring" in key: + return {"name": humanize_group_name(key), "color": "#1E88E5", "icon": "water_drop", "background": "circle"} + if key.startswith("place_"): + return {"name": humanize_group_name(key), "color": "#43A047", "icon": "town", "background": "circle"} + if "peak" in key or "summit" in key or "spot" in key: + return {"name": humanize_group_name(key), "color": "#8E24AA", "icon": "special_star", "background": "circle"} + return {"name": humanize_group_name(key), "color": "#FB8C00", "icon": "marker", "background": "circle"} + + +# ---------------------------- +# Dedupe +# ---------------------------- + +def meters_per_deg_lon(lat_deg: float) -> float: + return EARTH_M_PER_DEG_LAT * max(0.01, math.cos(math.radians(lat_deg))) + + +def coord_distance_m(a: Landmark, b: Landmark) -> float: + mean_lat = (a.lat + b.lat) / 2.0 + dx = (a.lon - b.lon) * meters_per_deg_lon(mean_lat) + dy = (a.lat - b.lat) * EARTH_M_PER_DEG_LAT + return math.hypot(dx, dy) + + +def score_landmark(it: Landmark) -> tuple: + semantic_weight = len([k for k, v in it.semantic_tags.items() if v not in (None, "")]) + return ( + 1 if it.name else 0, + semantic_weight, + len(gpxsee_class_flags(it)), + 1 if it.garmin_subtype not in {"", "0x00"} else 0, + len(set(it.source_files)), + -len(it.name), + ) + + +def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark: + best = primary if score_landmark(primary) >= score_landmark(other) else other + merged = best.clone() + merged.duplicate_count = primary.duplicate_count + other.duplicate_count + merged.source_files = sorted(set(primary.source_files + other.source_files)) + merged.duplicate_names = sorted({n for n in primary.duplicate_names + other.duplicate_names if n}) + merged.duplicate_types = sorted(set(primary.duplicate_types + other.duplicate_types)) + merged.duplicate_subtypes = sorted(set(primary.duplicate_subtypes + other.duplicate_subtypes)) + + # Prefer the richest semantic tag set, but merge missing keys from the other side. + richer = primary.semantic_tags if len(primary.semantic_tags) >= len(other.semantic_tags) else other.semantic_tags + poorer = other.semantic_tags if richer is primary.semantic_tags else primary.semantic_tags + merged.semantic_tags = dict(richer) + for k, v in poorer.items(): + if k not in merged.semantic_tags and v not in (None, ""): + merged.semantic_tags[k] = v + + merged.gpxsee_classes = sorted(set(primary.gpxsee_classes + other.gpxsee_classes)) + if not merged.name: + merged.name = primary.name or other.name + return merged + + +def dedupe_signature(it: Landmark, mode: str) -> tuple: + if mode == "coord_name": + return (it.name.casefold(),) + if mode == "coord_type": + return (it.garmin_kind, it.garmin_type, it.garmin_subtype) + if mode == "coord_name_type": + return (it.name.casefold(), it.garmin_kind, it.garmin_type, it.garmin_subtype) + return tuple() + + +def dedupe(items: list[Landmark], radius_m: float = 12.0, mode: str = "coord") -> list[Landmark]: + if mode == "off": + return list(items) + + clusters: list[Landmark] = [] + grid: dict[tuple[int, int], list[int]] = defaultdict(list) + for it in items: - key = (round(it.lon, precision), round(it.lat, precision), it.name, it.garmin_type, it.garmin_subtype) - if key in seen: - continue - seen.add(key) - out.append(it) + gx = int(math.floor(it.lon * meters_per_deg_lon(it.lat) / radius_m)) + gy = int(math.floor(it.lat * EARTH_M_PER_DEG_LAT / radius_m)) + extra_sig = dedupe_signature(it, mode) + matched_index: Optional[int] = None + + for nx in range(gx - 1, gx + 2): + for ny in range(gy - 1, gy + 2): + for idx in grid.get((nx, ny), []): + cand = clusters[idx] + if mode != "coord" and dedupe_signature(cand, mode) != extra_sig: + continue + if coord_distance_m(it, cand) <= radius_m: + matched_index = idx + break + if matched_index is not None: + break + if matched_index is not None: + break + + if matched_index is None: + clusters.append(it.clone()) + grid[(gx, gy)].append(len(clusters) - 1) + else: + clusters[matched_index] = merge_landmarks(clusters[matched_index], it) + + return clusters + + +# ---------------------------- +# Filtering and grouping +# ---------------------------- + +def apply_filters( + items: list[Landmark], + category: Optional[str], + filter_tag: list[str], + named_only: bool, + filter_kind: Optional[str], + filter_type: Optional[str], + filter_subtype: Optional[str], + gpxsee_class: Optional[str], +) -> list[Landmark]: + out = items + if category: + out = [it for it in out if inferred_group_key(it) == category] + for expr in filter_tag: + if "=" not in expr: + raise ValueError(f"invalid --filter-tag value: {expr!r}; expected key=value") + k, v = expr.split("=", 1) + out = [it for it in out if str(it.semantic_tags.get(k)) == v] + if named_only: + out = [it for it in out if it.name] + if filter_kind: + out = [it for it in out if it.garmin_kind == filter_kind] + if filter_type: + out = [it for it in out if it.garmin_type == filter_type.lower()] + if filter_subtype: + out = [it for it in out if it.garmin_subtype == filter_subtype.lower()] + if gpxsee_class: + target = gpxsee_class.strip().lower() + out = [it for it in out if target in gpxsee_class_flags(it)] return out -def write_gpx(items: list[Landmark], out_path: Path, by: str = "category") -> None: +def make_group_key(it: Landmark, mode: str) -> str: + if mode == "category": + return inferred_group_key(it) + if mode == "type": + return f"{it.garmin_type}_{it.garmin_subtype}" + if mode == "source": + return Path(it.source_files[0]).stem if it.source_files else "unknown_source" + return auto_group_key(it) + + +def sample_label(it: Landmark) -> str: + parts = [] + if it.name: + parts.append(it.name) + parts.append(f"{it.lon:.5f},{it.lat:.5f}") + parts.append(f"{it.garmin_type}/{it.garmin_subtype}") + if it.semantic_tags: + cleaned = {k: v for k, v in it.semantic_tags.items() if k != "name"} + if cleaned: + parts.append(json.dumps(cleaned, ensure_ascii=False, sort_keys=True)) + return " | ".join(parts) + + +def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> dict[str, GroupDefinition]: + grouped: dict[str, list[Landmark]] = defaultdict(list) + for it in items: + grouped[make_group_key(it, mode)].append(it) + + result: dict[str, GroupDefinition] = {} + for key, group_items in sorted(grouped.items()): + style = infer_group_style(key) + examples = [] + seen = set() + for it in group_items: + lbl = sample_label(it) + if lbl in seen: + continue + seen.add(lbl) + examples.append(lbl) + if len(examples) >= example_count: + break + result[key] = GroupDefinition( + key=key, + name=style["name"], + color=style["color"], + icon=style["icon"], + background=style["background"], + examples=examples, + count=len(group_items), + ) + return result + + +def interactive_rename_groups(groups: dict[str, GroupDefinition], enabled: bool, examples: int) -> dict[str, GroupDefinition]: + if not enabled: + return groups + if not sys.stdin.isatty(): + print("[warn] --interactive-group-names requested, but stdin is not interactive; keeping automatic names", file=sys.stderr) + return groups + + print("\nInteractive group naming\n", file=sys.stderr) + print("Press Enter to keep the proposed name. Type a new name to rename the group. Type ! to keep all remaining names.\n", file=sys.stderr) + keep_all = False + for key in sorted(groups): + group = groups[key] + if keep_all: + continue + print(f"Group key: {group.key}", file=sys.stderr) + print(f"Proposed name: {group.name}", file=sys.stderr) + print(f"Items: {group.count}", file=sys.stderr) + if examples > 0 and group.examples: + print("Examples:", file=sys.stderr) + for ex in group.examples[:examples]: + print(f" - {ex}", file=sys.stderr) + reply = input("Rename? [Enter keep / ! keep all / custom text]: ").strip() + if reply == "!": + keep_all = True + elif reply: + group.name = reply + print(file=sys.stderr) + return groups + + +# ---------------------------- +# Writers +# ---------------------------- + +def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_path: Path, by: str = "auto") -> None: gpx = ET.Element("gpx", { "version": "1.1", - "creator": "landmarks_csv_to_osmand.py", - "xmlns": "http://www.topografix.com/GPX/1/1", + "creator": "landmarks_csv_to_osmand_v2.py", + "xmlns": GPX_NS, }) metadata = ET.SubElement(gpx, "metadata") ET.SubElement(metadata, "name").text = out_path.stem groups_el = ET.SubElement(ET.SubElement(gpx, "extensions"), f"{{{OSMAND_NS}}}points_groups") - - def group_key(it: Landmark) -> str: - if by == "type": - return f"{it.garmin_type}_{it.garmin_subtype}" - return classify(it) - - groups: dict[str, dict[str, str]] = {} - for it in items: - g = group_key(it) - if g not in groups: - if by == "type": - groups[g] = { - "name": g, - "color": "#FB8C00", - "icon": "marker", - "background": "circle", - } - else: - groups[g] = DEFAULT_GROUPS.get(g, DEFAULT_GROUPS["other"]) - - for key, cfg in sorted(groups.items()): + for key in sorted(groups): + cfg = groups[key] ET.SubElement(groups_el, f"{{{OSMAND_NS}}}group", { - "name": key, - "color": cfg["color"], - "icon": cfg["icon"], - "background": cfg["background"], + "name": cfg.name, + "color": cfg.color, + "icon": cfg.icon, + "background": cfg.background, }) for it in items: + gkey = make_group_key(it, by) + cfg = groups[gkey] wpt = ET.SubElement(gpx, "wpt", {"lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"}) if it.name: ET.SubElement(wpt, "name").text = it.name - ET.SubElement(wpt, "type").text = group_key(it) + ET.SubElement(wpt, "type").text = cfg.name desc_parts = [] if it.semantic_tags: desc_parts.append("semantic: " + json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True)) desc_parts.append(f"garmin: kind={it.garmin_kind} type={it.garmin_type} subtype={it.garmin_subtype}") - if it.source_file: - desc_parts.append(f"source={it.source_file}") + if it.duplicate_count > 1: + desc_parts.append(f"dedupe: merged {it.duplicate_count} records") + if it.source_files: + desc_parts.append("sources=" + ", ".join(it.source_files)) ET.SubElement(wpt, "desc").text = "\n".join(desc_parts) ext = ET.SubElement(wpt, "extensions") - cfg = groups[group_key(it)] - ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg["icon"] - ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg["color"] - ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg["background"] - - xml_bytes = ET.tostring(gpx, encoding="utf-8") - pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8") - out_path.write_bytes(pretty) + ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg.icon + ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg.color + ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg.background + write_xml(out_path, gpx) def write_osm(items: list[Landmark], out_path: Path) -> None: - osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand.py"}) + osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v2.py"}) nid = -1 for it in items: node = ET.SubElement(osm, "node", {"id": str(nid), "lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"}) @@ -180,73 +500,106 @@ def write_osm(items: list[Landmark], out_path: Path) -> None: ET.SubElement(node, "tag", {"k": "garmin:kind", "v": it.garmin_kind}) ET.SubElement(node, "tag", {"k": "garmin:type", "v": it.garmin_type}) ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": it.garmin_subtype}) - ET.SubElement(node, "tag", {"k": "source:file", "v": it.source_file}) - xml_bytes = ET.tostring(osm, encoding="utf-8") - pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8") - out_path.write_bytes(pretty) + if it.duplicate_count > 1: + ET.SubElement(node, "tag", {"k": "source:merge_count", "v": str(it.duplicate_count)}) + if it.source_files: + ET.SubElement(node, "tag", {"k": "source:file", "v": ",".join(it.source_files)}) + write_xml(out_path, osm) -def write_summary(items: list[Landmark], out_csv: Optional[Path], out_json: Optional[Path], by: str = "category") -> None: +def write_summary(items: list[Landmark], groups: dict[str, GroupDefinition], out_csv: Optional[Path], out_json: Optional[Path], by: str = "auto") -> None: counts: dict[str, int] = defaultdict(int) for it in items: - key = classify(it) if by == "category" else f"{it.garmin_type}_{it.garmin_subtype}" - counts[key] += 1 + counts[make_group_key(it, by)] += 1 if out_csv: - with out_csv.open("w", encoding="utf-8", newline="") as f: + with open_text_out(out_csv) as f: w = csv.writer(f) - w.writerow(["group", "count"]) - for k, v in sorted(counts.items()): - w.writerow([k, v]) + w.writerow(["group_key", "group_name", "count", "examples"]) + for key, value in sorted(counts.items()): + group = groups[key] + w.writerow([key, group.name, value, " || ".join(group.examples)]) if out_json: - out_json.write_text(json.dumps(dict(sorted(counts.items())), ensure_ascii=False, indent=2), encoding="utf-8") + payload = { + key: { + "name": groups[key].name, + "count": value, + "examples": groups[key].examples, + "icon": groups[key].icon, + "color": groups[key].color, + } + for key, value in sorted(counts.items()) + } + if str(out_json).lower().endswith(".gz"): + with gzip.open(out_json, "wt", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + else: + out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") -def apply_filters(items: list[Landmark], category: Optional[str], filter_tag: list[str], named_only: bool) -> list[Landmark]: - out = items - if category: - out = [it for it in out if classify(it) == category] - for expr in filter_tag: - if "=" not in expr: - raise ValueError(f"invalid --filter-tag value: {expr!r}; expected key=value") - k, v = expr.split("=", 1) - out = [it for it in out if str(it.semantic_tags.get(k)) == v] - if named_only: - out = [it for it in out if it.name] - return out +def print_groups(groups: dict[str, GroupDefinition]) -> None: + for key in sorted(groups): + group = groups[key] + print(f"{key}\t{group.count}\t{group.name}") + for ex in group.examples: + print(f" - {ex}") def main(argv: Optional[list[str]] = None) -> int: - ap = argparse.ArgumentParser(description="Convert landmark CSV exports into OsmAnd-friendly GPX overlay and/or OSM POI input.") + ap = argparse.ArgumentParser( + description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping and modular coordinate-based dedupe." + ) ap.add_argument("csv", nargs="+", type=Path, help="Input landmark CSV files") ap.add_argument("--gpx", type=Path, help="Write OsmAnd-friendly GPX waypoint overlay") ap.add_argument("--osm", type=Path, help="Write OSM XML for OsmAndMapCreator generate-poi") - ap.add_argument("--summary-csv", type=Path, help="Write category/type counts CSV") - ap.add_argument("--summary-json", type=Path, help="Write category/type counts JSON") - ap.add_argument("--group-by", choices=["category", "type"], default="category", help="How GPX waypoint groups should be organized") - ap.add_argument("--summary-by", choices=["category", "type"], default="category") - ap.add_argument("--category", help="Keep only one inferred category, e.g. water_sources, settlements, springs, drinking_water") + ap.add_argument("--summary-csv", type=Path, help="Write group counts CSV") + ap.add_argument("--summary-json", type=Path, help="Write group counts JSON") + ap.add_argument("--group-by", choices=["auto", "category", "type", "source"], default="auto", help="How GPX waypoint groups should be organized") + ap.add_argument("--show-groups", action="store_true", help="Print detected groups with a few examples") + ap.add_argument("--interactive-group-names", action="store_true", help="Interactively rename auto-detected groups") + ap.add_argument("--group-examples", type=int, default=3, help="How many example rows to show for each group") + ap.add_argument("--category", help="Keep only one inferred category, e.g. drinking_water, springs, settlements_village") ap.add_argument("--filter-tag", action="append", default=[], help="Keep only landmarks whose semantic tags contain key=value") + ap.add_argument("--filter-kind", help="Keep only one Garmin kind, e.g. point") + ap.add_argument("--filter-type", help="Keep only one Garmin type, e.g. 0x64") + ap.add_argument("--filter-subtype", help="Keep only one Garmin subtype, e.g. 0x14") + ap.add_argument("--gpxsee-class", help="Keep only landmarks with the given GPXSee class") ap.add_argument("--named-only", action="store_true", help="Keep only landmarks with non-empty names") - ap.add_argument("--no-dedupe", action="store_true", help="Disable de-duplication") + ap.add_argument("--dedupe-mode", choices=["coord", "coord_name", "coord_type", "coord_name_type", "off"], default="coord", help="Dedupe strategy; default is coordinate-based only") + ap.add_argument("--dedupe-radius-m", type=float, default=12.0, help="Coordinate dedupe radius in meters") args = ap.parse_args(argv) items = load_landmarks(args.csv) - items = apply_filters(items, args.category, args.filter_tag, args.named_only) - if not args.no_dedupe: - items = dedupe(items) + items = apply_filters( + items, + args.category, + args.filter_tag, + args.named_only, + args.filter_kind, + args.filter_type, + args.filter_subtype, + args.gpxsee_class, + ) + items = dedupe(items, radius_m=args.dedupe_radius_m, mode=args.dedupe_mode) - if not any([args.gpx, args.osm, args.summary_csv, args.summary_json]): + groups = build_groups(items, args.group_by, example_count=max(0, args.group_examples)) + groups = interactive_rename_groups(groups, args.interactive_group_names, args.group_examples) + + if args.show_groups: + print_groups(groups) + + if not any([args.gpx, args.osm, args.summary_csv, args.summary_json, args.show_groups]): print(f"loaded {len(items)} landmarks", file=sys.stderr) return 0 if args.gpx: - write_gpx(items, args.gpx, by=args.group_by) + write_gpx(items, groups, args.gpx, by=args.group_by) if args.osm: write_osm(items, args.osm) if args.summary_csv or args.summary_json: - write_summary(items, args.summary_csv, args.summary_json, by=args.summary_by) + write_summary(items, groups, args.summary_csv, args.summary_json, by=args.group_by) print(f"[info] kept {len(items)} landmarks") + print(f"[info] dedupe mode={args.dedupe_mode} radius={args.dedupe_radius_m:g}m") if args.gpx: print(f"[info] wrote GPX overlay: {args.gpx}") if args.osm: diff --git a/stage-2-parse-stage-1/readme.md b/stage-2-parse-stage-1/readme.md index c25b3cf..0c13b0b 100644 --- a/stage-2-parse-stage-1/readme.md +++ b/stage-2-parse-stage-1/readme.md @@ -1,122 +1,89 @@ -You have **two good OsmAnd targets** now. +Done. -The fast, practical one is **GPX waypoint overlay**. OsmAnd supports GPX import, waypoint icons/colors, and **waypoint grouping** through OsmAnd-specific GPX extensions, so this is the closest match to a toggleable landmark layer without fighting the full map compiler. Favorites are also stored/imported as GPX waypoints, which confirms GPX is a native path for point overlays. ([OsmAnd][1]) +Updated script: +[landmarks_csv_to_osmand_v2.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v2.py) -The more native/searchable one is **POI-only OBF**. OsmAndMapCreator officially supports `generate-poi` separately from `generate-obf`, and it accepts OSM-family input such as `.osm`, `.osm.gz`, `.osm.bz2`, and `.pbf`. That makes it the right target when you want searchable/filterable POIs but do **not** want the routing stage that is currently crashing. ([OsmAnd][2]) +What changed: -I packaged both paths into a converter: +* default dedupe is now **coordinate-based only** +* dedupe is modular: -[landmarks_csv_to_osmand.py](sandbox:/mnt/data/landmarks_csv_to_osmand.py) + * `coord` + * `coord_name` + * `coord_type` + * `coord_name_type` + * `off` +* dedupe uses a meter radius instead of naive decimal rounding +* duplicates are **merged**, not just dropped +* merged records keep: -Sample outputs: + * best name + * richer semantic tags + * union of GPXSee classes + * source file list + * duplicate count +* grouping is improved: -* [sample GPX overlay](sandbox:/mnt/data/water_sources_02335140.gpx) -* [sample OSM for POI build](sandbox:/mnt/data/water_sources_02335140.osm) + * `--group-by auto` + * `--group-by category` + * `--group-by type` + * `--group-by source` +* automatic grouping now distinguishes things like: -### What I recommend + * `amenity_drinking_water` + * `natural_spring` + * `place_village` + * fallback Garmin or GPXSee groups +* interactive group naming added: -For a “BGMountains-like thing I can switch on and off,” use **GPX overlays** first. -For a “native searchable POI pack,” use **POI-only OBF** second. + * `--interactive-group-names` + * shows sample examples from each detected group + * lets you keep or rename the proposed group name +* loader is now tolerant of both: -### Path 1: GPX overlay layer + * the raw landmark CSVs + * the slimmer per-category CSVs you are iterating through +* `.gpx.gz`, `.osm.gz`, `.json.gz`, `.csv.gz` now actually get gzip-written correctly -This is the quickest route. +Useful commands: -It converts your landmark CSVs into a GPX file with: +Show detected groups with examples: -* one waypoint per landmark -* grouped categories using `` -* OsmAnd waypoint group styling via `osmand:points_groups` -* icons/colors per category - -Build it like this: - -```bash id="14483" -python landmarks_csv_to_osmand.py *.csv --gpx bg_landmarks.gpx --group-by category +```bash +python landmarks_csv_to_osmand_v2.py *.csv --show-groups ``` -For just water sources: +Interactive rename flow: -```bash id="52386" -python landmarks_csv_to_osmand.py *.csv --category water_sources --gpx bg_water_sources.gpx +```bash +python landmarks_csv_to_osmand_v2.py *.csv --show-groups --interactive-group-names --group-by auto --gpx landmarks.gpx ``` -Then import that GPX into OsmAnd. GPX waypoints and waypoint groups are supported officially, including icon/color/background customization. ([OsmAnd][1]) +Tighter coordinate dedupe: -**Best practice for toggling:** make **one GPX file per theme**: - -* `water_sources.gpx` -* `villages.gpx` -* `summits.gpx` -* `springs.gpx` - -That gives you clean on/off control by file instead of stuffing everything into one blob. - -### Path 2: native POI pack for OsmAnd - -This is better when you want it to behave more like a built-in searchable POI dataset. - -First convert CSVs to OSM XML: - -```bash id="51512" -python landmarks_csv_to_osmand.py *.csv --category water_sources --osm water_sources.osm +```bash +python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord --dedupe-radius-m 6 --gpx landmarks.gpx ``` -Then build a POI-only OBF: +Coordinate dedupe, but only merge when type also matches: -```bash id="68855" -java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^ - net.osmand.MainUtilities generate-poi C:\path\to\water_sources.osm +```bash +python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord_type --dedupe-radius-m 10 --gpx landmarks.gpx ``` -That uses OsmAndMapCreator’s documented `generate-poi` mode and avoids the routing stage entirely. ([OsmAnd][2]) +Water-only overlay: -### Which one should you choose? - -Use **GPX** when you want: - -* fast import -* easy toggle on/off -* visual overlay behavior -* category grouping with custom icons/colors - -Use **POI-only OBF** when you want: - -* more native OsmAnd integration -* better POI-style searching/indexing -* a cleaner long-term pack - -### Important limitation - -For now, your landmark CSV path is strongest for **exact-coordinate point features**. That is why it works well for springs, drinking water, villages, summits, and similar point landmarks. It is not the same as a full vector map overlay with arbitrary line/polygon filtering. - -### Ready-to-use commands - -All CSVs into one grouped GPX: - -```bash id="40684" -python landmarks_csv_to_osmand.py *.csv --gpx all_landmarks.gpx --group-by category --summary-json all_landmarks_summary.json +```bash +python landmarks_csv_to_osmand_v2.py *.csv --filter-tag amenity=drinking_water --gpx drinking_water.gpx ``` -Only water sources into GPX: +Springs-only overlay: -```bash id="33043" -python landmarks_csv_to_osmand.py *.csv --category water_sources --gpx water_sources.gpx +```bash +python landmarks_csv_to_osmand_v2.py *.csv --filter-tag natural=spring --gpx springs.gpx ``` -Only water sources into OSM for POI build: +If you want the next revision, I’d push it further in two directions: -```bash id="57957" -python landmarks_csv_to_osmand.py *.csv --category water_sources --osm water_sources.osm -``` - -Only named villages into GPX: - -```bash id="12262" -python landmarks_csv_to_osmand.py *.csv --filter-tag place=village --named-only --gpx villages.gpx -``` - -The cleanest deployment pattern is: **one GPX per category for toggles, plus one POI-only OBF per high-value category for native search**. - -[1]: https://osmand.net/docs/technical/osmand-file-formats/osmand-gpx/?utm_source=chatgpt.com "OsmAnd GPX | OsmAnd" -[2]: https://osmand.net/docs/technical/map-creation/create-offline-maps-yourself/?utm_source=chatgpt.com "Create Offline Raster & Vector Maps | OsmAnd" +* add **dedupe reports** showing which rows got merged into which canonical landmark +* add **interactive split/merge controls** for ambiguous groups, not just rename controls