stage-2-v3-configurable-v2

This commit is contained in:
nq
2026-04-16 04:54:30 -07:00
parent d1fe219e6c
commit 85a242b84b

View File

@@ -282,6 +282,8 @@ def infer_style_and_name(it: Landmark, locale: str) -> dict[str, Any]:
style["label"] = "Селища"
else:
style["label"] = humanize_key(it.point_group_key or it.raw_type_key())
style["icon"] = "marker"
style["color"] = "#FB8C00"
return {
"semantic_class": sem_class,
"base_name": base,
@@ -355,9 +357,21 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op
raw_types = Counter(m.raw_type_key() for m in members)
sem_pairs = Counter()
for m in members:
for k,v in m.semantic_tags.items():
for k, v in m.semantic_tags.items():
if k != "name" and v:
sem_pairs[(k,v)] += 1
sem_pairs[(k, v)] += 1
member_count = max(1, len(members))
semantic_required_any = []
semantic_preferred_any = []
for (k, v), c in sem_pairs.most_common(5):
pair = {k: v}
coverage = c / member_count
if coverage >= 0.80:
semantic_required_any.append(pair)
else:
semantic_preferred_any.append(pair)
group_id = re.sub(r"[^a-zA-Z0-9_]+", "_", key).strip("_").lower() or "group"
groups_cfg.append({
"id": group_id,
@@ -366,7 +380,8 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op
"match": {
"point_group_keys": [key],
"raw_types": [rt for rt, _ in raw_types.most_common(5)],
"semantic_required_any": [{k:v} for (k,v), _ in sem_pairs.most_common(5)],
"semantic_required_any": semantic_required_any,
"semantic_preferred_any": semantic_preferred_any,
"gpxsee_classes_any": sorted({c for m in members for c in m.gpxsee_classes})[:8],
"has_name": None,
"has_image": None,
@@ -386,6 +401,7 @@ def analyze_to_config(items: list[Landmark], example_count: int, locale_hint: Op
"image_count": sum(1 for m in members if m.garmin_image_id),
"raw_type_counts": dict(raw_types.most_common(10)),
"semantic_tag_counts": {f"{k}={v}": c for (k,v), c in sem_pairs.most_common(10)},
"semantic_tag_coverages": {f"{k}={v}": round(c / member_count, 6) for (k,v), c in sem_pairs.most_common(10)},
},
"examples": spread_examples(members, example_count),
})
@@ -426,6 +442,31 @@ def load_json(path: Path) -> Any:
return json.load(f)
return json.loads(path.read_text(encoding="utf-8"))
def migrate_config(config: dict[str, Any], *, min_required_coverage: float = 0.80) -> dict[str, Any]:
    """Return a normalized copy of *config* with sparse required tags demoted.

    For every group whose ``match`` has ``raw_types`` or ``point_group_keys``,
    each single-pair entry of ``match["semantic_required_any"]`` whose recorded
    coverage in ``stats["semantic_tag_coverages"]`` (keyed as ``"key=value"``)
    is below *min_required_coverage* is moved to
    ``match["semantic_preferred_any"]``.  Entries with no recorded coverage,
    and entries that are not single-pair dicts, stay required.

    Args:
        config: Parsed configuration. It must be JSON-serializable and is not
            mutated.
        min_required_coverage: Coverage fraction below which a required
            semantic pair is demoted to "preferred".

    Returns:
        An independent copy of the configuration with adjusted match rules.
    """
    # A JSON round-trip yields a deep copy, so the caller's object is untouched.
    cfg = json.loads(json.dumps(config, ensure_ascii=False))
    for group in cfg.get("groups") or []:
        match = group.get("match")
        if match is None:
            # Covers both a missing key and an explicit ``"match": null``.
            match = group["match"] = {}
        required = list(match.get("semantic_required_any") or [])
        coverages = dict((group.get("stats") or {}).get("semantic_tag_coverages") or {})
        if not required:
            continue
        if not (match.get("raw_types") or match.get("point_group_keys")):
            # Without a raw-type/group-key constraint the semantic tags are the
            # only thing identifying the group, so leave them alone.
            continue
        still_required = []
        preferred = list(match.get("semantic_preferred_any") or [])
        for pair in required:
            coverage = None
            if isinstance(pair, dict) and len(pair) == 1:
                (tag_key, tag_value), = pair.items()
                coverage = coverages.get(f"{tag_key}={tag_value}")
            if coverage is not None and float(coverage) < min_required_coverage:
                # Avoid listing the same pair twice if it is already preferred.
                if pair not in preferred:
                    preferred.append(pair)
            else:
                still_required.append(pair)
        match["semantic_required_any"] = still_required
        match["semantic_preferred_any"] = preferred
    return cfg
def has_cyrillic(text: str) -> bool:
    """Tell whether *text* contains a character in the range U+0400..U+04FF."""
    for symbol in text:
        code_point = ord(symbol)
        if code_point < 0x0400:
            continue
        if code_point <= 0x04FF:
            return True
    return False
@@ -508,11 +549,13 @@ def render_name(it: Landmark, group: dict[str, Any], config: dict[str, Any]) ->
return sanitize_text(template.format(base_name=base, hash=short_hash, existing_name=existing))
return existing or sanitize_text(template.format(base_name=base, hash=short_hash, existing_name=existing))
def build_records(items: list[Landmark], config: dict[str, Any]) -> list[dict[str, Any]]:
def build_records(items: list[Landmark], config: dict[str, Any]) -> tuple[list[dict[str, Any]], list[Landmark]]:
out = []
unmatched: list[Landmark] = []
for it in items:
group = assign_group(it, config)
if group is None:
unmatched.append(it)
continue
display = group.get("display") or {}
final_name = render_name(it, group, config)
@@ -548,7 +591,29 @@ def build_records(items: list[Landmark], config: dict[str, Any]) -> list[dict[st
"source_files": list(it.source_files),
"duplicate_count": it.duplicate_count,
})
return out
return out, unmatched
def write_unmatched_csv(items: list[Landmark], path: Path):
    """Write one CSV row per landmark in *items* to *path*.

    A header row is always written.  Longitude and latitude are formatted with
    eight decimals, the name is passed through ``sanitize_text``, and the
    class list, semantic tags, interest reasons and source files are stored as
    JSON strings (non-ASCII kept as-is; semantic tags with sorted keys).
    """
    columns = [
        "mapset",
        "lon",
        "lat",
        "name",
        "garmin_kind",
        "garmin_type",
        "garmin_subtype",
        "point_group_key",
        "garmin_image_id",
        "gpxsee_classes_json",
        "semantic_tags_json",
        "point_interest_reasons_json",
        "source_files_json",
    ]

    def to_row(landmark):
        # Key order mirrors the column order above.
        return {
            "mapset": landmark.mapset,
            "lon": format(landmark.lon, ".8f"),
            "lat": format(landmark.lat, ".8f"),
            "name": sanitize_text(landmark.name),
            "garmin_kind": landmark.garmin_kind,
            "garmin_type": landmark.garmin_type,
            "garmin_subtype": landmark.garmin_subtype,
            "point_group_key": landmark.point_group_key,
            "garmin_image_id": landmark.garmin_image_id,
            "gpxsee_classes_json": json.dumps(landmark.gpxsee_classes, ensure_ascii=False),
            "semantic_tags_json": json.dumps(landmark.semantic_tags, ensure_ascii=False, sort_keys=True),
            "point_interest_reasons_json": json.dumps(landmark.point_interest_reasons, ensure_ascii=False),
            "source_files_json": json.dumps(landmark.source_files, ensure_ascii=False),
        }

    with open_text(path, "w") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(to_row(landmark) for landmark in items)
def write_resolved_csv(records: list[dict[str, Any]], path: Path):
fields = ["group_id","group_label","lon","lat","name","short_hash","icon","color","background","duplicate_count","source_files_json","props_json"]
@@ -651,6 +716,7 @@ def main(argv=None):
b.add_argument("--osm", type=Path)
b.add_argument("--gpx", type=Path)
b.add_argument("--summary-json", type=Path)
b.add_argument("--unmatched-csv", type=Path, help="Write unmatched input landmarks for debugging config misses")
b.add_argument("--dedupe-mode", choices=["coord","coord_name","coord_type","coord_name_type","off"])
b.add_argument("--dedupe-radius-m", type=float)
args = ap.parse_args(argv)
@@ -673,15 +739,16 @@ def main(argv=None):
print(f"[info] wrote summary: {args.summary_json}")
return 0
cfg = load_json(args.config)
cfg = migrate_config(load_json(args.config))
defaults = cfg.get("defaults") or {}
dedupe_cfg = defaults.get("dedupe") or {}
mode = args.dedupe_mode or dedupe_cfg.get("mode", "coord")
radius = args.dedupe_radius_m if args.dedupe_radius_m is not None else float(dedupe_cfg.get("radius_m", 12.0))
items = dedupe(items, radius_m=radius, mode=mode)
records = build_records(items, cfg)
if not any([args.resolved_csv, args.osm, args.gpx, args.summary_json]):
records, unmatched = build_records(items, cfg)
if not any([args.resolved_csv, args.osm, args.gpx, args.summary_json, args.unmatched_csv]):
print(f"[info] built {len(records)} records")
print(f"[info] unmatched {len(unmatched)} landmarks")
return 0
if args.resolved_csv:
write_resolved_csv(records, args.resolved_csv)
@@ -692,10 +759,14 @@ def main(argv=None):
if args.gpx:
write_gpx(records, args.gpx)
print(f"[info] wrote GPX: {args.gpx}")
if args.unmatched_csv:
write_unmatched_csv(unmatched, args.unmatched_csv)
print(f"[info] wrote unmatched CSV: {args.unmatched_csv}")
if args.summary_json:
c = Counter(r["group_id"] for r in records)
save_json(args.summary_json, {
"record_count": len(records),
"unmatched_count": len(unmatched),
"groups": dict(c.most_common()),
"dedupe_mode": mode,
"dedupe_radius_m": radius,