stage-3-roads - NON WORKING

This commit is contained in:
nq
2026-05-03 02:00:14 -07:00
parent ca36d12324
commit b3587bee51
5 changed files with 1701 additions and 0 deletions

View File

@@ -0,0 +1,386 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import gzip
import hashlib
import importlib.util
import json
import math
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Meters spanned by one degree of latitude; the conversion factor used by this
# module's planar distance approximations (segment lengths, closure tests,
# hash grids, and the --near filter).
EARTH_M_PER_DEG_LAT = 111_320.0
def info(msg: str) -> None:
    """Emit an ``[info]``-tagged progress line on standard error."""
    print('[info]', msg, file=sys.stderr)
def warn(msg: str) -> None:
    """Emit a ``[warn]``-tagged diagnostic line on standard error."""
    print('[warn]', msg, file=sys.stderr)
def load_stage1_module(path: Optional[Path] = None):
    """Import the stage-1 Garmin IMG extractor from a source file.

    Args:
        path: Location of the stage-1 script. When omitted (or falsy), the file
            ``garmin_img_to_osmand_v6.py`` next to this script is used.

    Returns:
        The executed module object. It is also registered in ``sys.modules``
        under the name ``garmin_img_stage1_v6``.

    Raises:
        RuntimeError: If no import spec or loader can be created for ``path``.
        Exception: Anything raised while the module body executes is
            re-raised unchanged after the ``sys.modules`` entry has been
            rolled back.
    """
    module_name = 'garmin_img_stage1_v6'
    path = path or Path(__file__).with_name('garmin_img_to_osmand_v6.py')
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f'cannot load stage1 module from {path}')
    mod = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(module_name)
    # Register before executing so that code which resolves the module by name
    # while its body is running can find it.
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind: restore whatever was
        # registered before this call (or nothing at all).
        if previous is None:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return mod
def open_text(path: Path, mode: str):
    """Open ``path`` as UTF-8 text, going through gzip for ``.gz`` names.

    ``mode`` is a bare mode such as ``'r'`` or ``'w'``; for gzip files ``'t'``
    is appended to it. Newline translation is disabled (``newline=''``) in
    both cases, which is what the ``csv`` module expects.
    """
    text_opts = {'encoding': 'utf-8', 'newline': ''}
    compressed = str(path).lower().endswith('.gz')
    if not compressed:
        return open(path, mode, **text_opts)
    return gzip.open(path, mode + 't', **text_opts)
def meters_per_deg_lon(lat_deg: float) -> float:
    """Return the approximate length in meters of one degree of longitude.

    The latitude factor ``cos(lat)`` is clamped to a minimum of 0.01 so the
    result never collapses to zero or turns negative.
    """
    cos_lat = math.cos(math.radians(lat_deg))
    shrink = max(0.01, cos_lat)
    return EARTH_M_PER_DEG_LAT * shrink
def line_length_m(coords: List[List[float]]) -> float:
    """Return the polyline length in meters using a per-segment planar approximation.

    Each ``[lon, lat]`` segment is scaled with the longitude factor at the
    segment's mean latitude. Fewer than two points yields ``0.0``.
    """
    def segment_m(start, end) -> float:
        (ax, ay), (bx, by) = start, end
        mid_lat = (ay + by) / 2.0
        east_m = (bx - ax) * meters_per_deg_lon(mid_lat)
        north_m = (by - ay) * EARTH_M_PER_DEG_LAT
        return math.hypot(east_m, north_m)

    consecutive = zip(coords[:-1], coords[1:])
    return sum((segment_m(a, b) for a, b in consecutive), 0.0)
def line_bbox(coords: List[List[float]]) -> Tuple[float, float, float, float]:
    """Return ``(min_lon, min_lat, max_lon, max_lat)`` of the ``[lon, lat]`` points.

    Raises:
        ValueError: If ``coords`` is empty.
    """
    west = min(pt[0] for pt in coords)
    south = min(pt[1] for pt in coords)
    east = max(pt[0] for pt in coords)
    north = max(pt[1] for pt in coords)
    return west, south, east, north
def line_centroid(coords: List[List[float]]) -> Tuple[float, float]:
    """Return the arithmetic mean ``(lon, lat)`` of the vertices.

    An empty list yields ``(0.0, 0.0)`` because the divisor is floored at 1.
    """
    divisor = len(coords) or 1
    lon_total = sum(pt[0] for pt in coords)
    lat_total = sum(pt[1] for pt in coords)
    return lon_total / divisor, lat_total / divisor
def line_endpoints(coords: List[List[float]]) -> Tuple[float, float, float, float]:
    """Return ``(start_lon, start_lat, end_lon, end_lat)`` of the polyline.

    Raises:
        IndexError: If ``coords`` is empty.
    """
    first, last = coords[0], coords[-1]
    start_lon, start_lat = first[0], first[1]
    end_lon, end_lat = last[0], last[1]
    return start_lon, start_lat, end_lon, end_lat
def is_closed(coords: List[List[float]], tol_m: float = 5.0) -> bool:
    """Tell whether the first and last vertex lie within ``tol_m`` meters.

    Lines with fewer than three points are never reported as closed.
    """
    if len(coords) >= 3:
        head, tail = coords[0], coords[-1]
        mid_lat = (head[1] + tail[1]) / 2.0
        east_m = (tail[0] - head[0]) * meters_per_deg_lon(mid_lat)
        north_m = (tail[1] - head[1]) * EARTH_M_PER_DEG_LAT
        gap_m = math.hypot(east_m, north_m)
        return gap_m <= tol_m
    return False
def stable_line_hash(coords: List[List[float]], garmin_kind: str, garmin_type: str, garmin_subtype: str, *, grid_m: float = 12.0) -> str:
    """Return a 12-character uppercase hex id for a typed line geometry.

    Vertex offsets from the line centroid are snapped to a ``grid_m``-meter
    grid, and the lexicographically smaller of the forward and reversed vertex
    strings is hashed, so a line and its reverse produce the same id. The
    Garmin kind/type/subtype and the vertex count are part of the seed.
    An empty ``coords`` hashes the type triple with the marker ``empty``.
    """
    type_prefix = f'{garmin_kind}|{garmin_type}|{garmin_subtype}'
    if not coords:
        seed = f'{type_prefix}|empty'
    else:
        lon0, lat0 = line_centroid(coords)
        lon_m = meters_per_deg_lon(lat0)
        cells = [
            f'{round((lon - lon0) * lon_m / grid_m)}:{round((lat - lat0) * EARTH_M_PER_DEG_LAT / grid_m)}'
            for lon, lat in coords
        ]
        forward = '|'.join(cells)
        backward = '|'.join(cells[::-1])
        canon = forward if forward <= backward else backward
        seed = f'{type_prefix}|{len(coords)}|{canon}'
    digest = hashlib.blake2b(seed.encode('utf-8'), digest_size=8).hexdigest()
    return digest.upper()[:12]
def endpoint_hash(coords: List[List[float]], *, grid_m: float = 8.0) -> str:
    """Return a 10-character uppercase hex id for the line's endpoint pair.

    Both endpoints are snapped to a ``grid_m``-meter grid (longitude scaled at
    the mean latitude of the two endpoints) and sorted before hashing, so the
    id does not depend on the direction of the line. Empty input yields ``''``.
    """
    if not coords:
        return ''
    (ax, ay), (bx, by) = coords[0], coords[-1]
    lon_m = meters_per_deg_lon((ay + by) / 2.0)
    first = (round(ax * lon_m / grid_m), round(ay * EARTH_M_PER_DEG_LAT / grid_m))
    second = (round(bx * lon_m / grid_m), round(by * EARTH_M_PER_DEG_LAT / grid_m))
    low, high = (first, second) if first <= second else (second, first)
    raw = f'{low[0]}:{low[1]}|{high[0]}:{high[1]}'
    digest = hashlib.blake2b(raw.encode('utf-8'), digest_size=6).hexdigest()
    return digest.upper()[:10]
def parse_near(text: Optional[str]) -> Optional[Tuple[float, float, float]]:
    """Parse the ``--near`` option value ``lon,lat[,radius_m]``.

    Args:
        text: Raw option value, or ``None``/empty when the option is absent.

    Returns:
        ``None`` when ``text`` is empty, otherwise ``(lon, lat, radius_m)``
        with the radius defaulting to 50.0 meters.

    Raises:
        SystemExit: If the value does not have two or three comma-separated
            parts, or if any part is not a number.
    """
    if not text:
        return None
    parts = [p.strip() for p in text.split(',')]
    if len(parts) not in (2, 3):
        raise SystemExit('--near must be lon,lat[,radius_m]')
    try:
        numbers = [float(p) for p in parts]
    except ValueError:
        # Report bad numbers the same way as a bad field count instead of
        # letting a raw traceback reach the command-line user.
        raise SystemExit(f'--near must be numeric lon,lat[,radius_m]; got {text!r}') from None
    lon, lat = numbers[0], numbers[1]
    radius = numbers[2] if len(numbers) == 3 else 50.0
    return lon, lat, radius
def _origin_to_segment_m(ax: float, ay: float, bx: float, by: float) -> float:
    """Return the distance from the origin to segment A-B in planar units."""
    dx, dy = bx - ax, by - ay
    seg_len_sq = dx * dx + dy * dy
    if seg_len_sq == 0.0:
        return math.hypot(ax, ay)
    # Parameter of the origin's projection onto A-B, clamped to the segment.
    t = -(ax * dx + ay * dy) / seg_len_sq
    t = min(1.0, max(0.0, t))
    return math.hypot(ax + t * dx, ay + t * dy)


def line_intersects_near(coords: List[List[float]], near: Tuple[float, float, float]) -> bool:
    """Tell whether the polyline passes within ``radius_m`` of a point.

    Args:
        coords: ``[lon, lat]`` vertices of the line.
        near: ``(lon, lat, radius_m)`` query as produced by ``parse_near``.

    Returns:
        ``True`` if any vertex, or any point on a segment between consecutive
        vertices, lies within ``radius_m`` meters of the query point. Checking
        the segments matters for long straight lines whose vertices are all
        farther away than the radius.

    Each vertex is converted to meters relative to the query point, scaling
    longitude at the mean latitude of the vertex and the query point; segment
    distances are measured in that planar approximation.
    """
    lon, lat, radius_m = near
    prev = None
    for x, y in coords:
        mean_lat = (lat + y) / 2.0
        px = (x - lon) * meters_per_deg_lon(mean_lat)
        py = (y - lat) * EARTH_M_PER_DEG_LAT
        if math.hypot(px, py) <= radius_m:
            return True
        if prev is not None and _origin_to_segment_m(prev[0], prev[1], px, py) <= radius_m:
            return True
        prev = (px, py)
    return False
def parse_tag_filters(values: Optional[List[str]]) -> List[Tuple[str, str]]:
    """Turn repeated ``--filter-tag key=value`` options into ``(key, value)`` pairs.

    Keys and values are stripped of surrounding whitespace; only the first
    ``=`` separates key from value.

    Raises:
        SystemExit: If an entry contains no ``=``.
    """
    pairs = []
    for raw in (values or []):
        key, sep, val = raw.partition('=')
        if not sep:
            raise SystemExit(f'invalid --filter-tag value {raw!r}, expected key=value')
        pairs.append((key.strip(), val.strip()))
    return pairs
def infer_line_class(sem: Dict[str, str], gpxsee_classes: List[str]) -> str:
    """Classify a line as ``"<key>:<value>"`` from its tags, else from its GPXSee classes.

    Semantic tags win in the order highway, railway, route, waterway, then
    ``natural=coastline``. Otherwise the (case-insensitive) GPXSee classes
    select contour, cartographic, or styled lines; anything else is ``'raw'``.
    """
    for tag in ('highway', 'railway', 'route', 'waterway'):
        value = sem.get(tag)
        if value:
            return f'{tag}:{value}'
    if sem.get('natural') == 'coastline':
        return 'natural:coastline'
    lowered = {c.lower() for c in gpxsee_classes}
    class_labels = (
        ('contour_line', 'contour:elevation'),
        ('cartographic_line', 'cartographic:line'),
        ('styled_line', 'styled:line'),
    )
    for class_name, label in class_labels:
        if class_name in lowered:
            return label
    return 'raw'
def default_group_key(sem: Dict[str, str], gpxsee_classes: List[str], garmin_kind: str, garmin_type: str, garmin_subtype: str) -> str:
    """Return the default grouping key for a line.

    This is the inferred line class when one exists; unclassified lines fall
    back to ``raw:<kind>:<type>:<subtype>`` built from the Garmin identifiers.
    """
    inferred = infer_line_class(sem, gpxsee_classes)
    if inferred == 'raw':
        return f'raw:{garmin_kind}:{garmin_type}:{garmin_subtype}'
    return inferred
def road_interest_score(sem: Dict[str, str], gpxsee_classes: List[str], length_m: float) -> int:
    """Return a heuristic integer score for how road-like a line is.

    Highways, ferries and railways add points (major highway classes and
    path-like classes add a further bonus), contour lines and waterways
    subtract points, and one point is added per full 500 m of length, capped
    at 20.
    """
    major_classes = {'motorway', 'primary', 'secondary', 'tertiary'}
    path_classes = {'track', 'path', 'cycleway', 'bridleway'}
    highway = sem.get('highway')
    lowered = {c.lower() for c in gpxsee_classes}

    score = 60 if highway else 0
    if sem.get('route') == 'ferry' or sem.get('railway'):
        score += 40
    if highway in major_classes:
        score += 20
    if highway in path_classes:
        score += 10
    if 'contour_line' in lowered:
        score -= 40
    if sem.get('waterway'):
        score -= 20
    length_bonus = min(20, int(length_m // 500))
    return score + length_bonus
def profile_accepts(sem: Dict[str, str], gpxsee_classes: List[str], profile: str) -> bool:
    """Tell whether a line with these tags/classes belongs to ``profile``.

    Profiles test the presence or value of semantic tag keys and the
    (case-insensitive) GPXSee class names; ``all_lines`` accepts everything
    and ``nonroads`` accepts every line without a ``highway`` key.

    Raises:
        ValueError: If ``profile`` is not one of the known profile names.
    """
    classes = {c.lower() for c in gpxsee_classes}
    has_highway = 'highway' in sem
    strict_highways = {'motorway','primary','secondary','tertiary','unclassified','residential','service','road'}
    transport_keys = ('highway', 'railway', 'route', 'aerialway', 'aeroway')
    rules = {
        'all_lines': lambda: True,
        'roads': lambda: has_highway,
        'roads_paths': lambda: has_highway or sem.get('route') == 'ferry',
        'roads_strict': lambda: sem.get('highway') in strict_highways,
        'transport': lambda: any(k in sem for k in transport_keys),
        'hydro': lambda: 'waterway' in sem or sem.get('natural') == 'coastline',
        'contours': lambda: 'contour_line' in classes or sem.get('contour') == 'elevation',
        'cartographic': lambda: 'cartographic_line' in classes or 'styled_line' in classes,
        'nonroads': lambda: not has_highway,
    }
    rule = rules.get(profile)
    if rule is None:
        raise ValueError(f'unknown profile {profile!r}')
    return rule()
def normalize_row(f: Any, sem: Dict[str, str], gpxsee_classes: List[str], coords: List[List[float]]) -> Dict[str, Any]:
    """Build the flat CSV row dict for one line feature.

    Args:
        f: Feature object; only its ``props`` mapping is read here.
        sem: Semantic tags derived for the feature.
        gpxsee_classes: GPXSee class names derived for the feature.
        coords: ``[lon, lat]`` vertices (callers pass at least two).

    Returns:
        A dict with identifiers, classification, selected tag columns,
        geometry metrics, coordinates formatted to 8 decimals, and the JSON
        encoded bbox, classes, tags and coordinates.
    """
    props = f.props

    def prop_text(key: str) -> str:
        return str(props.get(key) or '')

    kind = prop_text('garmin_kind')
    gtype = prop_text('garmin_type')
    subtype = prop_text('garmin_subtype')

    west, south, east, north = line_bbox(coords)
    start_lon, start_lat, end_lon, end_lat = line_endpoints(coords)
    length_m = line_length_m(coords)
    centroid_lon, centroid_lat = line_centroid(coords)

    def deg(value: float) -> str:
        return f'{value:.8f}'

    row: Dict[str, Any] = {
        'mapset': prop_text('mapset'),
        'feature_id': stable_line_hash(coords, kind, gtype, subtype),
        'endpoint_hash': endpoint_hash(coords),
        'name': str(sem.get('name') or props.get('name') or ''),
        'garmin_kind': kind,
        'garmin_type': gtype,
        'garmin_subtype': subtype,
        'line_class': infer_line_class(sem, gpxsee_classes),
        'road_group_key': default_group_key(sem, gpxsee_classes, kind, gtype, subtype),
    }
    # Tag columns copied verbatim from the semantic tags (blank when absent).
    for tag in ('highway', 'railway', 'route', 'waterway', 'aerialway', 'aeroway', 'natural', 'surface', 'tracktype', 'junction'):
        row[tag] = sem.get(tag, '')
    row.update({
        'oneway': 'yes' if props.get('garmin_direction') else '',
        'closed_loop': 'yes' if is_closed(coords) else '',
        'point_count': len(coords),
        'length_m': round(length_m, 1),
        'road_interest_score': road_interest_score(sem, gpxsee_classes, length_m),
        'preview_lon': deg(centroid_lon),
        'preview_lat': deg(centroid_lat),
        'start_lon': deg(start_lon),
        'start_lat': deg(start_lat),
        'end_lon': deg(end_lon),
        'end_lat': deg(end_lat),
        'min_lon': deg(west),
        'min_lat': deg(south),
        'max_lon': deg(east),
        'max_lat': deg(north),
        'bbox_json': json.dumps({'west': west, 'south': south, 'east': east, 'north': north}, ensure_ascii=False, separators=(',', ':')),
        'gpxsee_classes_json': json.dumps(gpxsee_classes, ensure_ascii=False),
        'semantic_tags_json': json.dumps(sem, ensure_ascii=False, sort_keys=True),
        'coords_json': json.dumps(coords, ensure_ascii=False, separators=(',', ':')),
    })
    return row
def matches_filters(row: Dict[str, Any], tag_filters: List[Tuple[str, str]], line_classes: Optional[List[str]], group_keys: Optional[List[str]]) -> bool:
    """Apply the optional line-class, group-key and tag filters to a row.

    An empty/``None`` filter accepts everything. Tag filters compare the
    string form of each value in the row's ``semantic_tags_json`` (a missing
    tag counts as the empty string) and must all match.
    """
    class_ok = not line_classes or row['line_class'] in set(line_classes)
    if not class_ok:
        return False
    group_ok = not group_keys or row['road_group_key'] in set(group_keys)
    if not group_ok:
        return False
    encoded = row.get('semantic_tags_json')
    tags = json.loads(encoded) if encoded else {}
    return all(str(tags.get(key, '')) == wanted for key, wanted in tag_filters)
def extract_rows(mod, img: Path, *, mapsets: Optional[List[str]], bbox: Optional[str], profile: str, near: Optional[Tuple[float, float, float]], min_length_m: float, tag_filters: List[Tuple[str, str]], line_classes: Optional[List[str]], group_keys: Optional[List[str]], max_rows: int) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Load features from an IMG via the stage-1 module and keep matching lines.

    Only ``LineString`` features are considered. Each one passes, in order,
    the profile check, a minimum of two vertices, the optional ``near``
    filter, the minimum length, and the class/group/tag filters; the reason
    for every rejection is counted. Extraction stops once ``max_rows`` rows
    have been kept (``0`` means no limit).

    Returns:
        ``(rows, summary)`` where ``summary`` is the stage-1 metadata extended
        with the profile, the row count, per-group/class/raw-type counts and
        the rejection counts.
    """
    parsed_bbox = None
    if bbox:
        parsed_bbox = mod._parse_bbox(bbox)
    features, meta = mod.load_features_from_img(img, mapsets=mapsets, bbox=parsed_bbox)

    kept: List[Dict[str, Any]] = []
    by_group = Counter()
    by_raw_type = Counter()
    by_class = Counter()
    rejected = Counter()

    for feature in features:
        if feature.geom_type != 'LineString':
            continue
        sem = mod.semantic_tags_for_feature(feature)
        gpxsee_classes = mod.gpxsee_classes_for_feature(feature)

        if not profile_accepts(sem, gpxsee_classes, profile):
            rejected['profile'] += 1
            continue
        coords = feature.coords
        if len(coords) < 2:
            rejected['too_short_geom'] += 1
            continue
        if near and not line_intersects_near(coords, near):
            rejected['near'] += 1
            continue
        if line_length_m(coords) < min_length_m:
            rejected['length'] += 1
            continue
        row = normalize_row(feature, sem, gpxsee_classes, coords)
        if not matches_filters(row, tag_filters, line_classes, group_keys):
            rejected['filters'] += 1
            continue

        kept.append(row)
        by_group[row['road_group_key']] += 1
        by_raw_type[f"{row['garmin_type']}:{row['garmin_subtype']}"] += 1
        by_class[row['line_class']] += 1
        if max_rows and len(kept) >= max_rows:
            break

    summary = dict(meta)
    summary['road_profile'] = profile
    summary['road_count'] = len(kept)
    summary['road_groups'] = dict(by_group.most_common())
    summary['line_classes'] = dict(by_class.most_common())
    summary['raw_type_counts'] = dict(by_raw_type.most_common())
    summary['dropped_counts'] = dict(rejected)
    return kept, summary
def write_roads_csv(rows: List[Dict[str, Any]], path: Path) -> None:
    """Write the rows to ``path`` as CSV with a fixed column order.

    Columns missing from a row are written as empty strings and keys that are
    not listed columns are dropped. A ``.gz`` path is written gzip-compressed
    (via ``open_text``).
    """
    identity_cols = ['mapset','feature_id','endpoint_hash','name','garmin_kind','garmin_type','garmin_subtype','line_class','road_group_key']
    tag_cols = ['highway','railway','route','waterway','aerialway','aeroway','natural','surface','tracktype','junction','oneway','closed_loop']
    geometry_cols = ['point_count','length_m','road_interest_score','preview_lon','preview_lat','start_lon','start_lat','end_lon','end_lat','min_lon','min_lat','max_lon','max_lat']
    json_cols = ['bbox_json','gpxsee_classes_json','semantic_tags_json','coords_json']
    columns = identity_cols + tag_cols + geometry_cols + json_cols
    with open_text(path, 'w') as out:
        writer = csv.DictWriter(out, fieldnames=columns, restval='', extrasaction='ignore')
        writer.writeheader()
        for record in rows:
            writer.writerow(record)
def print_groups(rows: List[Dict[str, Any]]) -> None:
    """Print a tab-separated table of group keys to standard output.

    One line per ``road_group_key``, most frequent first, with the row count
    and the line class and name of the first row seen for that group.
    """
    tally = Counter()
    first_seen = {}
    for row in rows:
        group = row['road_group_key']
        tally[group] += 1
        if group not in first_seen:
            first_seen[group] = row
    print('\t'.join(('road_group_key', 'count', 'sample_line_class', 'sample_name')))
    for group, count in tally.most_common():
        sample = first_seen[group]
        print(f"{group}\t{count}\t{sample.get('line_class','')}\t{sample.get('name','')}")
def main(argv=None) -> int:
    """Command-line entry point: extract line features and write the requested outputs.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` on success. Usage errors terminate through ``argparse``.
    """
    parser = argparse.ArgumentParser(description='Stage 1 road extractor from Garmin IMG. Extracts LineString features into road-focused CSV for stage 2 packaging.')
    parser.add_argument('img', type=Path)
    parser.add_argument('--stage1-module', type=Path, help='Path to garmin_img_to_osmand_v6.py')
    parser.add_argument('--mapset', action='append')
    parser.add_argument('--bbox')
    parser.add_argument('--near')
    parser.add_argument('--roads-csv', type=Path)
    parser.add_argument('--summary-json', type=Path)
    parser.add_argument('--list-road-groups', action='store_true')
    parser.add_argument(
        '--road-profile',
        choices=['roads','roads_paths','roads_strict','transport','hydro','contours','cartographic','nonroads','all_lines'],
        default='roads_paths',
    )
    parser.add_argument('--min-length-m', type=float, default=0.0)
    parser.add_argument('--filter-tag', action='append')
    parser.add_argument('--filter-line-class', action='append')
    parser.add_argument('--filter-group-key', action='append')
    parser.add_argument('--max-rows', type=int, default=0)
    opts = parser.parse_args(argv)

    # At least one output must be requested, otherwise the run would be pointless.
    wants_output = opts.roads_csv or opts.summary_json or opts.list_road_groups
    if not wants_output:
        parser.error('provide at least one of --roads-csv, --summary-json, --list-road-groups')

    stage1 = load_stage1_module(opts.stage1_module)
    near = parse_near(opts.near)
    tag_filters = parse_tag_filters(opts.filter_tag)
    rows, meta = extract_rows(
        stage1,
        opts.img,
        mapsets=opts.mapset,
        bbox=opts.bbox,
        profile=opts.road_profile,
        near=near,
        min_length_m=opts.min_length_m,
        tag_filters=tag_filters,
        line_classes=opts.filter_line_class,
        group_keys=opts.filter_group_key,
        max_rows=opts.max_rows,
    )
    info(f'extracted {len(rows)} line features for profile={opts.road_profile}')

    if opts.list_road_groups:
        print_groups(rows)
    if opts.roads_csv:
        write_roads_csv(rows, opts.roads_csv)
        info(f'wrote roads CSV: {opts.roads_csv}')
    if opts.summary_json:
        summary_text = json.dumps(meta, ensure_ascii=False, indent=2)
        opts.summary_json.write_text(summary_text, encoding='utf-8')
        info(f'wrote summary JSON: {opts.summary_json}')
    return 0


if __name__ == '__main__':
    sys.exit(main())

View File

@@ -0,0 +1,144 @@
I reworked the road path instead of just cloning the landmark flow.
The old strategy was decent, but it had three weak spots:
* extraction was line-focused, but not **verification-focused**
* stage 2 could group and package, but it did not have a proper **review loop for ambiguous roads**
* there was no persistent **feature-level override layer** driven by an external visualizer
These gaps matter because the base IMG extractor already provides enough line semantics, through `LINE_TAGS` and the line-class logic, to separate roads, ferries, rail, water lines, contours, and raw leftovers. The bottleneck is therefore no longer decoding lines but **reviewing and correcting them efficiently**.
Also, the landmark stage-2 script already had the right pattern for external visualization hooks and interactive config editing, including `--run-cmd-before-eval` placeholders such as `{lat}`, `{lon}`, `{z}`, `{mapset}`, `{group_id}`, `{group_label}`, `{raw_type}`, and `{name}`. I used that as the model for the road verifier rather than pretending roads need a completely different interaction style.
New files:
* [garmin_img_roads_v3.py](sandbox:/mnt/data/garmin_img_roads_v3.py)
* [roads_stage2_configurable_v3.py](sandbox:/mnt/data/roads_stage2_configurable_v3.py)
Tested artifacts:
* [sample road CSV](sandbox:/mnt/data/sample_roads_v3_02235051.csv)
* [road extraction summary](sandbox:/mnt/data/sample_roads_v3_02235051_summary.json)
* [road stage-2 config](sandbox:/mnt/data/sample_roads_stage2_v3_config_02235051.json)
* [road stage-2 config summary](sandbox:/mnt/data/sample_roads_stage2_v3_config_02235051_summary.json)
* [resolved road CSV](sandbox:/mnt/data/sample_roads_stage2_v3_resolved_02235051.csv)
* [resolved road OSM](sandbox:/mnt/data/sample_roads_stage2_v3_resolved_02235051.osm)
* [resolved road summary](sandbox:/mnt/data/sample_roads_stage2_v3_resolved_02235051_summary.json)
* [unmatched road CSV](sandbox:/mnt/data/sample_roads_stage2_v3_unmatched_02235051.csv)
What changed
`garmin_img_roads_v3.py`
* still builds on the IMG extractor logic you uploaded and the current Garmin stage-1 parser
* adds:
* `preview_lon` / `preview_lat`
* `endpoint_hash`
* `bbox_json`
* `road_interest_score`
* filtering by:
* `--filter-line-class`
* `--filter-group-key`
* `--filter-tag`
* extra profile:
* `roads_strict`
`roads_stage2_configurable_v3.py`
* keeps the analyze/build config workflow from the earlier road packager
* adds a real `verify` subcommand
* stores **feature-level overrides** in the config under:
* `overrides.features.<feature_id>`
* verifier supports:
* forcing a feature into a specific group
* disabling a feature
* adding/removing tags on a single feature
* renaming a single feature
* editing the assigned group on the fly
* supports external preview command placeholders:
* `{lat}`
* `{lon}`
* `{z}`
* `{mapset}`
* `{group_id}`
* `{group_label}`
* `{raw_type}`
* `{line_class}`
* `{feature_id}`
* `{endpoint_hash}`
* `{name}`
So the road workflow is no longer just “analyze once, build once.” It becomes an iterative loop around a single config:
1. extract roads
2. analyze into config
3. verify visually against the real IMG in GPXSee or another viewer
4. save corrections back into the same config
5. build final OSM
Commands
Stage 1 extract:
```bash
python garmin_img_roads_v3.py gmapsupp.img --mapset 02235051 --road-profile roads_paths --roads-csv roads.csv --summary-json roads_summary.json
```
Stage 2 analyze:
```bash
python roads_stage2_configurable_v3.py analyze roads.csv --config-out roads_config.json --summary-json roads_config_summary.json
```
Verifier with external viewer:
```bash
python roads_stage2_configurable_v3.py verify roads.csv --config roads_config.json --run-cmd-before-eval ".\GPXSee.exe D:\maps-bg\gmapsupp\gmapsupp.img \"geo:{lat},{lon};z={z}\"" --run-cmd-zoom 16
```
Build final OSM:
```bash
python roads_stage2_configurable_v3.py build roads.csv --config roads_config.json --resolved-csv roads_final.csv --osm roads_final.osm --summary-json roads_final_summary.json --unmatched-csv roads_unmatched.csv
```
Or review during build:
```bash
python roads_stage2_configurable_v3.py build roads.csv --config roads_config.json --interactive-verify --run-cmd-before-eval ".\GPXSee.exe D:\maps-bg\gmapsupp\gmapsupp.img \"geo:{lat},{lon};z={z}\"" --osm roads_final.osm
```
What I tested
* extractor v3 successfully ran on your uploaded `gmapsupp.img` for mapset `02235051`
* it produced `13,918` road/path line features
* analyze ran successfully
* build ran successfully and produced OSM/CSV/summary
* I did **not** execute the interactive verifier loop end-to-end here because that needs a live TTY and an external GUI program
Strategy evaluation, after iteration
What is now good:
* road extraction is no longer blind bulk export
* stage 2 has a proper verification layer
* config is now the single source of truth for:
* group rules
* group styling
* postprocess tags
* per-feature corrections
What is still weak:
* road grouping is still mostly based on semantic class and raw fallback, not on **network topology**
* the verifier is feature-by-feature, not yet **intersection-aware**
* no automatic “likely duplicate road geometry” reconciliation across mapsets yet
The next best iteration is a **connectivity-aware verifier**, where roads sharing endpoint hashes or intersecting near the same nodes can be reviewed as a bundle instead of individually. That would be the real “beast mode” next step for road quality.

View File

@@ -0,0 +1,969 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import gzip
import hashlib
import json
import re
import subprocess
import sys
import xml.etree.ElementTree as ET
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
# XML namespace URIs; the OsmAnd namespace is registered with ElementTree
# under the "osmand" prefix.
OSMAND_NS = 'https://osmand.net'
GPX_NS = 'http://www.topografix.com/GPX/1/1'
ET.register_namespace('osmand', OSMAND_NS)
# Characters removed by sanitize_text(): C0 control characters other than
# tab, line feed and carriage return, surrogate code points, and U+FFFE/U+FFFF.
_XML_INVALID_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\uD800-\uDFFF\uFFFE\uFFFF]')
# Default display settings (label, color, width, icon) keyed by road group key.
# infer_style() tries an exact key match first and then a prefix match in the
# order listed here.
DEFAULT_STYLES = {
    'highway:motorway': {'label': 'Motorways', 'color': '#E53935', 'width': 4, 'icon': 'road'},
    'highway:primary': {'label': 'Primary roads', 'color': '#FB8C00', 'width': 4, 'icon': 'road'},
    'highway:secondary': {'label': 'Secondary roads', 'color': '#FDD835', 'width': 3, 'icon': 'road'},
    'highway:tertiary': {'label': 'Tertiary roads', 'color': '#C0CA33', 'width': 3, 'icon': 'road'},
    'highway:residential': {'label': 'Residential roads', 'color': '#9CCC65', 'width': 2, 'icon': 'road'},
    'highway:service': {'label': 'Service roads', 'color': '#AED581', 'width': 2, 'icon': 'road'},
    'highway:unclassified': {'label': 'Unclassified roads', 'color': '#AED581', 'width': 2, 'icon': 'road'},
    'highway:road': {'label': 'Generic roads', 'color': '#FFB74D', 'width': 2, 'icon': 'road'},
    'highway:track': {'label': 'Tracks', 'color': '#8D6E63', 'width': 2, 'icon': 'road'},
    'highway:path': {'label': 'Paths', 'color': '#43A047', 'width': 2, 'icon': 'trekking'},
    'highway:cycleway': {'label': 'Cycleways', 'color': '#00ACC1', 'width': 2, 'icon': 'trekking'},
    'route:ferry': {'label': 'Ferries', 'color': '#1E88E5', 'width': 3, 'icon': 'water_drop'},
    'railway:rail': {'label': 'Railways', 'color': '#616161', 'width': 3, 'icon': 'railway'},
    'waterway:river': {'label': 'Rivers', 'color': '#1E88E5', 'width': 2, 'icon': 'water_drop'},
    'waterway:stream': {'label': 'Streams', 'color': '#42A5F5', 'width': 1, 'icon': 'water_drop'},
    'contour:elevation': {'label': 'Contours', 'color': '#BCAAA4', 'width': 1, 'icon': 'marker'},
    'raw': {'label': 'Raw lines', 'color': '#5E35B1', 'width': 2, 'icon': 'road'},
}
@dataclass
class Road:
    """One line feature as loaded from a roads CSV row."""

    # Identification
    mapset: str
    feature_id: str
    endpoint_hash: str
    name: str
    # Garmin source typing
    garmin_kind: str
    garmin_type: str
    garmin_subtype: str
    # Classification carried over from the CSV
    line_class: str
    road_group_key: str
    semantic_tags: Dict[str, str]
    gpxsee_classes: List[str]
    # Geometry: list of coordinate pairs plus a preview position
    coords: List[List[float]]
    preview_lon: float = 0.0
    preview_lat: float = 0.0
    # Names of the CSV files this feature was read from
    source_files: List[str] = field(default_factory=list)

    def raw_type_key(self) -> str:
        """Return ``"<garmin_type>:<garmin_subtype>"``."""
        return '{}:{}'.format(self.garmin_type, self.garmin_subtype)

    def clone(self) -> 'Road':
        """Return a copy whose tag dict, class list, coords and source list are independent."""
        tags_copy = dict(self.semantic_tags)
        classes_copy = list(self.gpxsee_classes)
        coords_copy = [list(point) for point in self.coords]
        sources_copy = list(self.source_files)
        return Road(
            mapset=self.mapset,
            feature_id=self.feature_id,
            endpoint_hash=self.endpoint_hash,
            name=self.name,
            garmin_kind=self.garmin_kind,
            garmin_type=self.garmin_type,
            garmin_subtype=self.garmin_subtype,
            line_class=self.line_class,
            road_group_key=self.road_group_key,
            semantic_tags=tags_copy,
            gpxsee_classes=classes_copy,
            coords=coords_copy,
            preview_lon=self.preview_lon,
            preview_lat=self.preview_lat,
            source_files=sources_copy,
        )
def sanitize_text(value: Any) -> str:
    """Return ``value`` as a stripped string that is safe to embed in XML.

    ``None`` becomes ``''``. Line endings are normalised to ``\\n``, characters
    matched by ``_XML_INVALID_RE`` and NUL bytes are removed, and surrounding
    whitespace is stripped.
    """
    if value is None:
        return ''
    text = str(value)
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    text = _XML_INVALID_RE.sub('', text)
    return text.replace('\x00', '').strip()
def open_text(path: Path, mode: str):
    """Open ``path`` for UTF-8 text I/O, using gzip when the name ends in ``.gz``.

    ``mode`` is a bare mode such as ``'r'`` or ``'w'`` (``'t'`` is appended for
    gzip). Newline translation is turned off with ``newline=''`` either way.
    """
    opener, effective_mode = open, mode
    if str(path).lower().endswith('.gz'):
        opener, effective_mode = gzip.open, mode + 't'
    return opener(path, effective_mode, encoding='utf-8', newline='')
def init_csv_field_limit() -> int:
    """Raise the csv field size limit as far as this Python build accepts.

    Rows can carry very large ``coords_json`` payloads for long roads, which
    the stdlib default limit rejects. Starting from ``sys.maxsize`` the
    candidate is divided by ten after every ``OverflowError`` until one is
    accepted; ``2**31 - 1`` is the last resort.

    Returns:
        The limit that was installed.
    """
    last_resort = 2**31 - 1
    candidate = getattr(sys, 'maxsize', last_resort)
    while candidate > 0:
        try:
            csv.field_size_limit(candidate)
        except OverflowError:
            candidate //= 10
        else:
            return candidate
    csv.field_size_limit(last_resort)
    return last_resort
def safe_json_loads(s: str, default):
    """Parse ``s`` as JSON, returning ``default`` for empty or unparsable input."""
    try:
        if not s:
            return default
        parsed = json.loads(s)
    except Exception:
        # Deliberately best-effort: any failure falls back to the default.
        return default
    return parsed
def load_roads(paths: Iterable[Path]) -> List[Road]:
    """Read road CSV files (optionally gzip-compressed) into ``Road`` objects.

    Rows whose ``coords_json`` does not decode to a list of at least two
    entries are skipped silently; rows that fail for any other reason are
    skipped with a warning on standard error. The preview position comes from
    ``preview_lon``/``preview_lat``, then ``centroid_lon``/``centroid_lat``,
    then the middle coordinate.

    Raises:
        RuntimeError: If the csv module itself fails to parse a file.
    """
    init_csv_field_limit()
    roads: List[Road] = []
    for csv_path in paths:
        print(f"Parsing - {csv_path}")
        with open_text(csv_path, 'r') as handle:
            try:
                # Numbering starts at 2 because row 1 of the file is the header.
                for row_no, rec in enumerate(csv.DictReader(handle), start=2):
                    try:
                        coords = safe_json_loads(rec.get('coords_json') or '[]', [])
                        if not (isinstance(coords, list) and len(coords) >= 2):
                            continue
                        mid = len(coords) // 2
                        try:
                            preview_lon = float(rec.get('preview_lon') or rec.get('centroid_lon') or coords[mid][0])
                            preview_lat = float(rec.get('preview_lat') or rec.get('centroid_lat') or coords[mid][1])
                        except Exception:
                            preview_lon, preview_lat = coords[mid]

                        raw_tags = safe_json_loads(rec.get('semantic_tags_json') or '{}', {})
                        if not isinstance(raw_tags, dict):
                            raw_tags = {}
                        raw_classes = safe_json_loads(rec.get('gpxsee_classes_json') or '[]', [])
                        if not isinstance(raw_classes, list):
                            raw_classes = []

                        # Keep only tag pairs whose key and value survive sanitising.
                        tags: Dict[str, str] = {}
                        for tag_key, tag_value in raw_tags.items():
                            clean_key = sanitize_text(tag_key)
                            clean_value = sanitize_text(tag_value)
                            if clean_key and clean_value:
                                tags[clean_key] = clean_value
                        classes = [c for c in map(sanitize_text, raw_classes) if c]

                        def text(column: str) -> str:
                            return sanitize_text(rec.get(column))

                        roads.append(Road(
                            mapset=text('mapset'),
                            feature_id=text('feature_id'),
                            endpoint_hash=text('endpoint_hash'),
                            name=text('name'),
                            garmin_kind=text('garmin_kind'),
                            garmin_type=text('garmin_type'),
                            garmin_subtype=text('garmin_subtype'),
                            line_class=text('line_class'),
                            road_group_key=text('road_group_key'),
                            semantic_tags=tags,
                            gpxsee_classes=classes,
                            coords=coords,
                            preview_lon=preview_lon,
                            preview_lat=preview_lat,
                            source_files=[csv_path.name],
                        ))
                    except Exception as e:
                        print(f'[warn] skipping bad row in {csv_path} at CSV row {row_no}: {e}', file=sys.stderr)
                        continue
            except csv.Error as e:
                raise RuntimeError(f'CSV parse failed for {csv_path}: {e}. This usually means an oversized coords_json field or malformed quoting.') from e
    return roads
def dedupe_roads(items: List[Road]) -> List[Road]:
    """Collapse duplicate roads, keeping the first occurrence of each key.

    The key is the feature id, else the endpoint hash, else the raw type plus
    the preview position. The kept copy adopts a later duplicate's name when
    it has none itself, and accumulates the sorted union of source files.
    """
    unique: Dict[str, Road] = {}
    for road in items:
        key = road.feature_id or road.endpoint_hash or f'{road.raw_type_key()}:{road.preview_lon:.7f}:{road.preview_lat:.7f}'
        kept = unique.get(key)
        if kept is None:
            unique[key] = road.clone()
        else:
            if road.name and not kept.name:
                kept.name = road.name
            kept.source_files = sorted(set(kept.source_files + road.source_files))
    return list(unique.values())
def infer_style(group_key: str) -> Dict[str, Any]:
    """Return a fresh display-style dict for ``group_key``.

    Lookup order: exact key in ``DEFAULT_STYLES``, then the first entry whose
    key is a prefix of ``group_key``, then a generic style labelled with a
    title-cased form of the key.
    """
    exact = DEFAULT_STYLES.get(group_key)
    if exact is not None:
        return dict(exact)
    by_prefix = next(
        (style for prefix, style in DEFAULT_STYLES.items() if group_key.startswith(prefix)),
        None,
    )
    if by_prefix is not None:
        return dict(by_prefix)
    label = group_key.replace(':', ' ').replace('_', ' ').title()
    return {'label': label, 'color': '#5E35B1', 'width': 2, 'icon': 'road'}
def spread_examples(items: List[Road], n: int) -> List[Dict[str, Any]]:
    """Pick up to ``n`` example roads spread evenly over the sorted items.

    Items are ordered by mapset, id (feature id or endpoint hash) and preview
    position. With more than ``n`` items, evenly spaced positions from the
    first to the last are chosen (the middle one when ``n == 1``).

    Returns:
        One summary dict per chosen road, with rounded preview coordinates and
        a copy of the semantic tags.
    """
    def sort_key(road):
        return (road.mapset, road.feature_id or road.endpoint_hash, road.preview_lat, road.preview_lon)

    ordered = sorted(items, key=sort_key)
    total = len(ordered)
    if total == 0 or n <= 0:
        return []
    if total <= n:
        picked = ordered
    elif n == 1:
        picked = [ordered[total // 2]]
    else:
        positions = sorted({round(step * (total - 1) / (n - 1)) for step in range(n)})
        picked = [ordered[int(pos)] for pos in positions]
    return [
        {
            'feature_id': road.feature_id,
            'endpoint_hash': road.endpoint_hash,
            'name': road.name,
            'mapset': road.mapset,
            'group_key': road.road_group_key,
            'lat': round(road.preview_lat, 8),
            'lon': round(road.preview_lon, 8),
            'raw_type': road.raw_type_key(),
            'line_class': road.line_class,
            'semantic_tags': dict(road.semantic_tags),
        }
        for road in picked
    ]
def parse_tag_exprs(values: List[str] | None) -> Dict[str, str]:
    """Parse ``key=value`` expressions into a dict of sanitised tags.

    Pairs whose key or value is empty after sanitising are dropped; a later
    expression for the same key overrides an earlier one.

    Raises:
        ValueError: If an expression contains no ``=``.
    """
    tags: Dict[str, str] = {}
    for expr in (values or []):
        raw_key, sep, raw_value = expr.partition('=')
        if not sep:
            raise ValueError(f'invalid tag expression {expr!r}; expected key=value')
        key = sanitize_text(raw_key)
        value = sanitize_text(raw_value)
        if not (key and value):
            continue
        tags[key] = value
    return tags
def merge_postprocess_dicts(*dicts: Dict[str, str]) -> Dict[str, str]:
    """Merge tag dicts left to right; later dicts override earlier ones.

    ``None`` arguments are treated as empty, keys and values are sanitised,
    and pairs with an empty key or value are dropped.
    """
    merged: Dict[str, str] = {}
    for source in dicts:
        for raw_key, raw_value in (source or {}).items():
            key, value = sanitize_text(raw_key), sanitize_text(raw_value)
            if not (key and value):
                continue
            merged[key] = value
    return merged
def merge_remove_tags(*lists: List[str]) -> List[str]:
    """Concatenate tag-name lists, sanitised, without duplicates or blanks.

    The order of first appearance is preserved; ``None`` arguments are
    treated as empty.
    """
    # A dict keeps insertion order, which gives an ordered de-duplication.
    ordered: Dict[str, None] = {}
    for names in lists:
        for raw in (names or []):
            cleaned = sanitize_text(raw)
            if cleaned:
                ordered.setdefault(cleaned, None)
    return list(ordered)
def analyze_to_config(items: List[Road], *, example_count: int, default_postprocess_add: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build an initial stage-2 config by grouping roads on ``road_group_key``.

    Args:
        items: Roads to analyse.
        example_count: Number of example features to record per group.
        default_postprocess_add: Tags copied into every group's (and the
            defaults') ``postprocess.add_tags``.

    Returns:
        A config dict with ``defaults``, one entry in ``groups`` per distinct
        group key (largest group first, which also gets the highest priority)
        and an empty ``overrides.features`` map. Of the eight most common
        semantic tag pairs in a group, those present on at least 80% of the
        members go to ``semantic_required_any`` and the rest to
        ``semantic_preferred_any``.

    Group ids are derived from the group key by replacing runs of characters
    other than letters, digits and underscores with ``_`` and lower-casing.
    Because different keys can collapse to the same id, and groups are looked
    up by id elsewhere in this module, a numeric suffix is appended on
    collision so that every id is unique.
    """
    grouped = defaultdict(list)
    for it in items:
        grouped[it.road_group_key].append(it)

    groups = []
    used_ids = set()
    ordered_groups = sorted(grouped.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    for idx, (gkey, members) in enumerate(ordered_groups):
        style = infer_style(gkey)

        base_id = re.sub(r'[^a-zA-Z0-9_]+', '_', gkey).strip('_').lower() or f'road_group_{idx+1}'
        group_id = base_id
        suffix = 2
        while group_id in used_ids:
            group_id = f'{base_id}_{suffix}'
            suffix += 1
        used_ids.add(group_id)

        raw_types = Counter(m.raw_type_key() for m in members)
        sem_pairs = Counter()
        for m in members:
            for k, v in m.semantic_tags.items():
                if v:
                    sem_pairs[(k, v)] += 1

        member_count = max(1, len(members))
        semantic_required_any = []
        semantic_preferred_any = []
        for (k, v), c in sem_pairs.most_common(8):
            pair = {k: v}
            if c / member_count >= 0.80:
                semantic_required_any.append(pair)
            else:
                semantic_preferred_any.append(pair)

        groups.append({
            'id': group_id,
            'enabled': True,
            # Earlier (larger) groups win when several groups match a road.
            'priority': 1000 - idx,
            'match': {
                'road_group_keys': [gkey],
                'line_classes': sorted({m.line_class for m in members if m.line_class}),
                'raw_types': [rt for rt, _ in raw_types.most_common(8)],
                'semantic_required_any': semantic_required_any,
                'semantic_preferred_any': semantic_preferred_any,
                'has_name': None,
            },
            'display': style,
            'naming': {
                'mode': 'preserve_or_template_if_missing',
                'base_name': style['label'],
                'template': '{existing_name_or_base} - {hash}',
                'preserve_existing_names': True,
            },
            'postprocess': {
                'add_tags': dict(default_postprocess_add or {}),
                'remove_tags': [],
            },
            'stats': {
                'count': len(members),
                'named_count': sum(1 for m in members if m.name),
                'raw_type_counts': dict(raw_types.most_common(12)),
                'semantic_tag_counts': {f'{k}={v}': c for (k, v), c in sem_pairs.most_common(12)},
            },
            'examples': spread_examples(members, example_count),
        })

    return {
        'kind': 'roads_stage2',
        'version': 3,
        'defaults': {
            'postprocess': {'add_tags': dict(default_postprocess_add or {}), 'remove_tags': []},
            'verifier': {'run_cmd_before_eval': '', 'run_cmd_zoom': 16},
        },
        'groups': groups,
        'overrides': {'features': {}},
    }
def save_json(path: Path, data: Any):
    """Write ``data`` to ``path`` as indented UTF-8 JSON (gzip for ``.gz`` paths)."""
    with open_text(path, 'w') as out:
        json.dump(data, out, indent=2, ensure_ascii=False)
def load_json(path: Path) -> Any:
    """Read and parse the JSON document stored at ``path`` (gzip for ``.gz`` paths)."""
    with open_text(path, 'r') as src:
        document = json.load(src)
    return document
def migrate_config(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``config`` with all sections this script expects filled in.

    The input is copied through a JSON round trip and left untouched. Missing
    ``defaults``/``overrides`` sections and per-group keys get default values;
    a group's ``postprocess.add_tags``/``remove_tags`` are reset when they are
    not a dict/list respectively.
    """
    cfg = json.loads(json.dumps(config, ensure_ascii=False))

    if 'defaults' not in cfg:
        cfg['defaults'] = {}
    defaults = cfg['defaults']
    if 'postprocess' not in defaults:
        defaults['postprocess'] = {'add_tags': {}, 'remove_tags': []}
    if 'verifier' not in defaults:
        defaults['verifier'] = {'run_cmd_before_eval': '', 'run_cmd_zoom': 16}

    if 'overrides' not in cfg:
        cfg['overrides'] = {}
    if 'features' not in cfg['overrides']:
        cfg['overrides']['features'] = {}

    for group in cfg.get('groups', []):
        for key, fallback in (('enabled', True), ('priority', 0), ('match', {})):
            if key not in group:
                group[key] = fallback
        if 'display' not in group:
            group['display'] = {'label': group.get('id', 'Roads'), 'color': '#5E35B1', 'width': 2, 'icon': 'road'}
        # The naming base name follows the display label settled just above.
        display_label = group.get('display', {}).get('label', 'Roads')
        if 'naming' not in group:
            group['naming'] = {
                'mode': 'preserve_or_template_if_missing',
                'base_name': display_label,
                'template': '{existing_name_or_base} - {hash}',
                'preserve_existing_names': True,
            }
        if 'postprocess' not in group:
            group['postprocess'] = {}
        post = group['postprocess']
        if not isinstance(post.get('add_tags'), dict):
            post['add_tags'] = {}
        if not isinstance(post.get('remove_tags'), list):
            post['remove_tags'] = []
    return cfg
def match_group(it: Road, group: Dict[str, Any]) -> bool:
    """Report whether road *it* satisfies every criterion in ``group['match']``.

    A disabled group never matches. Each criterion is only enforced when it is
    present and non-empty: ``road_group_keys``, ``line_classes`` and
    ``raw_types`` are membership tests; ``semantic_required_any`` needs at
    least one condition dict whose key/value pairs all equal the road's
    semantic tags; ``has_name`` (exactly ``True`` or ``False``) constrains
    whether the road has a name.
    """
    if not group.get('enabled', True):
        return False
    criteria = group.get('match', {})

    allowed_keys = criteria.get('road_group_keys')
    if allowed_keys and it.road_group_key not in set(allowed_keys):
        return False

    allowed_classes = criteria.get('line_classes')
    if allowed_classes and it.line_class not in set(allowed_classes):
        return False

    raw_type = it.raw_type_key()
    allowed_raw = criteria.get('raw_types')
    if allowed_raw and raw_type not in set(allowed_raw):
        return False

    alternatives = criteria.get('semantic_required_any') or []
    if alternatives:
        for cond in alternatives:
            if all(it.semantic_tags.get(k) == v for k, v in cond.items()):
                break
        else:
            # None of the alternative conditions was fully satisfied.
            return False

    wants_name = criteria.get('has_name')
    if wants_name is True and not it.name:
        return False
    if wants_name is False and it.name:
        return False
    return True
def get_group_by_id(config: Dict[str, Any], group_id: str) -> Optional[Dict[str, Any]]:
    """Return the first entry of ``config['groups']`` whose ``id`` equals *group_id*, else ``None``."""
    candidates = (g for g in config.get('groups', []) if g.get('id') == group_id)
    return next(candidates, None)
def assign_group(it: Road, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Pick the group for road *it*, honouring per-feature overrides.

    A feature override with ``disabled`` yields ``None``. An override with
    ``force_group_id`` yields whatever ``get_group_by_id`` finds for that id.
    Otherwise groups are tried by descending ``priority`` (ties broken by
    ``id``) and the first one accepted by ``match_group`` is returned;
    ``None`` if no group matches.
    """
    feature_overrides = (config.get('overrides') or {}).get('features') or {}
    override = feature_overrides.get(it.feature_id) or {}
    if override.get('disabled'):
        return None

    forced_id = override.get('force_group_id')
    if forced_id:
        return get_group_by_id(config, forced_id)

    def rank(g):
        # Negated priority so that higher priorities sort first.
        return (-int(g.get('priority', 0)), g.get('id', ''))

    for candidate in sorted(config.get('groups', []), key=rank):
        if match_group(it, candidate):
            return candidate
    return None
def compute_hash(it: Road) -> str:
    """Return a short identifier for road *it*.

    When the road has a ``feature_id`` its last six characters are used.
    Otherwise the coordinates are serialized as compact JSON and the first six
    upper-cased hex digits of their BLAKE2b digest are returned.
    """
    feature_id = it.feature_id
    if feature_id:
        return feature_id[-6:]
    payload = json.dumps(it.coords, ensure_ascii=False, separators=(',', ':'))
    digest = hashlib.blake2b(payload.encode('utf-8'), digest_size=6).hexdigest()
    return digest.upper()[:6]
def render_template_value(template: Any, *, base_name: str, existing_name: str, hash_value: str, group_id: str) -> str:
    """Expand *template* with ``str.format`` and pass the result through ``sanitize_text``.

    Available placeholders: ``{base_name}``, ``{existing_name}``,
    ``{existing_name_or_base}`` (the existing name, or the base name when the
    existing one is empty), ``{hash}`` and ``{group_id}``. A ``None`` template
    yields the empty string.
    """
    if template is None:
        return ''
    placeholders = {
        'base_name': base_name,
        'existing_name': existing_name,
        'existing_name_or_base': existing_name or base_name,
        'hash': hash_value,
        'group_id': group_id,
    }
    rendered = str(template).format(**placeholders)
    return sanitize_text(rendered)
def render_name(it: Road, group: Dict[str, Any], config: Dict[str, Any]) -> str:
    """Compute the output name of road *it* within *group*.

    A per-feature override ``name`` wins outright. Otherwise the group's
    ``naming.mode`` decides: ``preserve`` returns the sanitized existing name
    (possibly empty), ``template_always`` always renders the template, and any
    other mode keeps the existing name and renders the template only when the
    road has none.
    """
    naming = group.get('naming', {})
    feature_overrides = (config.get('overrides') or {}).get('features') or {}
    override = feature_overrides.get(it.feature_id) or {}
    forced_name = override.get('name')
    if forced_name:
        return sanitize_text(forced_name)

    mode = naming.get('mode', 'preserve_or_template_if_missing')
    base_name = sanitize_text(naming.get('base_name') or group.get('display', {}).get('label') or 'Road')
    existing = sanitize_text(it.name)
    stable_hash = compute_hash(it)
    template = naming.get('template') or '{existing_name_or_base}'

    if mode == 'preserve':
        return existing
    if mode != 'template_always' and existing:
        return existing
    return render_template_value(
        template,
        base_name=base_name,
        existing_name=existing,
        hash_value=stable_hash,
        group_id=group.get('id', ''),
    )
def apply_postprocess(props: Dict[str, str], group: Dict[str, Any], config: Dict[str, Any], it: Road, *, base_name: str, existing_name: str, hash_value: str) -> Dict[str, str]:
    """Return a copy of *props* with the configured tag additions and removals applied.

    ``add_tags`` and ``remove_tags`` are combined from three layers (config
    defaults, the group, the per-feature override) via
    ``merge_postprocess_dicts`` and ``merge_remove_tags``. Added values are
    rendered as templates. Additions are applied before removals. Finally all
    keys and values are sanitized, and entries whose sanitized key or value is
    empty are dropped.
    """
    out = dict(props)
    defaults = (config.get('defaults') or {}).get('postprocess') or {}
    grp = group.get('postprocess') or {}
    feature_overrides = (config.get('overrides') or {}).get('features') or {}
    override = feature_overrides.get(it.feature_id) or {}

    additions = merge_postprocess_dicts(
        defaults.get('add_tags') or {},
        grp.get('add_tags') or {},
        override.get('add_tags') or {},
    )
    for key, value_template in additions.items():
        out[sanitize_text(key)] = render_template_value(
            value_template,
            base_name=base_name,
            existing_name=existing_name,
            hash_value=hash_value,
            group_id=group.get('id', ''),
        )

    removals = merge_remove_tags(
        defaults.get('remove_tags') or [],
        grp.get('remove_tags') or [],
        override.get('remove_tags') or [],
    )
    for key in removals:
        out.pop(key, None)

    cleaned: Dict[str, str] = {}
    for key, value in out.items():
        clean_key = sanitize_text(key)
        if not clean_key:
            continue
        clean_value = sanitize_text(value)
        if clean_value:
            cleaned[clean_key] = clean_value
    return cleaned
def build_records(items: List[Road], config: Dict[str, Any]) -> tuple[List[Dict[str, Any]], List[Road], List[Road]]:
    """Resolve every road in *items* against *config*.

    Returns:
        A ``(records, unmatched, disabled)`` tuple: one output record dict per
        road that was assigned a group, the roads for which ``assign_group``
        returned nothing, and the roads whose per-feature override is marked
        ``disabled``.
    """
    records: List[Dict[str, Any]] = []
    unmatched: List[Road] = []
    disabled: List[Road] = []

    for it in items:
        override = (((config.get('overrides') or {}).get('features') or {}).get(it.feature_id) or {})
        if override.get('disabled'):
            disabled.append(it)
            continue
        group = assign_group(it, config)
        if not group:
            unmatched.append(it)
            continue

        stable_hash = compute_hash(it)
        name = render_name(it, group, config)
        display = group.get('display', {})

        # Tags start from the road's semantic tags; provenance is layered on top.
        props = dict(it.semantic_tags)
        if name:
            props['name'] = name
        props['garmin:kind'] = it.garmin_kind
        if it.garmin_type:
            props['garmin:type'] = it.garmin_type
        if it.garmin_subtype:
            props['garmin:subtype'] = it.garmin_subtype
        props['source:road_group'] = sanitize_text(group.get('id') or '')
        props['source:road_group_key'] = it.road_group_key
        props['source:stable_hash'] = stable_hash
        props['source:feature_id'] = it.feature_id
        props['source:mapset'] = it.mapset
        if display.get('color'):
            props['osmand:color'] = sanitize_text(display['color'])
        if display.get('icon'):
            props['osmand:icon'] = sanitize_text(display['icon'])

        # NOTE(review): base_name here comes from the display label, whereas
        # render_name prefers naming['base_name'] — confirm the difference is intended.
        props = apply_postprocess(
            props, group, config, it,
            base_name=sanitize_text(display.get('label') or 'Road'),
            existing_name=sanitize_text(it.name),
            hash_value=stable_hash,
        )

        record = {
            'feature_id': it.feature_id,
            'group_id': sanitize_text(group.get('id') or ''),
            'group_label': sanitize_text(display.get('label') or ''),
            'name': name,
            'icon': sanitize_text(display.get('icon') or ''),
            'color': sanitize_text(display.get('color') or ''),
            'width': int(display.get('width', 2) or 2),
            'mapset': it.mapset,
            'preview_lon': it.preview_lon,
            'preview_lat': it.preview_lat,
            'coords': [list(p) for p in it.coords],
            'props': props,
            'source_files': list(it.source_files),
        }
        records.append(record)

    return records, unmatched, disabled
def write_resolved_csv(records: List[Dict[str, Any]], path: Path):
    """Write one CSV row per resolved record to *path* (opened via ``open_text``).

    Preview coordinates are formatted with eight decimals; ``source_files``,
    ``props`` (keys sorted) and ``coords`` (compact separators) are embedded
    as JSON strings.
    """
    fields = [
        'group_id', 'group_label', 'feature_id', 'mapset', 'name', 'icon', 'color',
        'width', 'preview_lon', 'preview_lat', 'source_files_json', 'props_json', 'coords_json',
    ]

    def as_row(r: Dict[str, Any]) -> Dict[str, Any]:
        return {
            'group_id': r['group_id'],
            'group_label': r['group_label'],
            'feature_id': r['feature_id'],
            'mapset': r['mapset'],
            'name': sanitize_text(r['name']),
            'icon': r['icon'],
            'color': r['color'],
            'width': r['width'],
            'preview_lon': f"{r['preview_lon']:.8f}",
            'preview_lat': f"{r['preview_lat']:.8f}",
            'source_files_json': json.dumps(r['source_files'], ensure_ascii=False),
            'props_json': json.dumps(r['props'], ensure_ascii=False, sort_keys=True),
            'coords_json': json.dumps(r['coords'], ensure_ascii=False, separators=(',', ':')),
        }

    with open_text(path, 'w') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for record in records:
            writer.writerow(as_row(record))
def write_osm(records: List[Dict[str, Any]], path: Path):
    """Write *records* as an OSM XML document (gzip-compressed when *path* ends in ``.gz``).

    Each record becomes one ``<way>`` with a fresh ``<node>`` per coordinate.
    Synthetic ids are negative and count down from -1, independently for nodes
    and ways. All nodes are emitted before any way, following the OSM XML
    convention of a node block followed by a way block; node/way ids and
    references are the same as if they had been emitted interleaved.

    Args:
        records: Resolved records; ``coords`` (``[lon, lat]`` pairs) and
            ``props`` (tag dict) are read from each.
        path: Destination file.
    """
    osm = ET.Element('osm', {'version': '0.6', 'generator': 'roads_stage2_configurable_v3.py'})
    ways = []  # held back so that every <node> precedes the first <way>
    node_id = -1
    way_id = -1
    for r in records:
        way = ET.Element('way', {'id': str(way_id)})
        way_id -= 1
        for lon, lat in r['coords']:
            ET.SubElement(osm, 'node', {'id': str(node_id), 'lat': f'{lat:.8f}', 'lon': f'{lon:.8f}'})
            ET.SubElement(way, 'nd', {'ref': str(node_id)})
            node_id -= 1
        for k, v in sorted(r['props'].items()):
            k2 = sanitize_text(k)
            v2 = sanitize_text(v)
            if k2 and v2:
                ET.SubElement(way, 'tag', {'k': k2, 'v': v2})
        ways.append(way)
    osm.extend(ways)

    tree = ET.ElementTree(osm)
    # ET.indent only exists on Python 3.9+; pretty-printing is optional.
    if hasattr(ET, 'indent'):
        ET.indent(tree, space=' ')

    opener = gzip.open if str(path).lower().endswith('.gz') else open
    with opener(path, 'wb') as f:
        tree.write(f, encoding='utf-8', xml_declaration=True)
def write_gpx(records: List[Dict[str, Any]], path: Path):
    """Write *records* as a GPX 1.1 document (gzip-compressed when *path* ends in ``.gz``).

    The document starts with an ``extensions`` element listing each distinct
    group once (in first-seen order), followed by one ``trk`` per record
    carrying its name (or feature id when unnamed), group label, icon/colour
    extensions and a single ``trkseg`` with all points.
    """
    # NOTE(review): only the default (GPX) namespace is registered here; unless
    # OSMAND_NS is registered elsewhere, ElementTree serializes it with an
    # auto-generated prefix — confirm consumers accept that.
    ET.register_namespace('', GPX_NS)

    def osmand(local: str) -> str:
        """Qualify *local* with the OsmAnd namespace in ElementTree notation."""
        return f'{{{OSMAND_NS}}}{local}'

    gpx = ET.Element('gpx', {'version': '1.1', 'creator': 'roads_stage2_configurable_v3.py', 'xmlns': GPX_NS})
    ext_root = ET.SubElement(gpx, 'extensions')
    groups_el = ET.SubElement(ext_root, osmand('points_groups'))

    emitted = set()
    for r in records:
        gid = r['group_id']
        if gid not in emitted:
            emitted.add(gid)
            ET.SubElement(groups_el, osmand('group'), {
                'name': r['group_label'],
                'color': r['color'],
                'icon': r['icon'],
                'background': 'circle',
            })

    for r in records:
        trk = ET.SubElement(gpx, 'trk')
        ET.SubElement(trk, 'name').text = sanitize_text(r['name'] or r['feature_id'])
        ET.SubElement(trk, 'type').text = sanitize_text(r['group_label'])
        ext = ET.SubElement(trk, 'extensions')
        ET.SubElement(ext, osmand('icon')).text = sanitize_text(r['icon'])
        ET.SubElement(ext, osmand('color')).text = sanitize_text(r['color'])
        seg = ET.SubElement(trk, 'trkseg')
        for lon, lat in r['coords']:
            ET.SubElement(seg, 'trkpt', {'lat': f'{lat:.8f}', 'lon': f'{lon:.8f}'})

    tree = ET.ElementTree(gpx)
    try:
        ET.indent(tree, space=' ')
    except Exception:
        # Pretty-printing is best effort only.
        pass

    compressed = str(path).lower().endswith('.gz')
    if compressed:
        with gzip.open(path, 'wb') as f:
            tree.write(f, encoding='utf-8', xml_declaration=True)
    else:
        with path.open('wb') as f:
            tree.write(f, encoding='utf-8', xml_declaration=True)
def write_unmatched_csv(items: List[Road], path: Path):
    """Write one CSV row per road in *items* to *path* (opened via ``open_text``).

    Preview coordinates are formatted with eight decimals; semantic tags
    (keys sorted), GPXSee classes and coordinates (compact separators) are
    embedded as JSON strings.
    """
    fields = [
        'mapset', 'feature_id', 'endpoint_hash', 'name', 'garmin_kind', 'garmin_type',
        'garmin_subtype', 'line_class', 'road_group_key', 'preview_lon', 'preview_lat',
        'semantic_tags_json', 'gpxsee_classes_json', 'coords_json',
    ]

    def as_row(it: Road) -> Dict[str, Any]:
        return {
            'mapset': it.mapset,
            'feature_id': it.feature_id,
            'endpoint_hash': it.endpoint_hash,
            'name': sanitize_text(it.name),
            'garmin_kind': it.garmin_kind,
            'garmin_type': it.garmin_type,
            'garmin_subtype': it.garmin_subtype,
            'line_class': it.line_class,
            'road_group_key': it.road_group_key,
            'preview_lon': f'{it.preview_lon:.8f}',
            'preview_lat': f'{it.preview_lat:.8f}',
            'semantic_tags_json': json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True),
            'gpxsee_classes_json': json.dumps(it.gpxsee_classes, ensure_ascii=False),
            'coords_json': json.dumps(it.coords, ensure_ascii=False, separators=(',', ':')),
        }

    with open_text(path, 'w') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for road in items:
            writer.writerow(as_row(road))
def run_eval_command(template: str, example: dict[str, Any], *, group: dict[str, Any], z: int = 16) -> None:
    """Launch the external preview command for *example* without waiting for it.

    *template* is expanded with ``str.format``. Available placeholders:
    ``{lon}``, ``{lat}``, ``{z}``, ``{name}``, ``{mapset}``, ``{group_id}``,
    ``{group_label}``, ``{raw_type}``, ``{line_class}``, ``{feature_id}`` and
    ``{endpoint_hash}``. An empty template is a no-op.

    A malformed template (unknown placeholder, stray brace) or a failure to
    start the process is reported on stderr and never raised, so a bad preview
    command cannot abort an interactive session.

    Args:
        template: Shell command template.
        example: Example dict supplying the per-feature placeholder values.
        group: Group dict supplying ``id`` and the display label.
        z: Zoom level exposed as ``{z}``.
    """
    if not template:
        return
    fmt = {
        'lon': example.get('lon', ''), 'lat': example.get('lat', ''), 'z': z,
        'name': sanitize_text(example.get('name', '')),
        'mapset': sanitize_text(example.get('mapset', '')),
        'group_id': sanitize_text(group.get('id', '')),
        'group_label': sanitize_text((group.get('display') or {}).get('label', '')),
        'raw_type': sanitize_text(example.get('raw_type', '')),
        'line_class': sanitize_text(example.get('line_class', '')),
        'feature_id': sanitize_text(example.get('feature_id', '')),
        'endpoint_hash': sanitize_text(example.get('endpoint_hash', '')),
    }
    try:
        cmd = str(template).format(**fmt)
    except (KeyError, IndexError, ValueError) as e:
        print(f'[warn] invalid preview command template: {e!r}', file=sys.stderr)
        return
    # NOTE(review): values such as the road name come from map data and are
    # interpolated into a shell command line unquoted. Quoting is left to the
    # template author because the right quoting depends on the platform shell.
    try:
        subprocess.Popen(cmd, shell=True)
    except Exception as e:
        print(f'[warn] failed to run command: {e}', file=sys.stderr)
def feature_examples(it: Road, assigned_group: Optional[Dict[str, Any]] = None) -> dict[str, Any]:
    """Describe road *it* as a plain example dict.

    Preview coordinates are rounded to eight decimals and the semantic tags
    are copied. ``assigned_group_id`` and ``assigned_group_label`` are empty
    strings when no group is given.
    """
    if assigned_group:
        group_id = assigned_group.get('id')
        group_label = (assigned_group.get('display') or {}).get('label', '')
    else:
        group_id = ''
        group_label = ''

    example: dict[str, Any] = {
        'feature_id': it.feature_id,
        'endpoint_hash': it.endpoint_hash,
        'name': it.name,
        'mapset': it.mapset,
        'group_key': it.road_group_key,
        'lat': round(it.preview_lat, 8),
        'lon': round(it.preview_lon, 8),
        'raw_type': it.raw_type_key(),
        'line_class': it.line_class,
        'semantic_tags': dict(it.semantic_tags),
    }
    example['assigned_group_id'] = group_id
    example['assigned_group_label'] = group_label
    return example
def interactive_edit_group(g: Dict[str, Any]) -> None:
    """Prompt on stdin for new display, naming and postprocess settings of group *g*.

    *g* is modified in place. For every scalar field an empty answer keeps the
    current value. A width that is not an integer is rejected with a warning
    on stderr (it used to be dropped silently). Tags to add are entered as
    ``key=value`` lines, tags to remove as bare keys; a blank line ends either
    list.
    """
    display = g.setdefault('display', {})
    naming = g.setdefault('naming', {})
    pp = g.setdefault('postprocess', {})
    pp.setdefault('add_tags', {})
    pp.setdefault('remove_tags', [])

    def ask(field: str, current: Any) -> str:
        """Prompt for *field*, showing *current* in brackets; return the stripped answer."""
        return input(f"{field} [{current}]: ").strip()

    new_label = ask('label', display.get('label', ''))
    if new_label:
        display['label'] = sanitize_text(new_label)

    new_color = ask('color', display.get('color', ''))
    if new_color:
        display['color'] = sanitize_text(new_color)

    new_width = ask('width', display.get('width', 2))
    if new_width:
        try:
            display['width'] = int(new_width)
        except ValueError:
            print(f'[warn] width must be an integer; keeping {display.get("width", 2)}', file=sys.stderr)

    new_icon = ask('icon', display.get('icon', ''))
    if new_icon:
        display['icon'] = sanitize_text(new_icon)

    new_base = ask('base_name', naming.get('base_name', ''))
    if new_base:
        naming['base_name'] = sanitize_text(new_base)

    new_tpl = ask('template', naming.get('template', ''))
    if new_tpl:
        naming['template'] = sanitize_text(new_tpl)

    print('add_tags current =', pp.get('add_tags', {}), file=sys.stderr)
    while True:
        expr = input('add/replace tag key=value (blank to stop): ').strip()
        if not expr:
            break
        if '=' not in expr:
            print('expected key=value', file=sys.stderr)
            continue
        k, v = expr.split('=', 1)
        pp['add_tags'][sanitize_text(k)] = sanitize_text(v)

    print('remove_tags current =', pp.get('remove_tags', []), file=sys.stderr)
    while True:
        expr = input('append remove-tag key (blank to stop): ').strip()
        if not expr:
            break
        pp['remove_tags'].append(sanitize_text(expr))
def interactive_edit_config(cfg: Dict[str, Any], *, save_path: Optional[Path] = None, run_cmd_before_eval: Optional[str] = None, run_cmd_zoom: int = 16, auto_open_example: bool = True) -> Dict[str, Any]:
    """Walk through every group of *cfg* on the terminal and let the user edit it.

    For each group the user can page through its stored examples (``n``/``p``),
    open the current one with the preview command (``o``), edit the group
    (Enter or ``e``), skip it (``s``) or stop (``!``). The preview command is
    opened automatically only before the very first prompt. *cfg* is edited in
    place; it is saved to *save_path* (when given) on stop and after the last
    group. When stdin is not a TTY a warning is printed and *cfg* is returned
    untouched.
    """
    if not sys.stdin.isatty():
        print('[warn] interactive config editing requested, but stdin is not interactive; keeping config unchanged', file=sys.stderr)
        return cfg

    def persist() -> None:
        if save_path:
            save_json(save_path, cfg)

    for g in cfg.get('groups') or []:
        examples = list(g.get('examples') or [])
        idx = 0
        edit_requested = False
        while True:
            ex = examples[idx] if examples else {}
            print('\n=== ROAD GROUP ===', file=sys.stderr)
            print(f"id: {g.get('id')} label: {(g.get('display') or {}).get('label','')} count: {(g.get('stats') or {}).get('count','?')}", file=sys.stderr)
            if ex:
                print(f"example[{idx+1}/{len(examples)}]: name={ex.get('name','')} mapset={ex.get('mapset','')} at {ex.get('lat')},{ex.get('lon')} raw={ex.get('raw_type','')} class={ex.get('line_class','')}", file=sys.stderr)
            if run_cmd_before_eval and ex and auto_open_example:
                run_eval_command(run_cmd_before_eval, ex, group=g, z=run_cmd_zoom)
            reply = input('Preview command [Enter/e edit, n next, p prev, o open, s skip, ! stop]: ').strip().lower()
            auto_open_example = False  # only the first prompt auto-opens

            if reply in ('', 'e'):
                edit_requested = True
                break
            if reply == 's':
                break
            if reply == '!':
                persist()
                return cfg
            if reply == 'n' and examples:
                idx = (idx + 1) % len(examples)
            elif reply == 'p' and examples:
                idx = (idx - 1) % len(examples)
            elif reply == 'o' and run_cmd_before_eval and ex:
                run_eval_command(run_cmd_before_eval, ex, group=g, z=run_cmd_zoom)
            # Anything else simply shows the prompt again.

        if edit_requested:
            interactive_edit_group(g)

    persist()
    return cfg
def _apply_feature_override(cmd: str, ov: Dict[str, Any], it: Road, groups_by_id: Dict[Any, Dict[str, Any]]) -> None:
    """Apply one override-editing command (``f``, ``d``, ``t``, ``r`` or ``m``) to *ov* in place."""
    if cmd == 'f':
        gid = input('force group id: ').strip()
        if gid in groups_by_id:
            ov['force_group_id'] = gid
            ov.pop('disabled', None)
        else:
            print('[warn] group id not found', file=sys.stderr)
    elif cmd == 'd':
        ov['disabled'] = True
        ov.pop('force_group_id', None)
    elif cmd == 't':
        add = ov.setdefault('add_tags', {})
        while True:
            expr = input('feature add-tag key=value (blank to stop): ').strip()
            if not expr:
                break
            if '=' not in expr:
                print('expected key=value', file=sys.stderr)
                continue
            k, v = expr.split('=', 1)
            add[sanitize_text(k)] = sanitize_text(v)
        if not add:
            ov.pop('add_tags', None)
    elif cmd == 'r':
        rem = ov.setdefault('remove_tags', [])
        while True:
            expr = input('feature remove-tag key (blank to stop): ').strip()
            if not expr:
                break
            rem.append(sanitize_text(expr))
        if not rem:
            ov.pop('remove_tags', None)
    elif cmd == 'm':
        val = input(f'rename [{ov.get("name", it.name)}]: ').strip()
        if val:
            ov['name'] = sanitize_text(val)


def interactive_verify(cfg: Dict[str, Any], items: List[Road], *, scope: str, group_id: Optional[str], save_path: Optional[Path], run_cmd_before_eval: Optional[str], run_cmd_zoom: int, auto_open_example: bool = True) -> Dict[str, Any]:
    """Step through roads on the terminal and record per-feature overrides.

    The roads shown depend on *scope*: ``matched`` keeps roads that were
    assigned a group, ``unmatched`` keeps the rest, ``group`` (with a
    *group_id*) keeps roads assigned to that group, and any other value keeps
    everything. Group assignment is computed once up front and is not
    refreshed after overrides are edited.

    Commands: Enter/``n`` next, ``p`` previous, ``o`` run the preview command,
    ``g`` edit a group, ``f`` force a group, ``d`` disable, ``u`` clear the
    override, ``t`` add tags, ``r`` remove tags, ``m`` rename, ``!`` stop.

    An override entry is only kept when a command actually stored something in
    it, so ``g``, ``!``, unknown commands and cancelled edits no longer leave
    empty ``{}`` overrides behind in the saved config.

    Returns:
        The migrated and edited config, saved to *save_path* when given. When
        stdin is not a TTY, *cfg* is returned untouched after a warning.
    """
    if not sys.stdin.isatty():
        print('[warn] interactive verifier requested, but stdin is not interactive; keeping config unchanged', file=sys.stderr)
        return cfg

    cfg = migrate_config(cfg)
    records, _unmatched, _disabled = build_records(items, cfg)
    assigned_map = {r['feature_id']: r['group_id'] for r in records}
    groups_by_id = {g.get('id'): g for g in cfg.get('groups') or []}

    record_items = []
    for it in items:
        assigned_id = assigned_map.get(it.feature_id)
        assigned_group = get_group_by_id(cfg, assigned_id) if assigned_id else None
        if scope == 'matched' and not assigned_group:
            continue
        if scope == 'unmatched' and assigned_group:
            continue
        if scope == 'group' and group_id and (not assigned_group or assigned_group.get('id') != group_id):
            continue
        record_items.append((it, assigned_group))

    idx = 0
    while idx < len(record_items):
        it, assigned_group = record_items[idx]
        override = (((cfg.get('overrides') or {}).get('features') or {}).get(it.feature_id) or {})
        ex = feature_examples(it, assigned_group)
        preview_group = assigned_group or {'id': '', 'display': {}}

        print('\n=== ROAD VERIFY ===', file=sys.stderr)
        print(f"[{idx+1}/{len(record_items)}] feature_id={it.feature_id} endpoint_hash={it.endpoint_hash}", file=sys.stderr)
        print(f" mapset={it.mapset} at {it.preview_lat:.8f},{it.preview_lon:.8f}", file=sys.stderr)
        print(f" raw={it.raw_type_key()} class={it.line_class} name={it.name or '<unnamed>'}", file=sys.stderr)
        print(f" road_group_key={it.road_group_key}", file=sys.stderr)
        print(f" assigned_group={(assigned_group.get('id') if assigned_group else '<unmatched>')} label={((assigned_group.get('display') or {}).get('label') if assigned_group else '')}", file=sys.stderr)
        if it.semantic_tags:
            print(f" semantic={json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True)}", file=sys.stderr)
        if override:
            print(f" override={json.dumps(override, ensure_ascii=False, sort_keys=True)}", file=sys.stderr)

        if run_cmd_before_eval and auto_open_example:
            run_eval_command(run_cmd_before_eval, ex, group=preview_group, z=run_cmd_zoom)
        auto_open_example = False  # only the first feature auto-opens

        cmd = input('Command [Enter/n next, p prev, o open, g edit-group, f force-group, d disable, u clear-override, t add-tag, r remove-tag, m rename, ! stop]: ').strip().lower()

        if cmd in ('', 'n'):
            idx += 1
            continue
        if cmd == 'p':
            idx = max(0, idx - 1)
            continue
        if cmd == 'o' and run_cmd_before_eval:
            run_eval_command(run_cmd_before_eval, ex, group=preview_group, z=run_cmd_zoom)
            continue
        if cmd == 'g':
            target = assigned_group
            if not target:
                gid = input('group id to edit: ').strip()
                target = groups_by_id.get(gid)
            if target:
                interactive_edit_group(target)
            else:
                print('[warn] group not found', file=sys.stderr)
            continue
        if cmd == '!':
            break

        features_over = cfg.setdefault('overrides', {}).setdefault('features', {})
        if cmd == 'u':
            features_over.pop(it.feature_id, None)
            idx += 1
            continue
        if cmd in ('f', 'd', 't', 'r', 'm'):
            ov = features_over.setdefault(it.feature_id, {})
            _apply_feature_override(cmd, ov, it, groups_by_id)
            if not ov:
                # Nothing was stored: do not leave an empty override behind.
                features_over.pop(it.feature_id, None)
            idx += 1
            continue
        print('[warn] unknown command', file=sys.stderr)

    if save_path:
        save_json(save_path, cfg)
    return cfg
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with the analyze / edit-config / verify / build sub-commands."""
    ap = argparse.ArgumentParser(description='Road packager + verifier: analyze road CSVs into config, interactively edit or verify with external visualizer, then build OSM/GPX/CSV.')
    sub = ap.add_subparsers(dest='cmd', required=True)

    a = sub.add_parser('analyze')
    a.add_argument('csv', nargs='+', type=Path)
    a.add_argument('--config-out', required=True, type=Path)
    a.add_argument('--summary-json', type=Path)
    a.add_argument('--group-examples', type=int, default=4)
    a.add_argument('--default-postprocess-tag', action='append', default=[])

    e = sub.add_parser('edit-config')
    e.add_argument('--config', required=True, type=Path)
    e.add_argument('--output', type=Path)
    e.add_argument('--run-cmd-before-eval')
    e.add_argument('--run-cmd-zoom', type=int, default=16)
    e.add_argument('--default-postprocess-tag', action='append', default=[])

    v = sub.add_parser('verify')
    v.add_argument('csv', nargs='+', type=Path)
    v.add_argument('--config', required=True, type=Path)
    v.add_argument('--output', type=Path, help='Write reviewed config here; defaults to in-place update')
    v.add_argument('--scope', choices=['all', 'matched', 'unmatched', 'group'], default='unmatched')
    v.add_argument('--group-id', help='Required when --scope group')
    v.add_argument('--run-cmd-before-eval')
    # No argparse default: an omitted option must fall back to the config value.
    v.add_argument('--run-cmd-zoom', type=int, default=None)

    b = sub.add_parser('build')
    b.add_argument('csv', nargs='+', type=Path)
    b.add_argument('--config', required=True, type=Path)
    b.add_argument('--resolved-csv', type=Path)
    b.add_argument('--osm', type=Path)
    b.add_argument('--gpx', type=Path)
    b.add_argument('--summary-json', type=Path)
    b.add_argument('--unmatched-csv', type=Path)
    b.add_argument('--interactive-edit-config', action='store_true')
    b.add_argument('--interactive-verify', action='store_true')
    b.add_argument('--verify-scope', choices=['all', 'matched', 'unmatched', 'group'], default='unmatched')
    b.add_argument('--verify-group-id')
    b.add_argument('--edited-config-out', type=Path)
    b.add_argument('--run-cmd-before-eval')
    b.add_argument('--run-cmd-zoom', type=int, default=None)
    return ap


def main(argv=None) -> int:
    """Command-line entry point; returns the process exit code.

    Sub-commands:
        analyze: derive a starter config from road CSVs.
        edit-config: interactively edit a config and write it back.
        verify: interactively review roads and record per-feature overrides.
        build: resolve roads against a config and write the requested outputs.

    Changes compared with the previous behaviour: ``--scope group`` /
    ``--verify-scope group`` without a group id is now a usage error; for
    ``verify`` and ``build`` the zoom stored in the config is used when
    ``--run-cmd-zoom`` is omitted (the argparse default of 16 used to shadow
    it); ``edit-config`` always writes the config, so options such as
    ``--default-postprocess-tag`` take effect even when stdin is not a TTY.
    """
    ap = _build_arg_parser()
    args = ap.parse_args(argv)

    # Fail fast, before any file is read.
    if args.cmd == 'verify' and args.scope == 'group' and not args.group_id:
        ap.error('--group-id is required when --scope group')
    if args.cmd == 'build' and args.interactive_verify and args.verify_scope == 'group' and not args.verify_group_id:
        ap.error('--verify-group-id is required when --verify-scope group')

    if args.cmd == 'edit-config':
        cfg = migrate_config(load_json(args.config))
        if args.default_postprocess_tag:
            defaults = cfg.setdefault('defaults', {})
            pp = defaults.setdefault('postprocess', {'add_tags': {}, 'remove_tags': []})
            pp['add_tags'] = merge_postprocess_dicts(pp.get('add_tags') or {}, parse_tag_exprs(args.default_postprocess_tag))
        if args.run_cmd_before_eval:
            verifier = cfg.setdefault('defaults', {}).setdefault('verifier', {})
            verifier['run_cmd_before_eval'] = args.run_cmd_before_eval
            verifier['run_cmd_zoom'] = args.run_cmd_zoom
        output = args.output or args.config
        interactive_edit_config(cfg, save_path=output, run_cmd_before_eval=args.run_cmd_before_eval, run_cmd_zoom=args.run_cmd_zoom)
        # The interactive editor does not save when stdin is not a TTY.
        save_json(output, cfg)
        print(f'[info] wrote config: {output}')
        return 0

    items = dedupe_roads(load_roads(args.csv))

    if args.cmd == 'analyze':
        cfg = analyze_to_config(items, example_count=args.group_examples, default_postprocess_add=parse_tag_exprs(args.default_postprocess_tag))
        save_json(args.config_out, cfg)
        if args.summary_json:
            save_json(args.summary_json, {
                'item_count': len(items),
                'group_count': len(cfg['groups']),
                'top_groups': [{'id': g['id'], 'count': g['stats']['count'], 'label': g['display']['label']} for g in cfg['groups'][:50]],
            })
        print(f'[info] analyzed {len(items)} roads')
        print(f'[info] wrote config: {args.config_out}')
        if args.summary_json:
            print(f'[info] wrote summary: {args.summary_json}')
        return 0

    cfg = migrate_config(load_json(args.config))
    verifier_defaults = (cfg.get('defaults') or {}).get('verifier') or {}
    run_cmd = args.run_cmd_before_eval or verifier_defaults.get('run_cmd_before_eval') or None
    run_zoom = args.run_cmd_zoom
    if run_zoom is None:
        cfg_zoom = verifier_defaults.get('run_cmd_zoom')
        run_zoom = 16 if cfg_zoom is None else cfg_zoom

    if args.cmd == 'verify':
        output = args.output or args.config
        cfg = interactive_verify(cfg, items, scope=args.scope, group_id=args.group_id, save_path=output, run_cmd_before_eval=run_cmd, run_cmd_zoom=run_zoom)
        print(f'[info] wrote config: {output}')
        return 0

    if args.interactive_edit_config:
        cfg = interactive_edit_config(cfg, save_path=args.edited_config_out, run_cmd_before_eval=run_cmd, run_cmd_zoom=run_zoom)
    if args.interactive_verify:
        cfg = interactive_verify(cfg, items, scope=args.verify_scope, group_id=args.verify_group_id, save_path=args.edited_config_out or args.config, run_cmd_before_eval=run_cmd, run_cmd_zoom=run_zoom)

    records, unmatched, disabled = build_records(items, cfg)
    if not any([args.resolved_csv, args.osm, args.gpx, args.summary_json, args.unmatched_csv]):
        print(f'[info] built {len(records)} road records')
        print(f'[info] unmatched {len(unmatched)} roads; disabled {len(disabled)} roads')
        return 0

    if args.resolved_csv:
        write_resolved_csv(records, args.resolved_csv)
        print(f'[info] wrote resolved CSV: {args.resolved_csv}')
    if args.osm:
        write_osm(records, args.osm)
        print(f'[info] wrote OSM: {args.osm}')
    if args.gpx:
        write_gpx(records, args.gpx)
        print(f'[info] wrote GPX: {args.gpx}')
    if args.unmatched_csv:
        write_unmatched_csv(unmatched, args.unmatched_csv)
        print(f'[info] wrote unmatched CSV: {args.unmatched_csv}')
    if args.summary_json:
        c = Counter(r['group_id'] for r in records)
        save_json(args.summary_json, {'record_count': len(records), 'unmatched_count': len(unmatched), 'disabled_count': len(disabled), 'groups': dict(c.most_common())})
        print(f'[info] wrote summary: {args.summary_json}')
    return 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())

View File

@@ -0,0 +1,202 @@
# Taken from the output of a previous run: one "<mapset id> <four comma-separated coordinates>" entry per line.
temp = """00234008 21.467285,43.637695,22.038574,44.033203
00234009 21.972656,43.637695,22.543945,44.033203
00234010 22.456055,43.637695,23.027344,44.033203
00234011 22.961426,43.637695,23.532715,44.033203
00234012 23.466797,43.637695,24.038086,44.033203
00234020 21.467285,43.308105,22.038574,43.703613
00234021 21.972656,43.308105,22.543945,43.703613
00234022 22.456055,43.308105,23.027344,43.703613
00234023 22.961426,43.308105,23.532715,43.703613
00234024 23.466797,43.308105,24.038086,43.703613
00234032 21.467285,42.978516,22.038574,43.374023
00234033 21.972656,42.956543,22.543945,43.395996
00234034 22.456055,42.978516,23.027344,43.374023
00234035 22.961426,42.978516,23.532715,43.374023
00234036 23.466797,42.978516,24.038086,43.374023
00234044 21.467285,42.604980,22.038574,43.044434
00234045 21.972656,42.626953,22.543945,43.022461
00234046 22.456055,42.626953,23.027344,43.022461
00234047 22.961426,42.626953,23.532715,43.022461
00234048 23.466797,42.604980,24.038086,43.044434
00234056 21.467285,42.297363,22.038574,42.692871
00234057 21.972656,42.297363,22.543945,42.692871
00234058 22.456055,42.297363,23.027344,42.692871
00234059 22.961426,42.297363,23.532715,42.692871
00234060 23.466797,42.297363,24.038086,42.692871
00234068 21.467285,41.967773,22.038574,42.363281
00234069 21.972656,41.967773,22.543945,42.363281
00234070 22.456055,41.967773,23.027344,42.363281
00234071 22.961426,41.967773,23.532715,42.363281
00234072 23.466797,41.967773,24.038086,42.363281
00234080 21.467285,41.638184,22.038574,42.033691
00234081 21.972656,41.638184,22.543945,42.033691
00234082 22.456055,41.638184,23.027344,42.033691
00234083 22.961426,41.638184,23.532715,42.033691
00234084 23.466797,41.638184,24.038086,42.033691
00234092 21.467285,41.308594,22.038574,41.704102
00234093 21.972656,41.308594,22.543945,41.704102
00234094 22.456055,41.308594,23.027344,41.704102
00234095 22.961426,41.308594,23.532715,41.704102
00234096 23.466797,41.308594,24.038086,41.704102
00234104 21.467285,40.979004,22.038574,41.374512
00234105 21.972656,40.957031,22.543945,41.396484
00234106 22.456055,40.979004,23.027344,41.374512
00234107 22.961426,40.979004,23.532715,41.374512
00234108 23.466797,40.979004,24.038086,41.374512
00234116 21.467285,40.605469,22.038574,41.044922
00234117 21.972656,40.627441,22.543945,41.022949
00234118 22.456055,40.605469,23.027344,41.044922
00234119 22.961426,40.627441,23.532715,41.066895
00234120 23.466797,40.627441,24.038086,41.022949
00235001 23.972168,43.637695,24.543457,44.033203
00235002 24.455566,43.637695,25.026855,44.033203
00235003 24.960938,43.637695,25.532227,44.033203
00235004 25.466309,43.637695,26.037598,44.033203
00235005 25.971680,43.637695,26.542969,44.033203
00235006 26.455078,43.637695,27.026367,44.033203
00235007 26.960449,43.637695,27.531738,44.033203
00235008 27.465820,43.637695,28.037109,44.033203
00235009 27.971191,43.637695,28.542480,44.033203
00235010 28.454590,43.637695,29.025879,44.033203
00235013 23.972168,43.308105,24.543457,43.703613
00235014 24.455566,43.308105,25.026855,43.703613
00235015 24.960938,43.308105,25.532227,43.703613
00235016 25.466309,43.308105,26.037598,43.703613
00235017 25.971680,43.308105,26.542969,43.703613
00235018 26.455078,43.308105,27.026367,43.703613
00235019 26.960449,43.308105,27.531738,43.703613
00235020 27.465820,43.308105,28.037109,43.703613
00235021 27.971191,43.308105,28.542480,43.703613
00235022 28.454590,43.308105,29.025879,43.703613
00235025 23.972168,42.956543,24.543457,43.395996
00235026 24.455566,42.978516,25.026855,43.374023
00235027 24.960938,42.978516,25.532227,43.374023
00235028 25.466309,42.956543,26.037598,43.395996
00235029 25.971680,42.978516,26.542969,43.374023
00235030 26.455078,42.978516,27.026367,43.374023
00235031 26.960449,42.978516,27.531738,43.374023
00235032 27.465820,42.978516,28.037109,43.374023
00235033 27.971191,42.978516,28.542480,43.374023
00235034 28.454590,42.978516,29.025879,43.374023
00235037 23.972168,42.626953,24.543457,43.022461
00235038 24.455566,42.626953,25.026855,43.066406
00235039 24.960938,42.604980,25.532227,43.044434
00235040 25.466309,42.604980,26.037598,43.044434
00235041 25.971680,42.626953,26.542969,43.022461
00235042 26.455078,42.626953,27.026367,43.022461
00235043 26.960449,42.626953,27.531738,43.022461
00235044 27.465820,42.604980,28.037109,43.044434
00235045 27.971191,42.626953,28.542480,43.022461
00235046 28.454590,42.626953,29.025879,43.022461
00235049 23.972168,42.297363,24.543457,42.692871
00235050 24.455566,42.297363,25.026855,42.692871
00235051 24.960938,42.297363,25.532227,42.692871
00235052 25.466309,42.297363,26.037598,42.692871
00235053 25.971680,42.297363,26.542969,42.692871
00235054 26.455078,42.297363,27.026367,42.692871
00235055 26.960449,42.297363,27.531738,42.692871
00235056 27.465820,42.297363,28.037109,42.692871
00235057 27.971191,42.297363,28.542480,42.692871
00235058 28.454590,42.297363,29.025879,42.692871
00235061 23.972168,41.967773,24.543457,42.363281
00235062 24.455566,41.967773,25.026855,42.363281
00235063 24.960938,41.967773,25.532227,42.363281
00235064 25.466309,41.967773,26.037598,42.363281
00235065 25.971680,41.967773,26.542969,42.363281
00235066 26.455078,41.967773,27.026367,42.363281
00235067 26.960449,41.967773,27.531738,42.363281
00235068 27.465820,41.967773,28.037109,42.363281
00235069 27.971191,41.967773,28.542480,42.363281
00235070 28.454590,41.967773,29.025879,42.363281
00235073 23.928223,41.638184,24.543457,42.033691
00235074 24.455566,41.638184,25.026855,42.033691
00235075 24.960938,41.638184,25.532227,42.033691
00235076 25.466309,41.638184,26.037598,42.033691
00235077 25.971680,41.638184,26.542969,42.033691
00235078 26.455078,41.638184,27.026367,42.033691
00235079 26.960449,41.638184,27.531738,42.033691
00235080 27.465820,41.638184,28.037109,42.033691
00235081 27.971191,41.638184,28.542480,42.033691
00235082 28.454590,41.638184,29.025879,42.033691
00235085 23.972168,41.308594,24.543457,41.704102
00235086 24.455566,41.308594,25.026855,41.704102
00235087 24.960938,41.308594,25.532227,41.704102
00235088 25.466309,41.308594,26.037598,41.704102
00235089 25.971680,41.308594,26.542969,41.704102
00235090 26.455078,41.308594,27.026367,41.704102
00235091 26.960449,41.308594,27.531738,41.704102
00235092 27.465820,41.308594,28.037109,41.704102
00235093 27.971191,41.308594,28.542480,41.704102
00235094 28.454590,41.308594,29.025879,41.704102
00235097 23.972168,40.979004,24.543457,41.374512
00235098 24.455566,40.979004,25.026855,41.374512
00235099 24.960938,40.979004,25.532227,41.374512
00235100 25.466309,40.979004,26.037598,41.374512
00235101 25.971680,40.957031,26.542969,41.396484
00235102 26.455078,40.979004,27.026367,41.374512
00235103 26.960449,40.979004,27.531738,41.374512
00235104 27.465820,40.979004,28.037109,41.374512
00235105 27.971191,40.957031,28.542480,41.396484
00235106 28.454590,40.979004,29.025879,41.374512
00235109 23.972168,40.605469,24.543457,41.044922
00235110 24.455566,40.627441,25.026855,41.066895
00235111 24.960938,40.627441,25.532227,41.066895
00235112 25.466309,40.605469,26.037598,41.044922
00235113 25.971680,40.627441,26.542969,41.022949
00235114 26.455078,40.627441,27.026367,41.022949
00235115 26.960449,40.627441,27.531738,41.022949
00235116 27.465820,40.627441,28.037109,41.022949
00235117 27.971191,40.605469,28.542480,41.044922
00235118 28.454590,40.627441,29.025879,41.022949
00334140 21.467285,43.967285,22.038574,44.362793
00334141 21.972656,43.967285,22.543945,44.362793
00334142 22.456055,43.967285,23.027344,44.362793
00334143 22.961426,43.967285,23.532715,44.362793
00334144 23.466797,43.967285,24.038086,44.362793
00335133 23.972168,43.967285,24.543457,44.362793
00335134 24.455566,43.967285,25.026855,44.362793
00335135 24.960938,43.967285,25.532227,44.362793
00335136 25.466309,43.967285,26.037598,44.362793
00335137 25.971680,43.967285,26.542969,44.362793
00335138 26.455078,43.967285,27.026367,44.362793
00335139 26.960449,43.967285,27.531738,44.362793
00335140 27.465820,43.967285,28.037109,44.362793
00335141 27.971191,43.967285,28.542480,44.362793
00335142 28.454590,43.967285,29.025879,44.362793
"""
import os
import sys

import garmin_img_roads

# Driver script: export the roads of every mapset listed in `temp`, one CSV per mapset.

# The first whitespace-separated token of each line is the mapset id. Blank
# lines are skipped: `temp` ends with a newline, which previously produced an
# empty mapset id that was then processed like a real one.
map_segments = [line.split()[0] for line in temp.splitlines() if line.strip()]
print(len(map_segments))
# NOTE(review): this is a raw string, so the doubled backslashes are kept
# literally in the path — confirm that is intended.
filepath = r"D:\\maps-bg\\gmapsupp\\gmapsupp.img"
if input(f'Running against {len(map_segments)} and {filepath} [!=y break]')!='y':
    # sys.exit instead of the `exit` helper, which only exists when `site` is loaded.
    sys.exit()
# map_segments = map_segments[50:52]
for counter, msegment in enumerate(map_segments, start=1):
    # sys.argv = ['garmin_img_to_osmand_v4.py', filepath, '--mapset',msegment,"--landmarks-csv",f"export_landmarks/landmarks-{msegment}.csv"]
    # garmin_img_to_osmand_v4.main()
    # print("Parsed ",msegment)
    # sys.argv = ['garmin_img_to_osmand.py', filepath, '--mapset',msegment,'--landmarks-csv', f"..\parsed-landmarks\csv-from-stage-1\export-v6-{msegment}.csv", '--include-unnamed','--point-profile','all']
    # garmin_img_to_osmand.main()
    #python stage-1-read-garmin-img/garmin_img_to_osmand.py 'd:/maps-bg/gmapsupp/gmapsupp.img' --point-profile all --include-unnamed --point-group-by raw_type
    # The roads exporter is driven through sys.argv before its main() is called.
    sys.argv = ['garmin_img_to_osmand.py', filepath,
                '--stage1-module', '../stage-1-read-garmin-img/garmin_img_to_osmand.py',
                '--mapset', msegment,
                '--roads-csv', f"../parsed-landmarks/roads-from-stage-3/export-roads-v3-t1-{msegment}.csv",
                '--road-profile', 'all_lines'
                ]
    garmin_img_roads.main()
    # Progress is 1-based, so the last mapset reports N/N.
    print(f"[{counter}/{len(map_segments)}] - Parsed {msegment}")