diff --git a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py index 83c851d..6118810 100644 --- a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py +++ b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py @@ -282,6 +282,8 @@ def infer_style_and_name(it: Landmark, locale: str) -> dict[str, Any]: style["label"] = "Селища" else: style["label"] = humanize_key(it.point_group_key or it.raw_type_key()) + style["icon"] = "marker" + style["color"] = "#FB8C00" return { "semantic_class": sem_class, "base_name": base, @@ -355,9 +357,21 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op raw_types = Counter(m.raw_type_key() for m in members) sem_pairs = Counter() for m in members: - for k,v in m.semantic_tags.items(): + for k, v in m.semantic_tags.items(): if k != "name" and v: - sem_pairs[(k,v)] += 1 + sem_pairs[(k, v)] += 1 + + member_count = max(1, len(members)) + semantic_required_any = [] + semantic_preferred_any = [] + for (k, v), c in sem_pairs.most_common(5): + pair = {k: v} + coverage = c / member_count + if coverage >= 0.80: + semantic_required_any.append(pair) + else: + semantic_preferred_any.append(pair) + group_id = re.sub(r"[^a-zA-Z0-9_]+", "_", key).strip("_").lower() or "group" groups_cfg.append({ "id": group_id, @@ -366,7 +380,8 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op "match": { "point_group_keys": [key], "raw_types": [rt for rt, _ in raw_types.most_common(5)], - "semantic_required_any": [{k:v} for (k,v), _ in sem_pairs.most_common(5)], + "semantic_required_any": semantic_required_any, + "semantic_preferred_any": semantic_preferred_any, "gpxsee_classes_any": sorted({c for m in members for c in m.gpxsee_classes})[:8], "has_name": None, "has_image": None, @@ -386,6 +401,7 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op "image_count": sum(1 for m in members if m.garmin_image_id), "raw_type_counts": 
def migrate_config(
    config: dict[str, Any],
    min_required_coverage: float = 0.80,
) -> dict[str, Any]:
    """Return a normalized deep copy of an autoconfig.

    Older autoconfigs promoted sparse semantic tags into required match
    constraints for raw symbol groups.  For every group that is matched by
    ``raw_types`` or ``point_group_keys``, each single-pair entry of
    ``match["semantic_required_any"]`` whose recorded coverage in
    ``stats["semantic_tag_coverages"]`` is below *min_required_coverage* is
    moved to ``match["semantic_preferred_any"]``.

    Args:
        config: Parsed configuration; it is not modified.
        min_required_coverage: Coverage ratio below which a semantic pair is
            demoted from "required" to "preferred".

    Returns:
        The migrated copy of *config*.
    """
    # A JSON round-trip gives a deep copy, so the caller's object is untouched.
    cfg = json.loads(json.dumps(config, ensure_ascii=False))
    for g in cfg.get("groups") or []:
        match = g.get("match")
        if match is None:
            # Covers both a missing key and an explicit ``"match": null``;
            # ``setdefault`` alone would hand back the ``None`` and crash.
            match = g["match"] = {}
        sem_req = list(match.get("semantic_required_any") or [])
        if not sem_req:
            continue
        if not (match.get("raw_types") or match.get("point_group_keys")):
            continue

        coverages = dict((g.get("stats") or {}).get("semantic_tag_coverages") or {})
        keep_req = []
        preferred = list(match.get("semantic_preferred_any") or [])
        for kv in sem_req:
            sparse = False
            if isinstance(kv, dict) and len(kv) == 1:
                (k, v), = kv.items()
                cov = coverages.get(f"{k}={v}")
                try:
                    sparse = cov is not None and float(cov) < min_required_coverage
                except (TypeError, ValueError):
                    # Stats are informational; an unreadable coverage value
                    # is treated as unknown and the constraint is kept.
                    sparse = False
            if not sparse:
                keep_req.append(kv)
            elif kv not in preferred:
                # Skip pairs that are already listed as preferred.
                preferred.append(kv)
        match["semantic_required_any"] = keep_req
        match["semantic_preferred_any"] = preferred
    return cfg
def write_unmatched_csv(items: list[Landmark], path: Path):
    """Dump landmarks that matched no config group to a CSV file at *path*.

    A header row is always written, followed by one row per landmark.
    Coordinates are formatted with eight decimal places, and the list/dict
    valued attributes are stored as JSON strings in the ``*_json`` columns.
    """
    fields = [
        "mapset",
        "lon",
        "lat",
        "name",
        "garmin_kind",
        "garmin_type",
        "garmin_subtype",
        "point_group_key",
        "garmin_image_id",
        "gpxsee_classes_json",
        "semantic_tags_json",
        "point_interest_reasons_json",
        "source_files_json",
    ]

    def as_row(lm):
        # Flatten one landmark into the column -> value mapping for the CSV.
        row = {
            "mapset": lm.mapset,
            "lon": f"{lm.lon:.8f}",
            "lat": f"{lm.lat:.8f}",
            "name": sanitize_text(lm.name),
        }
        row["garmin_kind"] = lm.garmin_kind
        row["garmin_type"] = lm.garmin_type
        row["garmin_subtype"] = lm.garmin_subtype
        row["point_group_key"] = lm.point_group_key
        row["garmin_image_id"] = lm.garmin_image_id
        row["gpxsee_classes_json"] = json.dumps(lm.gpxsee_classes, ensure_ascii=False)
        row["semantic_tags_json"] = json.dumps(
            lm.semantic_tags, ensure_ascii=False, sort_keys=True
        )
        row["point_interest_reasons_json"] = json.dumps(
            lm.point_interest_reasons, ensure_ascii=False
        )
        row["source_files_json"] = json.dumps(lm.source_files, ensure_ascii=False)
        return row

    # open_text is the file's own text-open helper (defined outside this view).
    with open_text(path, "w") as handle:
        writer = csv.DictWriter(handle, fieldnames=fields)
        writer.writeheader()
        writer.writerows(as_row(lm) for lm in items)
choices=["coord","coord_name","coord_type","coord_name_type","off"]) b.add_argument("--dedupe-radius-m", type=float) args = ap.parse_args(argv) @@ -673,15 +739,16 @@ def main(argv=None): print(f"[info] wrote summary: {args.summary_json}") return 0 - cfg = load_json(args.config) + cfg = migrate_config(load_json(args.config)) defaults = cfg.get("defaults") or {} dedupe_cfg = defaults.get("dedupe") or {} mode = args.dedupe_mode or dedupe_cfg.get("mode", "coord") radius = args.dedupe_radius_m if args.dedupe_radius_m is not None else float(dedupe_cfg.get("radius_m", 12.0)) items = dedupe(items, radius_m=radius, mode=mode) - records = build_records(items, cfg) - if not any([args.resolved_csv, args.osm, args.gpx, args.summary_json]): + records, unmatched = build_records(items, cfg) + if not any([args.resolved_csv, args.osm, args.gpx, args.summary_json, args.unmatched_csv]): print(f"[info] built {len(records)} records") + print(f"[info] unmatched {len(unmatched)} landmarks") return 0 if args.resolved_csv: write_resolved_csv(records, args.resolved_csv) @@ -692,10 +759,14 @@ def main(argv=None): if args.gpx: write_gpx(records, args.gpx) print(f"[info] wrote GPX: {args.gpx}") + if args.unmatched_csv: + write_unmatched_csv(unmatched, args.unmatched_csv) + print(f"[info] wrote unmatched CSV: {args.unmatched_csv}") if args.summary_json: c = Counter(r["group_id"] for r in records) save_json(args.summary_json, { "record_count": len(records), + "unmatched_count": len(unmatched), "groups": dict(c.most_common()), "dedupe_mode": mode, "dedupe_radius_m": radius,