This commit is contained in:
nq
2026-04-14 16:48:13 -07:00
parent 03b42ce8a5
commit 07521af1ed
2 changed files with 513 additions and 193 deletions

View File

@@ -3,18 +3,23 @@ from __future__ import annotations
import argparse
import csv
import gzip
import json
import math
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Optional
from typing import Callable, Iterable, Optional
from xml.dom import minidom
OSMAND_NS = "https://osmand.net"
GPX_NS = "http://www.topografix.com/GPX/1/1"
ET.register_namespace("osmand", OSMAND_NS)
EARTH_M_PER_DEG_LAT = 111_320.0
@dataclass
class Landmark:
@@ -26,25 +31,79 @@ class Landmark:
garmin_subtype: str
gpxsee_classes: list[str]
semantic_tags: dict[str, str]
source_file: str
source_files: list[str] = field(default_factory=list)
duplicate_count: int = 1
duplicate_names: list[str] = field(default_factory=list)
duplicate_types: list[str] = field(default_factory=list)
duplicate_subtypes: list[str] = field(default_factory=list)
def clone(self) -> "Landmark":
    """Return an independent copy of this landmark.

    Scalar fields are carried over as-is. Every list field and the
    semantic tag dict are duplicated (one level deep), so changing the
    copy's containers leaves the original untouched.
    """
    scalar_fields = {
        "lon": self.lon,
        "lat": self.lat,
        "name": self.name,
        "garmin_kind": self.garmin_kind,
        "garmin_type": self.garmin_type,
        "garmin_subtype": self.garmin_subtype,
        "duplicate_count": self.duplicate_count,
    }
    list_fields = {
        "gpxsee_classes": list(self.gpxsee_classes),
        "source_files": list(self.source_files),
        "duplicate_names": list(self.duplicate_names),
        "duplicate_types": list(self.duplicate_types),
        "duplicate_subtypes": list(self.duplicate_subtypes),
    }
    return Landmark(
        **scalar_fields,
        semantic_tags=dict(self.semantic_tags),
        **list_fields,
    )
@dataclass
class GroupDefinition:
    """Presentation settings and summary data for one group of landmarks."""

    # Group identifier, as produced by make_group_key.
    key: str
    # Display name; interactive_rename_groups may overwrite it.
    name: str
    # Color string, e.g. "#1E88E5".
    color: str
    # Icon identifier, e.g. "water_drop".
    icon: str
    # Value written as the waypoint background; "circle" unless overridden.
    background: str = "circle"
    # Sample labels describing a few members of the group.
    examples: list[str] = field(default_factory=list)
    # Number of landmarks assigned to the group.
    count: int = 0
DEFAULT_GROUPS = {
"water_sources": {"name": "Water sources", "color": "#1E88E5", "icon": "water_drop", "background": "circle"},
"settlements": {"name": "Settlements", "color": "#43A047", "icon": "town", "background": "circle"},
"summits": {"name": "Summits", "color": "#8E24AA", "icon": "special_star", "background": "circle"},
"springs": {"name": "Springs", "color": "#00ACC1", "icon": "water_drop", "background": "circle"},
"drinking_water": {"name": "Drinking water", "color": "#039BE5", "icon": "water_drop", "background": "circle"},
"springs": {"name": "Springs", "color": "#00ACC1", "icon": "water_drop", "background": "circle"},
"water_sources": {"name": "Water sources", "color": "#1E88E5", "icon": "water_drop", "background": "circle"},
"settlements_village": {"name": "Villages", "color": "#43A047", "icon": "town", "background": "circle"},
"settlements_town": {"name": "Towns", "color": "#2E7D32", "icon": "town", "background": "circle"},
"settlements_city": {"name": "Cities", "color": "#1B5E20", "icon": "city", "background": "circle"},
"settlements_other": {"name": "Settlements", "color": "#43A047", "icon": "town", "background": "circle"},
"summits": {"name": "Summits", "color": "#8E24AA", "icon": "special_star", "background": "circle"},
"spots": {"name": "Elevation spots", "color": "#AB47BC", "icon": "special_star", "background": "circle"},
"water_line": {"name": "Water lines", "color": "#1E88E5", "icon": "water_drop", "background": "circle"},
"water_area": {"name": "Water areas", "color": "#42A5F5", "icon": "water_drop", "background": "circle"},
"other": {"name": "Other landmarks", "color": "#FB8C00", "icon": "marker", "background": "circle"},
}
def open_text_out(path: Path):
    """Open *path* for writing UTF-8 text and return the file object.

    When the path ends in ``.gz`` (case-insensitive) the text is
    gzip-compressed on the fly. Newline translation is disabled in both
    cases so that writers such as ``csv.writer`` control line endings.
    """
    text_options = {"encoding": "utf-8", "newline": ""}
    if not str(path).lower().endswith(".gz"):
        return path.open("w", **text_options)
    return gzip.open(path, "wt", **text_options)
def write_xml(path: Path, root: ET.Element) -> None:
    """Write *root* to *path* as pretty-printed UTF-8 XML.

    The tree is serialised, re-parsed with ``minidom`` for indentation, and
    written as bytes. A path ending in ``.gz`` (case-insensitive) is written
    gzip-compressed.
    """
    serialized = ET.tostring(root, encoding="utf-8")
    document = minidom.parseString(serialized)
    payload = document.toprettyxml(indent=" ", encoding="utf-8")

    if not str(path).lower().endswith(".gz"):
        path.write_bytes(payload)
        return
    with gzip.open(path, "wb") as sink:
        sink.write(payload)
def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
items: list[Landmark] = []
for path in paths:
with path.open("r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype", "gpxsee_classes_json", "semantic_tags_json"}
required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype"}
missing = required - set(reader.fieldnames or [])
if missing:
raise ValueError(f"{path}: missing columns: {sorted(missing)}")
@@ -54,7 +113,6 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
lat = float(row["lat"])
except Exception:
continue
name = (row.get("name") or "").strip()
try:
gpxsee_classes = json.loads(row.get("gpxsee_classes_json") or "[]")
except Exception:
@@ -66,107 +124,369 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
items.append(Landmark(
lon=lon,
lat=lat,
name=name,
name=(row.get("name") or "").strip(),
garmin_kind=(row.get("garmin_kind") or "").strip(),
garmin_type=(row.get("garmin_type") or "").strip().lower(),
garmin_subtype=(row.get("garmin_subtype") or "").strip().lower(),
gpxsee_classes=gpxsee_classes if isinstance(gpxsee_classes, list) else [],
semantic_tags=semantic_tags if isinstance(semantic_tags, dict) else {},
source_file=path.name,
source_files=[path.name],
duplicate_names=[(row.get("name") or "").strip()] if (row.get("name") or "").strip() else [],
duplicate_types=[(row.get("garmin_type") or "").strip().lower()],
duplicate_subtypes=[(row.get("garmin_subtype") or "").strip().lower()],
))
return items
def classify(item: Landmark) -> str:
# ----------------------------
# Semantic inference / groups
# ----------------------------
def gpxsee_class_flags(item: Landmark) -> set[str]:
    """Return the landmark's GPXSee classes as a normalised set.

    Every entry is converted to ``str``, stripped and lower-cased; entries
    that are empty after stripping are dropped.
    """
    flags: set[str] = set()
    for raw in item.gpxsee_classes:
        text = str(raw).strip()
        if text:
            flags.add(text.lower())
    return flags
def inferred_group_key(item: Landmark) -> str:
    """Classify *item* into one of the built-in category keys.

    Rules are tried in order and the first match wins: drinking water and
    springs (semantic tags), settlements by ``place`` value, summits,
    elevation spots, water points, water lines, water areas, and finally
    the Garmin type/subtype codes for drinking water and springs.

    Args:
        item: Landmark whose semantic tags, GPXSee classes and Garmin codes
            are inspected.

    Returns:
        A category key such as ``"springs"`` or ``"settlements_village"``;
        ``"other"`` when no rule matches.
    """
    tags = item.semantic_tags
    place = str(tags.get("place") or "").strip().lower()
    classes = gpxsee_class_flags(item)
    natural = tags.get("natural")
    waterway = tags.get("waterway")

    if tags.get("amenity") == "drinking_water":
        return "drinking_water"
    if natural == "spring":
        return "springs"

    if place == "village":
        return "settlements_village"
    if place in {"town", "municipality", "suburb"}:
        return "settlements_town"
    if place == "city":
        return "settlements_city"
    if place:
        return "settlements_other"

    if "summit" in classes or natural == "peak":
        return "summits"
    if "spot" in classes:
        return "spots"

    # The specific water-point rule has to run before the generic waterway
    # rule below; otherwise any truthy ``waterway`` tag is classified as a
    # water line and this branch can never be reached.
    if waterway == "water_point":
        return "water_sources"
    if "water_line" in classes or waterway:
        return "water_line"
    if "water_area" in classes or natural == "water":
        return "water_area"

    # Fall back to the raw Garmin codes when no semantic information exists.
    if item.garmin_type == "0x64" and item.garmin_subtype == "0x14":
        return "drinking_water"
    if item.garmin_type == "0x65" and item.garmin_subtype == "0x11":
        return "springs"
    return "other"
def dedupe(items: list[Landmark], precision: int = 6) -> list[Landmark]:
seen: set[tuple] = set()
out: list[Landmark] = []
def auto_group_key(item: Landmark) -> str:
    """Derive a fine-grained group key directly from the landmark's data.

    Preference order: well-known amenity/natural tags, the ``place`` value,
    the ``waterway`` value, any other ``natural`` value, the alphabetically
    first GPXSee class, and finally the Garmin kind/type/subtype triple.
    """
    tags = item.semantic_tags
    place = str(tags.get("place") or "").strip().lower()
    classes = gpxsee_class_flags(item)
    natural = tags.get("natural")
    waterway = tags.get("waterway")

    if tags.get("amenity") == "drinking_water":
        return "amenity_drinking_water"
    if natural == "spring":
        return "natural_spring"
    if natural == "peak":
        return "natural_peak"
    if place:
        return f"place_{place}"
    if waterway:
        return f"waterway_{waterway}"
    if natural:
        return f"natural_{natural}"
    if classes:
        # Smallest element == first element of the sorted class list.
        return f"gpxsee_{min(classes)}"
    return f"garmin_{item.garmin_kind}_{item.garmin_type}_{item.garmin_subtype}"
def humanize_group_name(key: str) -> str:
    """Turn a group key into a display name.

    Keys present in ``DEFAULT_GROUPS`` use the configured name. Any other key
    has ``garmin_``/``gpxsee_`` rewritten to a readable label, underscores
    turned into spaces and its first character upper-cased. A key that
    becomes empty yields ``"Landmarks"``.
    """
    preset = DEFAULT_GROUPS.get(key)
    if preset is not None:
        return preset["name"]

    text = key
    for token, label in (("garmin_", "Garmin "), ("gpxsee_", "GPXSee ")):
        text = text.replace(token, label)
    text = text.replace("_", " ").strip()

    if text:
        return text[0].upper() + text[1:]
    return "Landmarks"
def infer_group_style(key: str) -> dict[str, str]:
    """Return the name/color/icon/background style for a group key.

    Keys present in ``DEFAULT_GROUPS`` get a copy of the configured style.
    Other keys are styled by keyword: water-like keys, ``place_`` keys and
    peak/summit/spot keys each get their own color and icon; everything else
    gets the generic marker style.
    """
    if key in DEFAULT_GROUPS:
        return dict(DEFAULT_GROUPS[key])

    if "water" in key or "spring" in key:
        color, icon = "#1E88E5", "water_drop"
    elif key.startswith("place_"):
        color, icon = "#43A047", "town"
    elif any(word in key for word in ("peak", "summit", "spot")):
        color, icon = "#8E24AA", "special_star"
    else:
        color, icon = "#FB8C00", "marker"

    return {
        "name": humanize_group_name(key),
        "color": color,
        "icon": icon,
        "background": "circle",
    }
# ----------------------------
# Dedupe
# ----------------------------
def meters_per_deg_lon(lat_deg: float) -> float:
    """Return the approximate length in meters of one degree of longitude.

    The cosine factor is floored at 0.01 so the result never reaches zero
    (or goes negative) near the poles.
    """
    shrink = math.cos(math.radians(lat_deg))
    if not shrink > 0.01:
        shrink = 0.01
    return EARTH_M_PER_DEG_LAT * shrink
def coord_distance_m(a: Landmark, b: Landmark) -> float:
    """Return the approximate planar distance in meters between two landmarks.

    The east-west offset is scaled by the length of a longitude degree at
    the mean latitude of the two points, and the north-south offset by the
    constant length of a latitude degree.
    """
    midpoint_lat = (a.lat + b.lat) / 2.0
    east_m = (a.lon - b.lon) * meters_per_deg_lon(midpoint_lat)
    north_m = (a.lat - b.lat) * EARTH_M_PER_DEG_LAT
    return math.hypot(east_m, north_m)
def score_landmark(it: Landmark) -> tuple:
    """Return a sort key describing how informative a landmark record is.

    Tuples compare element by element, so earlier criteria dominate: having
    a name, number of non-empty semantic tags, number of GPXSee classes,
    having a meaningful Garmin subtype, number of distinct source files and,
    as a tie-breaker, a shorter name.
    """
    filled_tags = sum(
        1 for value in it.semantic_tags.values() if value not in (None, "")
    )
    has_name = int(bool(it.name))
    has_subtype = int(it.garmin_subtype not in {"", "0x00"})
    distinct_sources = len(set(it.source_files))
    return (
        has_name,
        filled_tags,
        len(gpxsee_class_flags(it)),
        has_subtype,
        distinct_sources,
        -len(it.name),
    )
def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark:
    """Combine two duplicate landmarks into a new record.

    The higher-scoring record (``primary`` on ties) is cloned as the base.
    Duplicate counts are summed; source files, names, types, subtypes and
    GPXSee classes become sorted unions; semantic tags start from the larger
    tag dict and gain any missing, non-empty keys from the smaller one.
    Neither input is modified.
    """
    keep_primary = score_landmark(primary) >= score_landmark(other)
    merged = (primary if keep_primary else other).clone()

    merged.duplicate_count = primary.duplicate_count + other.duplicate_count
    merged.source_files = sorted({*primary.source_files, *other.source_files})
    merged.duplicate_names = sorted(
        {name for name in primary.duplicate_names + other.duplicate_names if name}
    )
    merged.duplicate_types = sorted({*primary.duplicate_types, *other.duplicate_types})
    merged.duplicate_subtypes = sorted(
        {*primary.duplicate_subtypes, *other.duplicate_subtypes}
    )

    # Start from the larger tag set and fill gaps from the smaller one.
    if len(primary.semantic_tags) >= len(other.semantic_tags):
        richer, poorer = primary.semantic_tags, other.semantic_tags
    else:
        richer, poorer = other.semantic_tags, primary.semantic_tags
    combined = dict(richer)
    for tag, value in poorer.items():
        if tag in combined or value in (None, ""):
            continue
        combined[tag] = value
    merged.semantic_tags = combined

    merged.gpxsee_classes = sorted({*primary.gpxsee_classes, *other.gpxsee_classes})
    if not merged.name:
        merged.name = primary.name or other.name
    return merged
def dedupe_signature(it: Landmark, mode: str) -> tuple:
    """Return the non-coordinate part of a landmark's dedupe identity.

    ``coord_name`` contributes the case-folded name, ``coord_type`` the
    Garmin kind/type/subtype, ``coord_name_type`` both (name first). Any
    other mode yields an empty tuple.
    """
    parts = []
    if mode in ("coord_name", "coord_name_type"):
        parts.append(it.name.casefold())
    if mode in ("coord_type", "coord_name_type"):
        parts.extend((it.garmin_kind, it.garmin_type, it.garmin_subtype))
    return tuple(parts)
def dedupe(items: list[Landmark], radius_m: float = 12.0, mode: str = "coord") -> list[Landmark]:
    """Merge landmarks that lie within ``radius_m`` meters of each other.

    Items are processed in input order. Each item is compared with the
    clusters built so far and merged (via ``merge_landmarks``) into the first
    one whose current representative is no farther than ``radius_m`` away
    and, for modes other than ``"coord"``, has the same
    ``dedupe_signature``. An item that matches nothing starts a new cluster.
    The input objects are never modified.

    Args:
        items: Landmarks to de-duplicate.
        radius_m: Merge radius in meters; must be positive and finite.
        mode: ``"off"`` returns a shallow copy of ``items``; ``"coord"``
            merges on distance only; the other modes additionally require an
            equal ``dedupe_signature``.

    Returns:
        The cluster representatives, in order of first appearance.

    Raises:
        ValueError: If ``radius_m`` is not a positive finite number (only
            checked when ``mode`` is not ``"off"``).
    """
    if mode == "off":
        return list(items)
    if not (math.isfinite(radius_m) and radius_m > 0):
        raise ValueError(
            f"dedupe radius must be a positive number of meters, got {radius_m!r}"
        )

    # The radius expressed in degrees of latitude; also the height of a grid row.
    lat_span = radius_m / EARTH_M_PER_DEG_LAT

    def row_of(lat):
        return math.floor(lat / lat_span)

    def col_of(lon, row):
        # One longitude scale per row. Scaling every point by its own latitude
        # would put nearby points on different grids, so neighbours could end
        # up many columns apart and be missed by the lookup.
        row_lat = (row + 0.5) * lat_span
        return math.floor(lon * meters_per_deg_lon(row_lat) / radius_m)

    clusters: list[Landmark] = []
    grid: dict[tuple[int, int], list[int]] = defaultdict(list)

    def register(idx):
        # Index the cluster under the cell of its current representative.
        rep = clusters[idx]
        row = row_of(rep.lat)
        bucket = grid[(col_of(rep.lon, row), row)]
        if idx not in bucket:
            bucket.append(idx)

    def find_match(it):
        signature = dedupe_signature(it, mode)
        # Widest possible longitude offset of a point within the radius: use
        # the smallest meters-per-degree scale reachable from this latitude.
        far_lat = min(90.0, abs(it.lat) + lat_span)
        lon_span = radius_m / meters_per_deg_lon(far_lat)
        first_row = row_of(it.lat - lat_span)
        last_row = row_of(it.lat + lat_span)
        for row in range(first_row, last_row + 1):
            first_col = col_of(it.lon - lon_span, row)
            last_col = col_of(it.lon + lon_span, row)
            for col in range(first_col, last_col + 1):
                for idx in grid.get((col, row), ()):
                    cand = clusters[idx]
                    if mode != "coord" and dedupe_signature(cand, mode) != signature:
                        continue
                    if coord_distance_m(it, cand) <= radius_m:
                        return idx
        return None

    for it in items:
        matched = find_match(it)
        if matched is None:
            clusters.append(it.clone())
            register(len(clusters) - 1)
        else:
            clusters[matched] = merge_landmarks(clusters[matched], it)
            # The merge may have adopted the other record's coordinates; keep
            # the cluster findable from the cell it now sits in.
            register(matched)
    return clusters
# ----------------------------
# Filtering and grouping
# ----------------------------
def apply_filters(
    items: list[Landmark],
    category: Optional[str],
    filter_tag: list[str],
    named_only: bool,
    filter_kind: Optional[str],
    filter_type: Optional[str],
    filter_subtype: Optional[str],
    gpxsee_class: Optional[str],
) -> list[Landmark]:
    """Return the landmarks that satisfy every active filter.

    Args:
        items: Landmarks to filter; the list itself is not modified.
        category: Keep only items whose ``inferred_group_key`` equals this.
        filter_tag: ``key=value`` expressions; an item must carry the tag
            and its value, converted to ``str``, must equal ``value``.
        named_only: Keep only items with a non-empty name.
        filter_kind: Keep only items with this exact Garmin kind.
        filter_type: Keep only items with this Garmin type (lower-cased
            before comparison).
        filter_subtype: Keep only items with this Garmin subtype
            (lower-cased before comparison).
        gpxsee_class: Keep only items having this GPXSee class (stripped and
            lower-cased before comparison).

    Returns:
        The surviving landmarks in their original order.

    Raises:
        ValueError: If a ``filter_tag`` entry contains no ``=``.
    """
    # Parse all tag expressions up front so a malformed one is reported
    # before any filtering work is done.
    wanted_tags = []
    for expr in filter_tag:
        if "=" not in expr:
            raise ValueError(f"invalid --filter-tag value: {expr!r}; expected key=value")
        tag_key, tag_value = expr.split("=", 1)
        wanted_tags.append((tag_key, tag_value))

    def tag_matches(it, tag_key, tag_value):
        # A missing (or null) tag must never match: str(None) == "None"
        # would otherwise let "key=None" select every untagged landmark.
        actual = it.semantic_tags.get(tag_key)
        return actual is not None and str(actual) == tag_value

    out = items
    if category:
        out = [it for it in out if inferred_group_key(it) == category]
    for tag_key, tag_value in wanted_tags:
        out = [it for it in out if tag_matches(it, tag_key, tag_value)]
    if named_only:
        out = [it for it in out if it.name]
    if filter_kind:
        out = [it for it in out if it.garmin_kind == filter_kind]
    if filter_type:
        wanted_type = filter_type.lower()
        out = [it for it in out if it.garmin_type == wanted_type]
    if filter_subtype:
        wanted_subtype = filter_subtype.lower()
        out = [it for it in out if it.garmin_subtype == wanted_subtype]
    if gpxsee_class:
        target = gpxsee_class.strip().lower()
        out = [it for it in out if target in gpxsee_class_flags(it)]
    return out
def write_gpx(items: list[Landmark], out_path: Path, by: str = "category") -> None:
def make_group_key(it: Landmark, mode: str) -> str:
    """Return the group key of a landmark for the given grouping mode.

    ``"category"`` uses ``inferred_group_key``, ``"type"`` joins the Garmin
    type and subtype, ``"source"`` uses the stem of the first source file
    (``"unknown_source"`` when there is none). Any other mode falls back to
    ``auto_group_key``.
    """
    if mode == "source":
        if not it.source_files:
            return "unknown_source"
        return Path(it.source_files[0]).stem
    if mode == "type":
        return f"{it.garmin_type}_{it.garmin_subtype}"
    if mode == "category":
        return inferred_group_key(it)
    return auto_group_key(it)
def sample_label(it: Landmark) -> str:
    """Build a one-line description of a landmark for group previews.

    The label joins, with ``" | "``: the name (if any), the coordinates to
    five decimals, the Garmin type/subtype and, when present, the semantic
    tags other than ``name`` as sorted JSON.
    """
    pieces = [it.name] if it.name else []
    pieces += [
        f"{it.lon:.5f},{it.lat:.5f}",
        f"{it.garmin_type}/{it.garmin_subtype}",
    ]
    if it.semantic_tags:
        extra = {tag: value for tag, value in it.semantic_tags.items() if tag != "name"}
        if extra:
            pieces.append(json.dumps(extra, ensure_ascii=False, sort_keys=True))
    return " | ".join(pieces)
def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> dict[str, GroupDefinition]:
    """Group landmarks and describe each group.

    Args:
        items: Landmarks to group.
        mode: Grouping mode passed to ``make_group_key``.
        example_count: Maximum number of distinct sample labels to keep per
            group; zero or a negative value keeps none.

    Returns:
        A dict, ordered by group key, mapping each key to a
        ``GroupDefinition`` carrying the inferred style, the member count
        and up to ``example_count`` sample labels.
    """
    grouped: dict[str, list[Landmark]] = defaultdict(list)
    for it in items:
        grouped[make_group_key(it, mode)].append(it)

    result: dict[str, GroupDefinition] = {}
    for key, members in sorted(grouped.items()):
        style = infer_group_style(key)

        examples: list[str] = []
        # Guard first: checking the limit only after appending would always
        # collect one example, even when none were requested.
        if example_count > 0:
            seen: set[str] = set()
            for it in members:
                label = sample_label(it)
                if label in seen:
                    continue
                seen.add(label)
                examples.append(label)
                if len(examples) >= example_count:
                    break

        result[key] = GroupDefinition(
            key=key,
            name=style["name"],
            color=style["color"],
            icon=style["icon"],
            background=style["background"],
            examples=examples,
            count=len(members),
        )
    return result
def interactive_rename_groups(groups: dict[str, GroupDefinition], enabled: bool, examples: int) -> dict[str, GroupDefinition]:
    """Let the user rename groups interactively.

    Groups are presented in key order. For each one the user may press Enter
    to keep the proposed name, type a replacement, or type ``!`` to keep all
    remaining names. End-of-input at the prompt is treated like ``!``.

    Args:
        groups: Group definitions; renamed entries are updated in place.
        enabled: When false the function does nothing.
        examples: Maximum number of stored examples to show per group.

    Returns:
        The same ``groups`` dict that was passed in.
    """
    if not enabled:
        return groups
    if not sys.stdin.isatty():
        print("[warn] --interactive-group-names requested, but stdin is not interactive; keeping automatic names", file=sys.stderr)
        return groups

    print("\nInteractive group naming\n", file=sys.stderr)
    print("Press Enter to keep the proposed name. Type a new name to rename the group. Type ! to keep all remaining names.\n", file=sys.stderr)

    for key in sorted(groups):
        group = groups[key]
        print(f"Group key: {group.key}", file=sys.stderr)
        print(f"Proposed name: {group.name}", file=sys.stderr)
        print(f"Items: {group.count}", file=sys.stderr)
        if examples > 0 and group.examples:
            print("Examples:", file=sys.stderr)
            for ex in group.examples[:examples]:
                print(f" - {ex}", file=sys.stderr)

        # Write the prompt to stderr like the rest of the dialogue; stdout
        # may be redirected, which would hide a prompt written there.
        print("Rename? [Enter keep / ! keep all / custom text]: ", end="", file=sys.stderr, flush=True)
        try:
            reply = input().strip()
        except EOFError:
            # Input closed (e.g. Ctrl-D): keep every remaining name.
            print(file=sys.stderr)
            break

        print(file=sys.stderr)
        if reply == "!":
            break
        if reply:
            group.name = reply
    return groups
# ----------------------------
# Writers
# ----------------------------
def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_path: Path, by: str = "auto") -> None:
gpx = ET.Element("gpx", {
"version": "1.1",
"creator": "landmarks_csv_to_osmand.py",
"xmlns": "http://www.topografix.com/GPX/1/1",
"creator": "landmarks_csv_to_osmand_v2.py",
"xmlns": GPX_NS,
})
metadata = ET.SubElement(gpx, "metadata")
ET.SubElement(metadata, "name").text = out_path.stem
groups_el = ET.SubElement(ET.SubElement(gpx, "extensions"), f"{{{OSMAND_NS}}}points_groups")
def group_key(it: Landmark) -> str:
if by == "type":
return f"{it.garmin_type}_{it.garmin_subtype}"
return classify(it)
groups: dict[str, dict[str, str]] = {}
for it in items:
g = group_key(it)
if g not in groups:
if by == "type":
groups[g] = {
"name": g,
"color": "#FB8C00",
"icon": "marker",
"background": "circle",
}
else:
groups[g] = DEFAULT_GROUPS.get(g, DEFAULT_GROUPS["other"])
for key, cfg in sorted(groups.items()):
for key in sorted(groups):
cfg = groups[key]
ET.SubElement(groups_el, f"{{{OSMAND_NS}}}group", {
"name": key,
"color": cfg["color"],
"icon": cfg["icon"],
"background": cfg["background"],
"name": cfg.name,
"color": cfg.color,
"icon": cfg.icon,
"background": cfg.background,
})
for it in items:
gkey = make_group_key(it, by)
cfg = groups[gkey]
wpt = ET.SubElement(gpx, "wpt", {"lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
if it.name:
ET.SubElement(wpt, "name").text = it.name
ET.SubElement(wpt, "type").text = group_key(it)
ET.SubElement(wpt, "type").text = cfg.name
desc_parts = []
if it.semantic_tags:
desc_parts.append("semantic: " + json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True))
desc_parts.append(f"garmin: kind={it.garmin_kind} type={it.garmin_type} subtype={it.garmin_subtype}")
if it.source_file:
desc_parts.append(f"source={it.source_file}")
if it.duplicate_count > 1:
desc_parts.append(f"dedupe: merged {it.duplicate_count} records")
if it.source_files:
desc_parts.append("sources=" + ", ".join(it.source_files))
ET.SubElement(wpt, "desc").text = "\n".join(desc_parts)
ext = ET.SubElement(wpt, "extensions")
cfg = groups[group_key(it)]
ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg["icon"]
ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg["color"]
ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg["background"]
xml_bytes = ET.tostring(gpx, encoding="utf-8")
pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8")
out_path.write_bytes(pretty)
ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg.icon
ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg.color
ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg.background
write_xml(out_path, gpx)
def write_osm(items: list[Landmark], out_path: Path) -> None:
osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand.py"})
osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v2.py"})
nid = -1
for it in items:
node = ET.SubElement(osm, "node", {"id": str(nid), "lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
@@ -180,73 +500,106 @@ def write_osm(items: list[Landmark], out_path: Path) -> None:
ET.SubElement(node, "tag", {"k": "garmin:kind", "v": it.garmin_kind})
ET.SubElement(node, "tag", {"k": "garmin:type", "v": it.garmin_type})
ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": it.garmin_subtype})
ET.SubElement(node, "tag", {"k": "source:file", "v": it.source_file})
xml_bytes = ET.tostring(osm, encoding="utf-8")
pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8")
out_path.write_bytes(pretty)
if it.duplicate_count > 1:
ET.SubElement(node, "tag", {"k": "source:merge_count", "v": str(it.duplicate_count)})
if it.source_files:
ET.SubElement(node, "tag", {"k": "source:file", "v": ",".join(it.source_files)})
write_xml(out_path, osm)
def write_summary(items: list[Landmark], out_csv: Optional[Path], out_json: Optional[Path], by: str = "category") -> None:
def write_summary(items: list[Landmark], groups: dict[str, GroupDefinition], out_csv: Optional[Path], out_json: Optional[Path], by: str = "auto") -> None:
counts: dict[str, int] = defaultdict(int)
for it in items:
key = classify(it) if by == "category" else f"{it.garmin_type}_{it.garmin_subtype}"
counts[key] += 1
counts[make_group_key(it, by)] += 1
if out_csv:
with out_csv.open("w", encoding="utf-8", newline="") as f:
with open_text_out(out_csv) as f:
w = csv.writer(f)
w.writerow(["group", "count"])
for k, v in sorted(counts.items()):
w.writerow([k, v])
w.writerow(["group_key", "group_name", "count", "examples"])
for key, value in sorted(counts.items()):
group = groups[key]
w.writerow([key, group.name, value, " || ".join(group.examples)])
if out_json:
out_json.write_text(json.dumps(dict(sorted(counts.items())), ensure_ascii=False, indent=2), encoding="utf-8")
payload = {
key: {
"name": groups[key].name,
"count": value,
"examples": groups[key].examples,
"icon": groups[key].icon,
"color": groups[key].color,
}
for key, value in sorted(counts.items())
}
if str(out_json).lower().endswith(".gz"):
with gzip.open(out_json, "wt", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
else:
out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def apply_filters(items: list[Landmark], category: Optional[str], filter_tag: list[str], named_only: bool) -> list[Landmark]:
out = items
if category:
out = [it for it in out if classify(it) == category]
for expr in filter_tag:
if "=" not in expr:
raise ValueError(f"invalid --filter-tag value: {expr!r}; expected key=value")
k, v = expr.split("=", 1)
out = [it for it in out if str(it.semantic_tags.get(k)) == v]
if named_only:
out = [it for it in out if it.name]
return out
def print_groups(groups: dict[str, GroupDefinition]) -> None:
    """Print every group to stdout, ordered by key.

    Each group produces one tab-separated line (key, count, name) followed
    by one line per stored example.
    """
    for key, group in sorted(groups.items(), key=lambda pair: pair[0]):
        print(f"{key}\t{group.count}\t{group.name}")
        for example in group.examples:
            print(f" - {example}")
def main(argv: Optional[list[str]] = None) -> int:
ap = argparse.ArgumentParser(description="Convert landmark CSV exports into OsmAnd-friendly GPX overlay and/or OSM POI input.")
ap = argparse.ArgumentParser(
description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping and modular coordinate-based dedupe."
)
ap.add_argument("csv", nargs="+", type=Path, help="Input landmark CSV files")
ap.add_argument("--gpx", type=Path, help="Write OsmAnd-friendly GPX waypoint overlay")
ap.add_argument("--osm", type=Path, help="Write OSM XML for OsmAndMapCreator generate-poi")
ap.add_argument("--summary-csv", type=Path, help="Write category/type counts CSV")
ap.add_argument("--summary-json", type=Path, help="Write category/type counts JSON")
ap.add_argument("--group-by", choices=["category", "type"], default="category", help="How GPX waypoint groups should be organized")
ap.add_argument("--summary-by", choices=["category", "type"], default="category")
ap.add_argument("--category", help="Keep only one inferred category, e.g. water_sources, settlements, springs, drinking_water")
ap.add_argument("--summary-csv", type=Path, help="Write group counts CSV")
ap.add_argument("--summary-json", type=Path, help="Write group counts JSON")
ap.add_argument("--group-by", choices=["auto", "category", "type", "source"], default="auto", help="How GPX waypoint groups should be organized")
ap.add_argument("--show-groups", action="store_true", help="Print detected groups with a few examples")
ap.add_argument("--interactive-group-names", action="store_true", help="Interactively rename auto-detected groups")
ap.add_argument("--group-examples", type=int, default=3, help="How many example rows to show for each group")
ap.add_argument("--category", help="Keep only one inferred category, e.g. drinking_water, springs, settlements_village")
ap.add_argument("--filter-tag", action="append", default=[], help="Keep only landmarks whose semantic tags contain key=value")
ap.add_argument("--filter-kind", help="Keep only one Garmin kind, e.g. point")
ap.add_argument("--filter-type", help="Keep only one Garmin type, e.g. 0x64")
ap.add_argument("--filter-subtype", help="Keep only one Garmin subtype, e.g. 0x14")
ap.add_argument("--gpxsee-class", help="Keep only landmarks with the given GPXSee class")
ap.add_argument("--named-only", action="store_true", help="Keep only landmarks with non-empty names")
ap.add_argument("--no-dedupe", action="store_true", help="Disable de-duplication")
ap.add_argument("--dedupe-mode", choices=["coord", "coord_name", "coord_type", "coord_name_type", "off"], default="coord", help="Dedupe strategy; default is coordinate-based only")
ap.add_argument("--dedupe-radius-m", type=float, default=12.0, help="Coordinate dedupe radius in meters")
args = ap.parse_args(argv)
items = load_landmarks(args.csv)
items = apply_filters(items, args.category, args.filter_tag, args.named_only)
if not args.no_dedupe:
items = dedupe(items)
items = apply_filters(
items,
args.category,
args.filter_tag,
args.named_only,
args.filter_kind,
args.filter_type,
args.filter_subtype,
args.gpxsee_class,
)
items = dedupe(items, radius_m=args.dedupe_radius_m, mode=args.dedupe_mode)
if not any([args.gpx, args.osm, args.summary_csv, args.summary_json]):
groups = build_groups(items, args.group_by, example_count=max(0, args.group_examples))
groups = interactive_rename_groups(groups, args.interactive_group_names, args.group_examples)
if args.show_groups:
print_groups(groups)
if not any([args.gpx, args.osm, args.summary_csv, args.summary_json, args.show_groups]):
print(f"loaded {len(items)} landmarks", file=sys.stderr)
return 0
if args.gpx:
write_gpx(items, args.gpx, by=args.group_by)
write_gpx(items, groups, args.gpx, by=args.group_by)
if args.osm:
write_osm(items, args.osm)
if args.summary_csv or args.summary_json:
write_summary(items, args.summary_csv, args.summary_json, by=args.summary_by)
write_summary(items, groups, args.summary_csv, args.summary_json, by=args.group_by)
print(f"[info] kept {len(items)} landmarks")
print(f"[info] dedupe mode={args.dedupe_mode} radius={args.dedupe_radius_m:g}m")
if args.gpx:
print(f"[info] wrote GPX overlay: {args.gpx}")
if args.osm:

View File

@@ -1,122 +1,89 @@
You have **two good OsmAnd targets** now.
Done.
The fast, practical one is **GPX waypoint overlay**. OsmAnd supports GPX import, waypoint icons/colors, and **waypoint grouping** through OsmAnd-specific GPX extensions, so this is the closest match to a toggleable landmark layer without fighting the full map compiler. Favorites are also stored/imported as GPX waypoints, which confirms GPX is a native path for point overlays. ([OsmAnd][1])
Updated script:
[landmarks_csv_to_osmand_v2.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v2.py)
The more native/searchable one is **POI-only OBF**. OsmAndMapCreator officially supports `generate-poi` separately from `generate-obf`, and it accepts OSM-family input such as `.osm`, `.osm.gz`, `.osm.bz2`, and `.pbf`. That makes it the right target when you want searchable/filterable POIs but do **not** want the routing stage that is currently crashing. ([OsmAnd][2])
What changed:
I packaged both paths into a converter:
* default dedupe is now **coordinate-based only**
* dedupe is modular:
[landmarks_csv_to_osmand.py](sandbox:/mnt/data/landmarks_csv_to_osmand.py)
* `coord`
* `coord_name`
* `coord_type`
* `coord_name_type`
* `off`
* dedupe uses a meter radius instead of naive decimal rounding
* duplicates are **merged**, not just dropped
* merged records keep:
Sample outputs:
* best name
* richer semantic tags
* union of GPXSee classes
* source file list
* duplicate count
* grouping is improved:
* [sample GPX overlay](sandbox:/mnt/data/water_sources_02335140.gpx)
* [sample OSM for POI build](sandbox:/mnt/data/water_sources_02335140.osm)
* `--group-by auto`
* `--group-by category`
* `--group-by type`
* `--group-by source`
* automatic grouping now distinguishes things like:
### What I recommend
* `amenity_drinking_water`
* `natural_spring`
* `place_village`
* fallback Garmin or GPXSee groups
* interactive group naming added:
For a “BGMountains-like thing I can switch on and off,” use **GPX overlays** first.
For a “native searchable POI pack,” use **POI-only OBF** second.
* `--interactive-group-names`
* shows sample examples from each detected group
* lets you keep or rename the proposed group name
* loader is now tolerant of both:
### Path 1: GPX overlay layer
* the raw landmark CSVs
* the slimmer per-category CSVs you are iterating through
* `.gpx.gz`, `.osm.gz`, `.json.gz`, `.csv.gz` now actually get gzip-written correctly
This is the quickest route.
Useful commands:
It converts your landmark CSVs into a GPX file with:
Show detected groups with examples:
* one waypoint per landmark
* grouped categories using `<type>`
* OsmAnd waypoint group styling via `osmand:points_groups`
* icons/colors per category
Build it like this:
```bash id="14483"
python landmarks_csv_to_osmand.py *.csv --gpx bg_landmarks.gpx --group-by category
```bash
python landmarks_csv_to_osmand_v2.py *.csv --show-groups
```
For just water sources:
Interactive rename flow:
```bash id="52386"
python landmarks_csv_to_osmand.py *.csv --category water_sources --gpx bg_water_sources.gpx
```bash
python landmarks_csv_to_osmand_v2.py *.csv --show-groups --interactive-group-names --group-by auto --gpx landmarks.gpx
```
Then import that GPX into OsmAnd. GPX waypoints and waypoint groups are supported officially, including icon/color/background customization. ([OsmAnd][1])
Tighter coordinate dedupe:
**Best practice for toggling:** make **one GPX file per theme**:
* `water_sources.gpx`
* `villages.gpx`
* `summits.gpx`
* `springs.gpx`
That gives you clean on/off control by file instead of stuffing everything into one blob.
### Path 2: native POI pack for OsmAnd
This is better when you want it to behave more like a built-in searchable POI dataset.
First convert CSVs to OSM XML:
```bash id="51512"
python landmarks_csv_to_osmand.py *.csv --category water_sources --osm water_sources.osm
```bash
python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord --dedupe-radius-m 6 --gpx landmarks.gpx
```
Then build a POI-only OBF:
Coordinate dedupe, but only merge when type also matches:
```bash id="68855"
java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^
net.osmand.MainUtilities generate-poi C:\path\to\water_sources.osm
```bash
python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord_type --dedupe-radius-m 10 --gpx landmarks.gpx
```
That uses OsmAndMapCreator's documented `generate-poi` mode and avoids the routing stage entirely. ([OsmAnd][2])
Water-only overlay:
### Which one should you choose?
Use **GPX** when you want:
* fast import
* easy toggle on/off
* visual overlay behavior
* category grouping with custom icons/colors
Use **POI-only OBF** when you want:
* more native OsmAnd integration
* better POI-style searching/indexing
* a cleaner long-term pack
### Important limitation
For now, your landmark CSV path is strongest for **exact-coordinate point features**. That is why it works well for springs, drinking water, villages, summits, and similar point landmarks. It is not the same as a full vector map overlay with arbitrary line/polygon filtering.
### Ready-to-use commands
All CSVs into one grouped GPX:
```bash id="40684"
python landmarks_csv_to_osmand.py *.csv --gpx all_landmarks.gpx --group-by category --summary-json all_landmarks_summary.json
```bash
python landmarks_csv_to_osmand_v2.py *.csv --filter-tag amenity=drinking_water --gpx drinking_water.gpx
```
Only water sources into GPX:
Springs-only overlay:
```bash id="33043"
python landmarks_csv_to_osmand.py *.csv --category water_sources --gpx water_sources.gpx
```bash
python landmarks_csv_to_osmand_v2.py *.csv --filter-tag natural=spring --gpx springs.gpx
```
Only water sources into OSM for POI build:
If you want the next revision, I'd push it further in two directions:
```bash id="57957"
python landmarks_csv_to_osmand.py *.csv --category water_sources --osm water_sources.osm
```
Only named villages into GPX:
```bash id="12262"
python landmarks_csv_to_osmand.py *.csv --filter-tag place=village --named-only --gpx villages.gpx
```
The cleanest deployment pattern is: **one GPX per category for toggles, plus one POI-only OBF per high-value category for native search**.
[1]: https://osmand.net/docs/technical/osmand-file-formats/osmand-gpx/?utm_source=chatgpt.com "OsmAnd GPX | OsmAnd"
[2]: https://osmand.net/docs/technical/map-creation/create-offline-maps-yourself/?utm_source=chatgpt.com "Create Offline Raster & Vector Maps | OsmAnd"
* add **dedupe reports** showing which rows got merged into which canonical landmark
* add **interactive split/merge controls** for ambiguous groups, not just rename controls