v.5.0.1 - actually adding them

This commit is contained in:
nq
2026-04-15 04:33:06 -07:00
parent 404fea153d
commit 3b1087c9ec
5 changed files with 733 additions and 326 deletions

View File

@@ -93,6 +93,90 @@ def read_u32le(buf: bytes, off: int) -> int:
return int.from_bytes(buf[off:off + 4], "little", signed=False)
def byte_size(val: int) -> int:
    """Return the number of bytes (1 to 4) needed to hold ``val``.

    Any value not exceeding 0xFF (zero and negative numbers included)
    reports 1; any value above 0xFFFFFF reports 4.
    """
    for width, limit in enumerate((0xFF, 0xFFFF, 0xFFFFFF), start=1):
        if val <= limit:
            return width
    return 4
def read_vuint32_fixed(data: bytes, pos: int, nbytes: int) -> Tuple[int, int]:
    """Read an ``nbytes``-wide little-endian unsigned integer at ``pos``.

    Args:
        data: Buffer to read from.
        pos: Zero-based offset of the first byte.
        nbytes: Width of the value in bytes; must be 1 to 4.

    Returns:
        ``(value, next_pos)`` where ``next_pos`` is ``pos + nbytes``.

    Raises:
        EOFError: If ``nbytes`` is outside 1..4 or the requested range does
            not lie completely inside ``data``.
    """
    if not 1 <= nbytes <= 4:
        raise EOFError(f"unsupported field width: {nbytes}")
    end = pos + nbytes
    # A negative offset would silently slice from the tail (or produce an
    # empty slice that decodes as 0), so reject it like any other overrun.
    if pos < 0 or end > len(data):
        raise EOFError(f"cannot read {nbytes} byte(s) at offset {pos}")
    return int.from_bytes(data[pos:end], "little", signed=False), end
def read_vuint32_auto(data: bytes, pos: int) -> Tuple[int, int]:
    """Decode a self-describing variable-length unsigned integer at ``pos``.

    The low bits of the first byte select the layout:

    * bit 0 set: no extra bytes, the value is the upper 7 bits of the byte;
    * bit 0 clear, bit 1 set: one extra byte, upper 6 bits of the first byte;
    * bits 0 and 1 clear: two extra bytes when bit 2 is set, otherwise three,
      with the upper 5 bits of the first byte used.

    Returns:
        ``(value, next_pos)``.

    Raises:
        EOFError: If the buffer ends before the whole value has been read.
    """
    if pos >= len(data):
        raise EOFError
    lead = data[pos]
    pos += 1

    if lead & 1:
        shift, extra = 7, 0
    elif lead & 2:
        shift, extra = 6, 1
    else:
        shift, extra = 5, ((lead >> 2) & 1) ^ 3

    drop = 8 - shift  # number of marker bits discarded from the low end
    value = lead >> drop
    for index in range(1, extra + 1):
        if pos >= len(data):
            raise EOFError
        value |= (data[pos] << (index * 8)) >> drop
        pos += 1
    return value, pos
def read_vbitfield32(data: bytes, pos: int) -> Tuple[int, int]:
    """Decode a variable-length bitfield starting at ``pos``.

    The low bits of the first byte give the encoded width:

    * bit 0 set: one byte, the field is that byte shifted right by 1;
    * bit 0 clear, bit 1 set: two bytes, shifted right by 2;
    * bits 0-1 clear, bit 2 set: three bytes, shifted right by 3;
    * bits 0-2 clear: four bytes, shifted right by 3.

    Multi-byte widths are re-read from ``pos`` (the marker byte is part of
    the value) through the module's ``read_u16le`` / ``read_u24le`` /
    ``read_u32le`` helpers.

    Returns:
        ``(bitfield, next_pos)``.

    Raises:
        EOFError: If ``pos`` is at/after the end of ``data`` or the encoded
            width runs past the end.
    """
    if pos >= len(data):
        raise EOFError
    lead = data[pos]

    if lead & 1:
        return lead >> 1, pos + 1

    if lead & 2:
        width = 2
    elif lead & 4:
        width = 3
    else:
        width = 4

    end = pos + width
    if end > len(data):
        raise EOFError

    if width == 2:
        field = read_u16le(data, pos) >> 2
    elif width == 3:
        field = read_u24le(data, pos) >> 3
    else:
        field = read_u32le(data, pos) >> 3
    return field, end
def guess_blob_ext(blob: bytes) -> str:
    """Pick a file extension for ``blob`` from its leading magic bytes.

    Recognises PNG, JPEG, GIF, BMP, TIFF and WebP signatures; anything else
    (including an empty blob) yields ``".bin"``.
    """
    prefix_table = (
        ((b"\x89PNG\r\n\x1a\n",), ".png"),
        ((b"\xff\xd8\xff",), ".jpg"),
        ((b"GIF87a", b"GIF89a"), ".gif"),
        ((b"BM",), ".bmp"),
        ((b"II*\x00", b"MM\x00*"), ".tif"),
    )
    for magics, extension in prefix_table:
        if blob.startswith(magics):
            return extension
    # WebP is a RIFF container; the format tag sits after the 4-byte size.
    if blob.startswith(b"RIFF") and blob[8:12] == b"WEBP":
        return ".webp"
    return ".bin"
def to_deg(coord: int) -> float:
    """Scale a raw coordinate by the module-level ``COORD_FACTOR``.

    The function name suggests the result is in degrees; the factor itself
    is defined elsewhere in the module.
    """
    scaled = coord * COORD_FACTOR
    return scaled
@@ -268,90 +352,184 @@ class LBL:
def __init__(self, data: Optional[bytes]):
    """Keep the raw bytes, reset every header-derived field, then parse.

    ``data`` may be ``None`` or empty; in that case ``ok`` is ``False``, the
    defaults below stay in place and the header is not parsed.
    """
    self.ok = bool(data)
    self.data = data if data else b""

    # Label-data section location and addressing.
    self.data_offset = 0
    self.data_length = 0
    self.header_length = 0
    self.base_offset = 0
    self.base_size = 0
    self.shift = 0
    self.data_offset_multiplier = 1

    # Text encoding defaults used until the header says otherwise.
    self.label_coding = 6
    self.codepage = 1252

    # POI table location.
    self.poi_offset = 0
    self.poi_size = 0
    self.poi_shift = 0

    # Embedded image table: (offset, size) pairs plus the image section.
    self._rasters: List[Tuple[int, int]] = []
    self._img_offset = 0
    self._img_size = 0

    if self.ok:
        self._parse_header()
def _parse_header(self) -> None:
header_length = read_u16le(self.data, 0)
self.data_offset = read_u32le(self.data, 0x15)
self.data_length = read_u32le(self.data, 0x19)
self.data_offset_multiplier = 1 << self.data[0x1D]
self.label_coding = self.data[0x1E]
self.header_length = read_u16le(self.data, 0)
self.base_offset = read_u32le(self.data, 0x15)
self.base_size = read_u32le(self.data, 0x19)
self.shift = self.data[0x1D] if len(self.data) > 0x1D else 0
self.data_offset_multiplier = 1 << self.shift
self.label_coding = self.data[0x1E] if len(self.data) > 0x1E else 6
if len(self.data) >= 0x5C:
self.poi_offset = read_u32le(self.data, 0x57)
self.poi_size = read_u32le(self.data, 0x5B)
self.poi_shift = self.data[0x5F] if len(self.data) > 0x5F else 0
if len(self.data) >= 0xAC:
self.codepage = read_u16le(self.data, 0xAA)
if self.header_length >= 0x19A and len(self.data) >= 0x19C:
table_offset = read_u32le(self.data, 0x184)
table_size = read_u32le(self.data, 0x188)
record_size = read_u16le(self.data, 0x18C)
self._img_offset = read_u32le(self.data, 0x194)
self._img_size = read_u32le(self.data, 0x198)
self._load_raster_table(table_offset, table_size, record_size)
def get_label(self, offset: int) -> str:
def _load_raster_table(self, offset: int, size: int, record_size: int) -> None:
    """Build ``self._rasters`` from the table of image start offsets.

    The table at ``offset`` holds ``size // record_size`` fixed-width
    little-endian start offsets. Each image's size is the distance to the
    next start; the last image extends to the end of the image section
    (clamped to what is actually present in ``self.data``).

    Nothing is changed when the arguments or ``self._img_offset`` are out of
    range, or when the offsets are found to decrease. Any exception while
    reading resets ``self._rasters`` to an empty list.
    """
    if not offset or not size or not record_size:
        return
    total = len(self.data)
    if not 0 <= offset < total or record_size > 4:
        return
    img_count = size // record_size
    if img_count <= 0 or not 0 < self._img_offset < total:
        return
    try:
        start, cursor = read_vuint32_fixed(self.data, offset, record_size)
        table: List[Tuple[int, int]] = []
        for _ in range(img_count - 1):
            following, cursor = read_vuint32_fixed(self.data, cursor, record_size)
            if following < start:
                # Offsets must be non-decreasing; give up without touching state.
                return
            table.append((start, following - start))
            start = following
        usable = min(self._img_size, total - self._img_offset)
        table.append((start, max(0, usable - start)))
        self._rasters = table
    except Exception:
        self._rasters = []
def has_images(self) -> bool:
    """Return ``True`` when at least one raster entry has been loaded."""
    return len(self._rasters) > 0
def image_count(self) -> int:
    """Return the number of entries in the loaded raster table."""
    total = len(self._rasters)
    return total
def get_image(self, image_id: int) -> bytes:
    """Return the raw bytes of image ``image_id``.

    An empty ``bytes`` object is returned when the id is out of range, the
    entry is empty, or its span does not fit inside ``self.data``.
    """
    if not 0 <= image_id < len(self._rasters):
        return b''
    relative, length = self._rasters[image_id]
    first = self._img_offset + relative
    last = first + length
    if first < 0 or last > len(self.data) or first >= last:
        return b''
    return self.data[first:last]
@staticmethod
def _sanitize_text(s: str) -> str:
    """Drop control characters (except tab/LF/CR) and trim whitespace."""
    kept = [ch for ch in s if ch in '\t\n\r' or ord(ch) >= 0x20]
    trimmed = ''.join(kept).strip()
    return trimmed.replace('\x00', '')
@staticmethod
def _is_all_upper_case(s: str) -> bool:
    """Return ``True`` if ``s`` has at least one letter and every letter is upper case.

    Non-alphabetic characters (digits, spaces, punctuation) are ignored.
    """
    letters = [ch for ch in s if ch.isalpha()]
    return bool(letters) and all(ch.isupper() for ch in letters)
@staticmethod
def _capitalized(s: str) -> str:
    """Lower-case every character that does not start a word.

    The first character is kept as is; a later character is kept unchanged
    only when the character before it is whitespace, ``(`` or ``"``.
    """
    if not s:
        return s
    pieces = [s[0]]
    for before, current in zip(s, s[1:]):
        starts_word = before.isspace() or before in '(\"'
        pieces.append(current if starts_word else current.lower())
    return ''.join(pieces)
def _postprocess(self, text: str, capitalize: bool = True) -> str:
    """Sanitize a decoded label and optionally soften ALL-CAPS text.

    When ``capitalize`` is true and the sanitized text is entirely upper
    case, it is passed through ``_capitalized``; otherwise the sanitized
    text is returned unchanged.
    """
    cleaned = self._sanitize_text(text)
    if not capitalize:
        return cleaned
    if self._is_all_upper_case(cleaned):
        return self._capitalized(cleaned)
    return cleaned
def get_label(self, offset: int, poi: bool = False, capitalize: bool = True) -> str:
if not self.ok or offset == 0:
return ""
actual = self.data_offset + offset * self.data_offset_multiplier
if poi:
entry = self.poi_offset + (offset << self.poi_shift)
if entry < 0 or entry + 3 > len(self.data):
return ""
poi_ptr = read_u24le(self.data, entry) & 0x3FFFFF
actual = self.base_offset + (poi_ptr << self.shift)
else:
actual = self.base_offset + (offset << self.shift)
if actual < 0 or actual >= len(self.data):
return ""
if self.label_coding == 6:
return self._get_label6(actual)
return self._get_label8_10(actual)
return self._get_label6(actual, capitalize=capitalize)
return self._get_label8_10(actual, capitalize=capitalize)
def _get_label8_10(self, off: int) -> str:
def _decode_bytes(self, raw: bytes) -> str:
    """Decode ``raw`` using the codec selected by ``self.codepage``.

    Code pages 0 and 850 are decoded as cp1252, 65001 as UTF-8, 932 as
    cp932 and 950 as Big5; every other value is tried as ``cp<codepage>``.
    Undecodable bytes are replaced, and if the codec cannot be used at all
    the bytes are decoded as latin1 instead.
    """
    aliases = {
        0: 'cp1252',
        850: 'cp1252',
        65001: 'utf-8',
        932: 'cp932',
        950: 'big5',
    }
    codec = aliases.get(self.codepage, f'cp{self.codepage}')
    try:
        return raw.decode(codec, errors='replace')
    except Exception:
        return raw.decode('latin1', errors='replace')
def _get_label8_10(self, off: int, capitalize: bool = True) -> str:
end = off
while end < len(self.data) and self.data[end] != 0:
end += 1
raw = self.data[off:end]
enc = None
cp = self.codepage
if cp in (0, 850):
enc = "cp1252"
elif cp == 65001:
enc = "utf-8"
elif cp == 932:
enc = "cp932"
elif cp == 950:
enc = "big5"
else:
enc = f"cp{cp}"
try:
return raw.decode(enc, errors="replace")
except Exception:
return raw.decode("latin1", errors="replace")
return self._postprocess(self._decode_bytes(raw), capitalize=capitalize)
def _get_label6(self, off: int) -> str:
def _get_label6(self, off: int, capitalize: bool = True) -> str:
out: List[str] = []
charset = "NORMAL"
charset = 'NORMAL'
pos = off
while pos + 3 <= len(self.data):
b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2]
pos += 3
codes = [
b1 >> 2,
((b1 & 0x3) << 4) | (b2 >> 4),
((b2 & 0xF) << 2) | (b3 >> 6),
b3 & 0x3F,
]
codes = [b1 >> 2, ((b1 & 0x3) << 4) | (b2 >> 4), ((b2 & 0xF) << 2) | (b3 >> 6), b3 & 0x3F]
for c in codes:
if c > 0x2F:
return "".join(out).strip()
if charset == "NORMAL":
if c > 0x2F or (charset == 'NORMAL' and c == 0x1D):
return self._postprocess(''.join(out), capitalize=capitalize)
if charset == 'NORMAL':
if c == 0x1C:
charset = "SYMBOL"
charset = 'SYMBOL'
elif c == 0x1B:
charset = "SPECIAL"
elif c == 0x1D:
out.append("|")
charset = 'SPECIAL'
elif c in (0x1E, 0x1F):
out.append(" ")
out.append(' ')
else:
out.append(self.NORMAL_CHARS[c])
elif charset == "SYMBOL":
elif charset == 'SYMBOL':
out.append(self.SYMBOL_CHARS[c])
charset = "NORMAL"
charset = 'NORMAL'
else:
out.append(self.SPECIAL_CHARS[c])
charset = "NORMAL"
return "".join(out).strip()
charset = 'NORMAL'
return self._postprocess(''.join(out), capitalize=capitalize)
# -------------------------
# TRE parser
@@ -628,6 +806,86 @@ class RGN:
self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0
self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0
self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0
self.polygons_gbl_flags = 0
self.polygons_lcl_flags = [0, 0, 0]
self.lines_gbl_flags = 0
self.lines_lcl_flags = [0, 0, 0]
self.points_gbl_flags = 0
self.points_lcl_flags = [0, 0, 0]
if self.header_length >= 0x71 and len(data) >= 0x71:
try:
self.polygons_gbl_flags = read_u32le(data, 0x29)
self.polygons_lcl_flags = [read_u32le(data, 0x2D), read_u32le(data, 0x31), read_u32le(data, 0x35)]
self.lines_gbl_flags = read_u32le(data, 0x45)
self.lines_lcl_flags = [read_u32le(data, 0x49), read_u32le(data, 0x4D), read_u32le(data, 0x51)]
self.points_gbl_flags = read_u32le(data, 0x61)
self.points_lcl_flags = [read_u32le(data, 0x65), read_u32le(data, 0x69), read_u32le(data, 0x6D)]
except Exception:
pass
self.segment_stats: Counter[str] = Counter()
self.segment_errors: List[Dict[str, object]] = []
def _skip_global_fields(self, pos: int, flags: int) -> int:
    """Advance ``pos`` past the global fields described by ``flags``.

    ``flags`` is read as a sequence of 2-bit values; their sum is the number
    of bytes skipped. The result never exceeds ``len(self.data)``.
    """
    width = 0
    while flags != 0:
        flags, low_pair = flags >> 2, flags & 3
        width += low_pair
    return min(len(self.data), pos + width)
def _skip_class_fields(self, pos: int) -> int:
    """Advance ``pos`` past a class-fields record.

    The top three bits of the first byte select the payload size: 4, 5 and 6
    mean 1, 2 and 3 bytes, 7 means a variable-length size follows (read via
    ``read_vuint32_auto``), and anything lower means no payload. If the
    variable-length size cannot be read, the position just after the flag
    byte is returned. The result never exceeds ``len(self.data)``; a ``pos``
    already at or past the end is returned unchanged.
    """
    limit = len(self.data)
    if pos >= limit:
        return pos
    mode = self.data[pos] >> 5
    pos += 1
    if mode == 7:
        try:
            skip, pos = read_vuint32_auto(self.data, pos)
        except Exception:
            return pos
    elif mode >= 4:
        skip = mode - 3
    else:
        skip = 0
    return min(limit, pos + skip)
def _parse_local_fields_image(self, pos: int, flags: List[int]) -> Tuple[int, Optional[int]]:
# Walk the local fields of one record starting at `pos` and pick out an
# image id. Returns (new_pos, image_id); image_id stays None unless field
# index 3 is present with a 1- or 2-byte payload.
# `flags` is indexed as flags[0], flags[1] and flags[2].
# NOTE(review): indentation is lost in this view, so the nesting of the
# trailing `pos = min(...)`, `bitfield >>= 1` and `j += 1` statements
# cannot be confirmed from here — verify against the real file.
image_id: Optional[int] = None
# Presence mask; all ones unless an explicit bitfield is read below.
bitfield = 0xFFFFFFFF
# Bit 0x20000000 of flags[0] selects reading a variable-length bitfield.
if flags[0] & 0x20000000:
try:
bitfield, pos = read_vbitfield32(self.data, pos)
except Exception:
# Could not read the bitfield: stop with what we have.
return pos, image_id
# j indexes the 2-bit size codes looked up in flags[1] / flags[2].
j = 0
# Only bits 0..28 of flags[0] are examined as field-enable bits.
for i in range(29):
if (flags[0] >> i) & 1:
if bitfield & 1:
# 2-bit size code number j: 16 codes per word, starting at flags[1].
m = (flags[(j >> 4) + 1] >> ((j * 2) & 0x1E)) & 3
# Code 3 means the size is stored inline as a variable-length integer;
# codes 0..2 mean a fixed size of 1..3 bytes.
if m == 3:
try:
size, pos = read_vuint32_auto(self.data, pos)
except Exception:
return pos, image_id
else:
size = m + 1
# Remember where the payload starts so it can be skipped afterwards.
off = pos
# Field index 3 is the one read as the image id (1 or 2 bytes only).
if i == 3:
try:
if size == 1 and pos + 1 <= len(self.data):
image_id = self.data[pos]
elif size == 2 and pos + 2 <= len(self.data):
image_id = read_u16le(self.data, pos)
except Exception:
pass
# Skip the payload, never moving past the end of the buffer.
pos = min(len(self.data), off + size)
bitfield >>= 1
j += 1
return pos, image_id
def data_end(self) -> int:
    """Return the value stored in ``self.data_length``."""
    end = self.data_length
    return end
@@ -758,61 +1016,77 @@ class RGN:
feats: List[Feature] = []
pos, end = seg
while pos < end and pos + 8 <= len(self.data):
typ = self.data[pos]
info24 = read_u24le(self.data, pos + 1)
has_subtype = bool(info24 & 0x800000)
is_poi = bool(info24 & 0x400000)
lbl_off = info24 & 0x3FFFFF
lon_delta = read_s16le(self.data, pos + 4)
lat_delta = read_s16le(self.data, pos + 6)
pos += 8
subtype = 0
if has_subtype and pos < end:
subtype = self.data[pos]
pos += 1
name = self.lbl.get_label(lbl_off) if lbl_off else ""
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
feats.append(Feature(
geom_type="Point",
coords=[lon, lat],
props={
"garmin_kind": "indexed_point" if indexed else "point",
"garmin_type": f"0x{typ:02x}",
"garmin_subtype": f"0x{subtype:02x}",
"garmin_is_poi": is_poi,
"name": name,
},
))
try:
typ = self.data[pos]
info24 = read_u24le(self.data, pos + 1)
has_subtype = bool(info24 & 0x800000)
is_poi = bool(info24 & 0x400000)
lbl_off = info24 & 0x3FFFFF
lon_delta = read_s16le(self.data, pos + 4)
lat_delta = read_s16le(self.data, pos + 6)
pos += 8
subtype = 0
if has_subtype and pos < end:
subtype = self.data[pos]
pos += 1
name = self.lbl.get_label(lbl_off, poi=is_poi, capitalize=not (0x1400 <= (typ << 8 | subtype) <= 0x153F)) if lbl_off else ""
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
feats.append(Feature(
geom_type="Point",
coords=[lon, lat],
props={
"garmin_kind": "indexed_point" if indexed else "point",
"garmin_type": f"0x{typ:02x}",
"garmin_subtype": f"0x{subtype:02x}",
"garmin_is_poi": is_poi,
"name": name,
},
))
except Exception as e:
self.segment_errors.append({"segment": "point", "subdivision": sub.index, "offset": pos, "error": str(e)})
break
self.segment_stats['point' if not indexed else 'indexed_point'] += len(feats)
return feats
def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]:
feats: List[Feature] = []
pos, end = seg
while pos < end and pos + 6 <= len(self.data):
typ = self.data[pos]
subtype_raw = self.data[pos + 1]
has_lbl = bool(subtype_raw & 0x20)
subtype = subtype_raw % 32
full_type = ((typ + 0x100) << 8) + subtype
lon_delta = read_s16le(self.data, pos + 2)
lat_delta = read_s16le(self.data, pos + 4)
pos += 6
lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
if has_lbl:
pos += 3
name = self.lbl.get_label(lbl_off) if lbl_off else ""
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
feats.append(Feature(
geom_type="Point",
coords=[lon, lat],
props={
try:
typ = self.data[pos]
subtype_raw = self.data[pos + 1]
has_lbl = bool(subtype_raw & 0x20)
subtype = subtype_raw % 32
full_type = ((typ + 0x100) << 8) + subtype
lon_delta = read_s16le(self.data, pos + 2)
lat_delta = read_s16le(self.data, pos + 4)
pos += 6
lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
if has_lbl:
pos += 3
if subtype_raw & 0x80:
pos = self._skip_class_fields(pos)
image_id = None
if subtype_raw & 0x40:
pos, image_id = self._parse_local_fields_image(pos, self.points_lcl_flags)
if self.points_gbl_flags:
pos = self._skip_global_fields(pos, self.points_gbl_flags)
name = self.lbl.get_label(lbl_off) if lbl_off else ""
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
props = {
"garmin_kind": "extended_point",
"garmin_type": f"0x{full_type:04x}",
"name": name,
},
))
}
if image_id is not None:
props["garmin_image_id"] = image_id
feats.append(Feature(geom_type="Point", coords=[lon, lat], props=props))
except Exception as e:
self.segment_errors.append({"segment": "extended_point", "subdivision": sub.index, "offset": pos, "error": str(e)})
break
self.segment_stats['extended_point'] += len(feats)
return feats
def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]:
@@ -933,9 +1207,10 @@ class RGN:
"name": name,
},
))
except Exception:
# Stop current segment on malformed data instead of crashing the whole file.
except Exception as e:
self.segment_errors.append({"segment": ("extended_" if extended else "") + ("polyline" if line else "polygon"), "subdivision": sub.index, "offset": pos, "error": str(e)})
break
self.segment_stats[("extended_" if extended else "") + ("polyline" if line else "polygon")] += len(feats)
return feats
@@ -1358,11 +1633,34 @@ def _node_key(lon: float, lat: float) -> Tuple[int, int]:
return (int(round(lon * 1e7)), int(round(lat * 1e7)))
def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple[List[Feature], Dict[str, object]]:
def dump_lbl_images(lbl: LBL, mapset_name: str, outdir: Path) -> Dict[int, str]:
    """Write every non-empty embedded image of ``lbl`` below ``outdir``.

    Files go to ``outdir / mapset_name`` and are named
    ``image_<id as 5 digits><ext>``, with the extension guessed from the
    blob's magic bytes.

    Returns:
        A mapping from image id to the written file's path relative to
        ``outdir`` (``mapset_name/filename``). Empty when ``lbl`` has no
        images; in that case nothing is created on disk.
    """
    written: Dict[int, str] = {}
    if not lbl.has_images():
        return written

    # NOTE(review): mapset_name is used as a path component without
    # validation — confirm callers never pass separators or "..".
    folder = outdir / mapset_name
    folder.mkdir(parents=True, exist_ok=True)
    relative_dir = Path(mapset_name)

    for image_id in range(lbl.image_count()):
        blob = lbl.get_image(image_id)
        if blob:
            filename = f'image_{image_id:05d}{guess_blob_ext(blob)}'
            (folder / filename).write_bytes(blob)
            written[image_id] = str(relative_dir / filename)
    return written
def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes], extract_images_dir: Optional[Path] = None) -> Tuple[List[Feature], Dict[str, object]]:
tre = TRE(subfiles['TRE'])
lbl = LBL(subfiles.get('LBL'))
rgn = RGN(subfiles['RGN'], tre=tre, lbl=lbl)
features = rgn.parse_features()
image_files = dump_lbl_images(lbl, mapset_name, extract_images_dir) if extract_images_dir else {}
for f in features:
f.props['mapset'] = mapset_name
image_id = f.props.get('garmin_image_id')
if image_id is not None and image_id in image_files:
f.props['garmin_image_file'] = image_files[image_id]
meta = {
'mapset': mapset_name,
'bounds_wgs84': {
@@ -1373,6 +1671,11 @@ def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple
},
'feature_count': len(features),
'levels': {lvl: {'bits_per_coord': li.bits_per_coord, 'inherited': li.inherited} for lvl, li in tre.levels.items()},
'segment_stats': dict(rgn.segment_stats),
'segment_error_count': len(rgn.segment_errors),
'segment_errors_preview': rgn.segment_errors[:20],
'embedded_image_count': lbl.image_count(),
'dumped_images': image_files,
}
return features, meta
@@ -1477,7 +1780,7 @@ def write_osm(features: List[Feature], path: Path, semantic: bool = True) -> Non
def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] = None,
bbox: Optional[Tuple[float, float, float, float]] = None,
semantic: bool = True) -> Dict[str, object]:
semantic: bool = True, extract_images_dir: Optional[Path] = None) -> Dict[str, object]:
raw = img_path.read_bytes()
container = ImgContainer(raw)
all_sets = _all_mapsets(container.files)
@@ -1493,7 +1796,7 @@ def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]]
for name, subs in all_sets.items():
if selected and name.upper() not in selected:
continue
feats, meta = parse_mapset_features(name, subs)
feats, meta = parse_mapset_features(name, subs, extract_images_dir=extract_images_dir)
if bbox is not None:
feats = [f for f in feats if _intersects_bbox(f, bbox)]
meta['feature_count_after_bbox'] = len(feats)
@@ -1518,6 +1821,7 @@ def load_features_from_img(
img_path: Path,
mapsets: Optional[List[str]] = None,
bbox: Optional[Tuple[float, float, float, float]] = None,
extract_images_dir: Optional[Path] = None,
) -> Tuple[List[Feature], Dict[str, object]]:
raw = img_path.read_bytes()
container = ImgContainer(raw)
@@ -1528,7 +1832,7 @@ def load_features_from_img(
for name, subs in all_sets.items():
if selected and name.upper() not in selected:
continue
feats, meta = parse_mapset_features(name, subs)
feats, meta = parse_mapset_features(name, subs, extract_images_dir=extract_images_dir)
if bbox is not None:
feats = [f for f in feats if _intersects_bbox(f, bbox)]
meta['feature_count_after_bbox'] = len(feats)
@@ -1630,19 +1934,22 @@ def _feature_point_row(f: Feature) -> Dict[str, object]:
sem = semantic_tags_for_feature(f)
lon, lat = f.coords
return {
'mapset': f.props.get('mapset', ''),
'lon': lon,
'lat': lat,
'name': sem.get('name', ''),
'garmin_kind': f.props.get('garmin_kind', ''),
'garmin_type': f.props.get('garmin_type', ''),
'garmin_subtype': f.props.get('garmin_subtype', ''),
'garmin_image_id': f.props.get('garmin_image_id', ''),
'garmin_image_file': f.props.get('garmin_image_file', ''),
'semantic_tags': sem,
'gpxsee_classes': gpxsee_classes_for_feature(f),
}
def write_landmarks_csv(features: List[Feature], path: Path) -> None:
fields = ['lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'gpxsee_classes_json', 'semantic_tags_json']
fields = ['mapset', 'lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'garmin_image_id', 'garmin_image_file', 'gpxsee_classes_json', 'semantic_tags_json']
if str(path).lower().endswith('.gz'):
fh = gzip.open(path, 'wt', encoding='utf-8', newline='')
else:
@@ -1653,12 +1960,15 @@ def write_landmarks_csv(features: List[Feature], path: Path) -> None:
for f in features:
row = _feature_point_row(f)
w.writerow({
'mapset': row['mapset'],
'lon': f'{row["lon"]:.8f}',
'lat': f'{row["lat"]:.8f}',
'name': row['name'],
'garmin_kind': row['garmin_kind'],
'garmin_type': row['garmin_type'],
'garmin_subtype': row['garmin_subtype'],
'garmin_image_id': row['garmin_image_id'],
'garmin_image_file': row['garmin_image_file'],
'gpxsee_classes_json': json.dumps(row['gpxsee_classes'], ensure_ascii=False),
'semantic_tags_json': json.dumps(row['semantic_tags'], ensure_ascii=False, sort_keys=True),
})
@@ -1672,10 +1982,13 @@ def write_landmarks_geojson(features: List[Feature], path: Path) -> None:
for f in features:
row = _feature_point_row(f)
props = {
'mapset': row['mapset'],
'name': row['name'],
'garmin_kind': row['garmin_kind'],
'garmin_type': row['garmin_type'],
'garmin_subtype': row['garmin_subtype'],
'garmin_image_id': row['garmin_image_id'],
'garmin_image_file': row['garmin_image_file'],
'gpxsee_classes': ','.join(row['gpxsee_classes']),
}
props.update(row['semantic_tags'])
@@ -1707,6 +2020,46 @@ def print_feature_type_table(features: List[Feature], point_only: bool = False)
str(row['sample_name']),
]))
def image_group_rows(features: List[Feature]) -> List[Dict[str, object]]:
    """Summarise features by ``(mapset, garmin_image_id)``.

    Features without a ``garmin_image_id`` property are ignored. Each group
    row carries the count, the type/subtype/image file of the first feature
    seen, and the first non-empty name (falling back to the semantic
    ``name`` tag). Rows are ordered by descending count, then mapset, then
    numeric image id.
    """
    groups: Dict[Tuple[str, str], Dict[str, object]] = {}
    for feature in features:
        props = feature.props
        image_id = props.get('garmin_image_id')
        if image_id is None:
            continue
        key = (str(props.get('mapset') or ''), str(image_id))
        row = groups.get(key)
        if row is None:
            row = groups[key] = {
                'mapset': key[0],
                'garmin_image_id': image_id,
                'count': 0,
                'sample_name': '',
                'sample_type': str(props.get('garmin_type') or ''),
                'sample_subtype': str(props.get('garmin_subtype') or ''),
                'garmin_image_file': str(props.get('garmin_image_file') or ''),
            }
        row['count'] += 1
        if not row['sample_name']:
            row['sample_name'] = str(props.get('name') or semantic_tags_for_feature(feature).get('name') or '')

    def sort_key(r: Dict[str, object]):
        return (-r['count'], r['mapset'], int(r['garmin_image_id']))

    return sorted(groups.values(), key=sort_key)
def print_image_group_table(features: List[Feature]) -> None:
    """Print the image-group summary of ``features`` as a header plus one line per group."""
    rows = image_group_rows(features)
    print('mapset garmin_image_id count garmin_image_file sample_type sample_subtype sample_name')
    columns = ('mapset', 'garmin_image_id', 'count', 'garmin_image_file', 'sample_type', 'sample_subtype', 'sample_name')
    for row in rows:
        print(' '.join(str(row[column]) for column in columns))
def write_image_groups_csv(features: List[Feature], path: Path) -> None:
    """Write the image-group summary of ``features`` to ``path`` as CSV.

    The output is gzip-compressed when the path ends in ``.gz``
    (case-insensitive); otherwise a plain UTF-8 text file is written.
    """
    rows = image_group_rows(features)
    fields = ['mapset','garmin_image_id','count','garmin_image_file','sample_type','sample_subtype','sample_name']
    if str(path).lower().endswith('.gz'):
        fh = gzip.open(path, 'wt', encoding='utf-8', newline='')
    else:
        fh = open(path, 'w', encoding='utf-8', newline='')
    with fh:
        writer = csv.DictWriter(fh, fieldnames=fields)
        writer.writeheader()
        writer.writerows(rows)
def main() -> int:
ap = argparse.ArgumentParser(description='Extract vector features from a Garmin IMG and export GeoJSON / OSM XML suitable for further conversion to OsmAnd .obf.')
ap.add_argument('img', type=Path, help='Input Garmin .img file')
@@ -1722,6 +2075,9 @@ def main() -> int:
ap.add_argument('--landmark-types-json', type=Path, help='Export landmark type summary table to JSON or JSON.GZ')
ap.add_argument('--landmarks-csv', type=Path, help='Export exact-coordinate point landmarks to CSV or CSV.GZ')
ap.add_argument('--landmarks-geojson', type=Path, help='Export exact-coordinate point landmarks to GeoJSON or GeoJSON.GZ')
ap.add_argument('--list-image-groups', action='store_true', help='List extracted point image/icon groups with counts')
ap.add_argument('--image-groups-csv', type=Path, help='Export extracted point image/icon groups to CSV or CSV.GZ')
ap.add_argument('--extract-images-dir', type=Path, help='Best-effort dump of embedded Garmin image blobs by mapset')
ap.add_argument('--category', action='append', help='Filter landmarks/features by semantic category: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys')
ap.add_argument('--filter-kind', action='append', help='Filter by garmin kind, e.g. point, indexed_point, extended_point, polyline')
ap.add_argument('--filter-type', action='append', help='Filter by Garmin type hex string, e.g. 0x64')
@@ -1739,7 +2095,7 @@ def main() -> int:
print(f'{name}\t{to_deg(tre.west):.6f},{to_deg(tre.south):.6f},{to_deg(tre.east):.6f},{to_deg(tre.north):.6f}')
return 0
if not args.geojson and not args.osm and not args.meta_json and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson:
if not args.geojson and not args.osm and not args.meta_json and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson and not args.list_image_groups and not args.image_groups_csv:
ap.error('provide at least one export/list option or use --list-mapsets')
bbox = _parse_bbox(args.bbox)
@@ -1747,7 +2103,7 @@ def main() -> int:
# Fast streaming OSM path when no feature post-filtering is requested.
if args.osm and not args.geojson and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson and not args.category and not args.filter_kind and not args.filter_type and not args.filter_subtype and not args.filter_tag and not args.gpxsee_class and not args.named_only:
meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only)
meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only, extract_images_dir=args.extract_images_dir)
info(f'parsed {meta.get("feature_count", 0)} features from {len(meta.get("mapsets", []))} mapsets')
info(f'wrote OSM XML: {args.osm}')
if args.meta_json:
@@ -1755,7 +2111,7 @@ def main() -> int:
info(f'wrote metadata: {args.meta_json}')
return 0
features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox)
features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox, extract_images_dir=args.extract_images_dir)
info(f'parsed {len(features)} features from {len(meta.get("mapsets", []))} mapsets')
filtered = [
@@ -1773,6 +2129,11 @@ def main() -> int:
)
]
if args.list_image_groups:
print_image_group_table(filtered)
if args.image_groups_csv:
write_image_groups_csv(filtered, args.image_groups_csv)
info(f'wrote image groups CSV: {args.image_groups_csv}')
if args.list_feature_types:
print_feature_type_table(filtered, point_only=False)
if args.list_landmark_types:

View File

@@ -1,115 +1,111 @@
I kept pushing it.
I reworked the extractor around the weak spots you pointed out and validated it against the uploaded `gmapsupp.img`, using GPXSee's Garmin IMG support and source tree as the reference model for the label and local-field paths. GPXSee officially supports Garmin IMG/GMAP maps, and its public source is the right place to mirror parsing behavior from. ([gpxsee.org][1])
The useful step here was not trying to turn GPXSee into an exporter wholesale, but using it as a reference model for Garmin object classification and parser structure while keeping the Python extractor focused on export. GPXSee does support Garmin IMG/GMAP offline maps, and OsmAndMapCreator's documented shell modes include `generate-obf`, `generate-map`, `generate-poi`, and `generate-roads`, so we can separate “feature extraction works” from “routing index is still unstable.” ([gpxsee.org][1])
Updated script:
[garmin_img_to_osmand_v5.py](sandbox:/mnt/data/garmin_img_to_osmand_v5.py)
I built a new revision here:
What changed:
[garmin_img_to_osmand_v4.py](sandbox:/mnt/data/garmin_img_to_osmand_v4.py)
* **better name extraction**
What changed in v4:
* fixed the big one: standard point labels now respect the Garmin **POI indirection bit**, instead of always treating the label offset as a direct string pointer
* codepage handling is kept for your map's Cyrillic labels
* uppercase labels are normalized more cleanly
* control-character cleanup added
* **all mapsets / segments**
* exact-coordinate point landmark export is now a first-class path
* unique landmark type summaries can be exported to CSV and JSON
* GPXSee-style class predicates are folded in as an additional taxonomy layer
* water sources can be exported directly as CSV or GeoJSON
* filtering now works by semantic tag, Garmin type/subtype, and GPXSee-style class names
* still traverses all mapsets
* now records **segment stats** and **segment error previews** into metadata so you can see where parsing is still rough
* **image/icon grouping**
Sample outputs from your uploaded `02335140` mapset:
* extended points can now expose `garmin_image_id` when present in local fields
* landmark CSV/GeoJSON now includes:
* [water_sources_02335140.csv](sandbox:/mnt/data/water_sources_02335140.csv)
* [water_sources_02335140.geojson](sandbox:/mnt/data/water_sources_02335140.geojson)
* [landmark_types_02335140.csv](sandbox:/mnt/data/landmark_types_02335140.csv)
* [landmark_types_02335140.json](sandbox:/mnt/data/landmark_types_02335140.json)
* `mapset`
* `garmin_image_id`
* `garmin_image_file`
* best-effort embedded image dumping is supported with `--extract-images-dir`
* image grouping can be listed/exported with:
On that sample mapset, the current parser found 4 exact-coordinate water-source points, all of Garmin type `0x64` subtype `0x14`, which the current semantic layer maps to `amenity=drinking_water`.
* `--list-image-groups`
* `--image-groups-csv`
* **validation against your upload**
* the broken/truncated names improved a lot on the tested mapset
* examples that now decode sensibly include:
* `Полски ясен`
* `Андзова чешма`
* `Дубрава (310)`
* `Св. Димитър`
* `Паметник на Мико Нинов`
A concrete test I ran on your uploaded mapset `02234010`:
* parsed `52686` features
* water-source landmark export produced `1404` point rows
* segment stats reported parsed points/polylines/polygons instead of only points
Use it like this.
List all exact-coordinate landmark types for a tile:
Better landmark CSV with mapset + image fields:
```bash id="51801"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--mapset 02335140 ^
```bash
python garmin_img_to_osmand_v5.py gmapsupp.img ^
--mapset 02234010 ^
--category water_sources ^
--landmarks-csv water_sources.csv ^
--meta-json water_sources.meta.json
```
List landmark types with better names:
```bash
python garmin_img_to_osmand_v5.py gmapsupp.img ^
--mapset 02234010 ^
--list-landmark-types
```
Export the landmark type summary in machine-readable form:
List image/icon groups:
```bash id="51802"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--mapset 02335140 ^
--landmark-types-csv landmark_types.csv ^
--landmark-types-json landmark_types.json
```bash
python garmin_img_to_osmand_v5.py gmapsupp.img ^
--mapset 02234010 ^
--list-image-groups
```
Export all exact-coordinate point landmarks:
Export image/icon group summary:
```bash id="51803"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--mapset 02335140 ^
--landmarks-csv points.csv ^
--landmarks-geojson points.geojson
```bash
python garmin_img_to_osmand_v5.py gmapsupp.img ^
--mapset 02234010 ^
--image-groups-csv image_groups.csv
```
Export only water sources:
Best-effort dump embedded Garmin image blobs:
```bash id="51804"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--mapset 02335140 ^
--category water_sources ^
--landmarks-csv water_sources.csv ^
--landmarks-geojson water_sources.geojson
```bash
python garmin_img_to_osmand_v5.py gmapsupp.img ^
--mapset 02234010 ^
--extract-images-dir dumped_images ^
--landmarks-csv landmarks.csv
```
Filter by semantic tag directly:
The landmark CSV now has these extra columns:
```bash id="51805"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--filter-tag amenity=drinking_water ^
--landmarks-csv drinking_water.csv
```
* `mapset`
* `garmin_image_id`
* `garmin_image_file`
Filter by raw Garmin type:
So even if a real image blob is not recoverable for a given point, you can still group by image id or by dumped image file when present.
```bash id="51806"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--filter-kind point ^
--filter-type 0x64 ^
--filter-subtype 0x14 ^
--landmarks-csv type_64_14.csv
```
One important note: on the tested mapset, I did **not** see a rich population of extracted image-backed point icons yet. The support is there now, but this particular IMG seems to benefit much more from the **fixed label path** than from embedded-image extraction. So the image grouping path is now available, but the biggest win on your sample was clearly the label repair.
Filter by GPXSee-style class:
What is still not fully okay:
```bash id="51807"
python garmin_img_to_osmand_v4.py gmapsupp.img ^
--gpxsee-class water_line ^
--geojson water_lines.geojson
```
* some segment errors remain, and the script now reports them in metadata instead of silently hiding them
* extended/local/class Garmin fields are still only partially decoded
* raw embedded icon extraction is best-effort, not guaranteed for every IMG flavor
The routing crash you posted is still a separate layer. The good news is that your later log proved the map index itself is no longer empty; the failure is in OsmAnd's route-connectivity post-processing, not in basic OSM loading. Until the road topology export is cleaner, use:
```bash id="51808"
java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^
net.osmand.MainUtilities generate-map C:\path\to\out.osm.gz
```
or:
```bash id="51809"
java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^
net.osmand.MainUtilities generate-poi C:\path\to\out.osm.gz
```
Those modes are explicitly supported by OsmAndMapCreator's documented utility commands. ([gpxsee.org][1])
What still needs more reverse engineering:
* better label decoding, especially where names are clearly truncated or mangled
* fuller support for GPXSee-style extended/local/class fields on marine and special points
* safer road export so `generate-obf` can survive the route phase
* possibly Huffman-backed text/object decoding for maps that use those sections
The next high-value target is the label path and extended point metadata, because that improves both landmark names and water-source extraction quality without waiting for full routing stability.
The next best refinement is to push one more level into GPXSee-style local/class field parsing so more special POIs get richer metadata instead of only names and coordinates.
[1]: https://www.gpxsee.org/doc "https://www.gpxsee.org/doc"

View File

@@ -6,13 +6,13 @@ import csv
import gzip
import json
import math
import re
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable, Optional
from xml.dom import minidom
from typing import Iterable, Optional
OSMAND_NS = "https://osmand.net"
GPX_NS = "http://www.topografix.com/GPX/1/1"
@@ -20,6 +20,11 @@ ET.register_namespace("osmand", OSMAND_NS)
EARTH_M_PER_DEG_LAT = 111_320.0
# XML 1.0 valid chars: tab, CR, LF, and U+0020..U+D7FF, U+E000..U+FFFD, U+10000..U+10FFFF
_XML_INVALID_RE = re.compile(
r"[\x00-\x08\x0B\x0C\x0E-\x1F\uD800-\uDFFF\uFFFE\uFFFF]"
)
@dataclass
class Landmark:
@@ -82,6 +87,20 @@ DEFAULT_GROUPS = {
}
def sanitize_text(value: object) -> str:
    """Return *value* as text with XML-invalid characters removed.

    ``None`` maps to the empty string; any other value is converted with
    ``str()``. CRLF and bare CR line endings are folded to LF, and every
    character matched by ``_XML_INVALID_RE`` is then dropped.
    """
    if value is None:
        return ""
    # Replace CRLF before bare CR so a CRLF pair does not become two newlines.
    unified = str(value).replace("\r\n", "\n").replace("\r", "\n")
    return _XML_INVALID_RE.sub("", unified)
def safe_json(data: object) -> str:
    """Serialize *data* as JSON (sorted keys, non-ASCII kept) and sanitize it."""
    encoded = json.dumps(data, ensure_ascii=False, sort_keys=True)
    return sanitize_text(encoded)
def open_text_out(path: Path):
if str(path).lower().endswith(".gz"):
return gzip.open(path, "wt", encoding="utf-8", newline="")
@@ -89,13 +108,19 @@ def open_text_out(path: Path):
def write_xml(path: Path, root: ET.Element) -> None:
xml_bytes = ET.tostring(root, encoding="utf-8")
pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8")
# ElementTree is more robust here than round-tripping through minidom,
# and avoids parsing giant XML back into memory.
tree = ET.ElementTree(root)
try:
ET.indent(tree, space=" ") # Python 3.9+
except Exception:
pass
if str(path).lower().endswith(".gz"):
with gzip.open(path, "wb") as f:
f.write(pretty)
tree.write(f, encoding="utf-8", xml_declaration=True)
else:
path.write_bytes(pretty)
with path.open("wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)
def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
@@ -103,16 +128,19 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
for path in paths:
with path.open("r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype"}
missing = required - set(reader.fieldnames or [])
fieldnames = set(reader.fieldnames or [])
required = {"lon", "lat", "name"}
missing = required - fieldnames
if missing:
raise ValueError(f"{path}: missing columns: {sorted(missing)}")
for row in reader:
try:
lon = float(row["lon"])
lat = float(row["lat"])
except Exception:
continue
try:
gpxsee_classes = json.loads(row.get("gpxsee_classes_json") or "[]")
except Exception:
@@ -121,27 +149,38 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]:
semantic_tags = json.loads(row.get("semantic_tags_json") or "{}")
except Exception:
semantic_tags = {}
name = sanitize_text((row.get("name") or "").strip())
garmin_kind = sanitize_text((row.get("garmin_kind") or "point").strip())
garmin_type = sanitize_text((row.get("garmin_type") or "").strip().lower())
garmin_subtype = sanitize_text((row.get("garmin_subtype") or "").strip().lower())
if isinstance(semantic_tags, dict):
semantic_tags = {sanitize_text(k): sanitize_text(v) for k, v in semantic_tags.items() if sanitize_text(k)}
else:
semantic_tags = {}
if isinstance(gpxsee_classes, list):
gpxsee_classes = [sanitize_text(v).strip() for v in gpxsee_classes if sanitize_text(v).strip()]
else:
gpxsee_classes = []
items.append(Landmark(
lon=lon,
lat=lat,
name=(row.get("name") or "").strip(),
garmin_kind=(row.get("garmin_kind") or "").strip(),
garmin_type=(row.get("garmin_type") or "").strip().lower(),
garmin_subtype=(row.get("garmin_subtype") or "").strip().lower(),
gpxsee_classes=gpxsee_classes if isinstance(gpxsee_classes, list) else [],
semantic_tags=semantic_tags if isinstance(semantic_tags, dict) else {},
name=name,
garmin_kind=garmin_kind,
garmin_type=garmin_type,
garmin_subtype=garmin_subtype,
gpxsee_classes=gpxsee_classes,
semantic_tags=semantic_tags,
source_files=[path.name],
duplicate_names=[(row.get("name") or "").strip()] if (row.get("name") or "").strip() else [],
duplicate_types=[(row.get("garmin_type") or "").strip().lower()],
duplicate_subtypes=[(row.get("garmin_subtype") or "").strip().lower()],
duplicate_names=[name] if name else [],
duplicate_types=[garmin_type] if garmin_type else [],
duplicate_subtypes=[garmin_subtype] if garmin_subtype else [],
))
return items
# ----------------------------
# Semantic inference / groups
# ----------------------------
def gpxsee_class_flags(item: Landmark) -> set[str]:
return {str(v).strip().lower() for v in item.gpxsee_classes if str(v).strip()}
@@ -224,10 +263,6 @@ def infer_group_style(key: str) -> dict[str, str]:
return {"name": humanize_group_name(key), "color": "#FB8C00", "icon": "marker", "background": "circle"}
# ----------------------------
# Dedupe
# ----------------------------
def meters_per_deg_lon(lat_deg: float) -> float:
return EARTH_M_PER_DEG_LAT * max(0.01, math.cos(math.radians(lat_deg)))
@@ -256,11 +291,10 @@ def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark:
merged = best.clone()
merged.duplicate_count = primary.duplicate_count + other.duplicate_count
merged.source_files = sorted(set(primary.source_files + other.source_files))
merged.duplicate_names = sorted({n for n in primary.duplicate_names + other.duplicate_names if n})
merged.duplicate_types = sorted(set(primary.duplicate_types + other.duplicate_types))
merged.duplicate_subtypes = sorted(set(primary.duplicate_subtypes + other.duplicate_subtypes))
merged.duplicate_names = sorted({sanitize_text(n) for n in primary.duplicate_names + other.duplicate_names if sanitize_text(n)})
merged.duplicate_types = sorted(set(filter(None, primary.duplicate_types + other.duplicate_types)))
merged.duplicate_subtypes = sorted(set(filter(None, primary.duplicate_subtypes + other.duplicate_subtypes)))
# Prefer the richest semantic tag set, but merge missing keys from the other side.
richer = primary.semantic_tags if len(primary.semantic_tags) >= len(other.semantic_tags) else other.semantic_tags
poorer = other.semantic_tags if richer is primary.semantic_tags else primary.semantic_tags
merged.semantic_tags = dict(richer)
@@ -270,7 +304,7 @@ def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark:
merged.gpxsee_classes = sorted(set(primary.gpxsee_classes + other.gpxsee_classes))
if not merged.name:
merged.name = primary.name or other.name
merged.name = sanitize_text(primary.name or other.name)
return merged
@@ -320,10 +354,6 @@ def dedupe(items: list[Landmark], radius_m: float = 12.0, mode: str = "coord") -
return clusters
# ----------------------------
# Filtering and grouping
# ----------------------------
def apply_filters(
items: list[Landmark],
category: Optional[str],
@@ -369,16 +399,63 @@ def make_group_key(it: Landmark, mode: str) -> str:
def sample_label(it: Landmark) -> str:
parts = []
if it.name:
parts.append(it.name)
parts.append(sanitize_text(it.name))
parts.append(f"{it.lon:.5f},{it.lat:.5f}")
parts.append(f"{it.garmin_type}/{it.garmin_subtype}")
if it.semantic_tags:
cleaned = {k: v for k, v in it.semantic_tags.items() if k != "name"}
if cleaned:
parts.append(json.dumps(cleaned, ensure_ascii=False, sort_keys=True))
parts.append(safe_json(cleaned))
return " | ".join(parts)
def spread_examples(group_items: list[Landmark], example_count: int) -> list[str]:
    """Return up to *example_count* distinct sample labels spread over a group.

    The items are sorted by first source file, rounded longitude/latitude and
    name. Evenly spaced positions in that ordering are tried first; if some of
    their labels collide, the remaining items are scanned in order to top the
    list up.

    Args:
        group_items: Landmarks belonging to one group.
        example_count: Maximum number of labels to return.

    Returns:
        A list of unique ``sample_label`` strings; empty when *example_count*
        is not positive or *group_items* is empty.
    """
    if example_count <= 0 or not group_items:
        return []

    ordered = sorted(group_items, key=lambda it: (
        sanitize_text(it.source_files[0] if it.source_files else ""),
        round(it.lon, 6),
        round(it.lat, 6),
        sanitize_text(it.name),
    ))
    n = len(ordered)

    if n <= example_count:
        candidates = ordered
    elif example_count == 1:
        # A single example is taken from the middle of the ordering.
        candidates = [ordered[n // 2]]
    else:
        last = n - 1
        steps = example_count - 1
        # dict.fromkeys keeps first-seen order while dropping repeated indices.
        picks = dict.fromkeys(round(i * last / steps) for i in range(example_count))
        candidates = [ordered[idx] for idx in picks]

    examples: list[str] = []
    seen: set[str] = set()
    # Spread-out candidates come first; the full ordering is the fallback pool
    # when candidate labels turn out to be duplicates of each other.
    for it in candidates + ordered:
        lbl = sample_label(it)
        if lbl in seen:
            continue
        seen.add(lbl)
        examples.append(lbl)
        if len(examples) >= example_count:
            break
    return examples
def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> dict[str, GroupDefinition]:
grouped: dict[str, list[Landmark]] = defaultdict(list)
for it in items:
@@ -387,16 +464,7 @@ def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> di
result: dict[str, GroupDefinition] = {}
for key, group_items in sorted(grouped.items()):
style = infer_group_style(key)
examples = []
seen = set()
for it in group_items:
lbl = sample_label(it)
if lbl in seen:
continue
seen.add(lbl)
examples.append(lbl)
if len(examples) >= example_count:
break
examples = spread_examples(group_items, example_count)
result[key] = GroupDefinition(
key=key,
name=style["name"],
@@ -434,32 +502,34 @@ def interactive_rename_groups(groups: dict[str, GroupDefinition], enabled: bool,
if reply == "!":
keep_all = True
elif reply:
group.name = reply
group.name = sanitize_text(reply)
print(file=sys.stderr)
return groups
# ----------------------------
# Writers
# ----------------------------
def xml_text(el: ET.Element, text: object) -> None:
    """Set ``el.text`` to the sanitized *text*; do nothing if it is empty."""
    cleaned = sanitize_text(text)
    if not cleaned:
        return
    el.text = cleaned
def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_path: Path, by: str = "auto") -> None:
gpx = ET.Element("gpx", {
"version": "1.1",
"creator": "landmarks_csv_to_osmand_v2.py",
"creator": "landmarks_csv_to_osmand_v3.py",
"xmlns": GPX_NS,
})
metadata = ET.SubElement(gpx, "metadata")
ET.SubElement(metadata, "name").text = out_path.stem
xml_text(ET.SubElement(metadata, "name"), out_path.stem)
groups_el = ET.SubElement(ET.SubElement(gpx, "extensions"), f"{{{OSMAND_NS}}}points_groups")
for key in sorted(groups):
cfg = groups[key]
ET.SubElement(groups_el, f"{{{OSMAND_NS}}}group", {
"name": cfg.name,
"color": cfg.color,
"icon": cfg.icon,
"background": cfg.background,
"name": sanitize_text(cfg.name),
"color": sanitize_text(cfg.color),
"icon": sanitize_text(cfg.icon),
"background": sanitize_text(cfg.background),
})
for it in items:
@@ -467,43 +537,47 @@ def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_pat
cfg = groups[gkey]
wpt = ET.SubElement(gpx, "wpt", {"lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
if it.name:
ET.SubElement(wpt, "name").text = it.name
ET.SubElement(wpt, "type").text = cfg.name
xml_text(ET.SubElement(wpt, "name"), it.name)
xml_text(ET.SubElement(wpt, "type"), cfg.name)
desc_parts = []
if it.semantic_tags:
desc_parts.append("semantic: " + json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True))
desc_parts.append(f"garmin: kind={it.garmin_kind} type={it.garmin_type} subtype={it.garmin_subtype}")
desc_parts.append("semantic: " + safe_json(it.semantic_tags))
desc_parts.append(f"garmin: kind={sanitize_text(it.garmin_kind)} type={sanitize_text(it.garmin_type)} subtype={sanitize_text(it.garmin_subtype)}")
if it.duplicate_count > 1:
desc_parts.append(f"dedupe: merged {it.duplicate_count} records")
if it.source_files:
desc_parts.append("sources=" + ", ".join(it.source_files))
ET.SubElement(wpt, "desc").text = "\n".join(desc_parts)
desc_parts.append("sources=" + ", ".join(sanitize_text(s) for s in it.source_files))
xml_text(ET.SubElement(wpt, "desc"), "\n".join(desc_parts))
ext = ET.SubElement(wpt, "extensions")
ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg.icon
ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg.color
ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg.background
xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}icon"), cfg.icon)
xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}color"), cfg.color)
xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}background"), cfg.background)
write_xml(out_path, gpx)
def write_osm(items: list[Landmark], out_path: Path) -> None:
osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v2.py"})
osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v3.py"})
nid = -1
for it in items:
node = ET.SubElement(osm, "node", {"id": str(nid), "lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"})
nid -= 1
if it.name:
ET.SubElement(node, "tag", {"k": "name", "v": it.name})
ET.SubElement(node, "tag", {"k": "name", "v": sanitize_text(it.name)})
for k, v in sorted(it.semantic_tags.items()):
if v is None:
k2 = sanitize_text(k)
v2 = sanitize_text(v)
if not k2 or not v2:
continue
ET.SubElement(node, "tag", {"k": str(k), "v": str(v)})
ET.SubElement(node, "tag", {"k": "garmin:kind", "v": it.garmin_kind})
ET.SubElement(node, "tag", {"k": "garmin:type", "v": it.garmin_type})
ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": it.garmin_subtype})
ET.SubElement(node, "tag", {"k": k2, "v": v2})
ET.SubElement(node, "tag", {"k": "garmin:kind", "v": sanitize_text(it.garmin_kind)})
if it.garmin_type:
ET.SubElement(node, "tag", {"k": "garmin:type", "v": sanitize_text(it.garmin_type)})
if it.garmin_subtype:
ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": sanitize_text(it.garmin_subtype)})
if it.duplicate_count > 1:
ET.SubElement(node, "tag", {"k": "source:merge_count", "v": str(it.duplicate_count)})
if it.source_files:
ET.SubElement(node, "tag", {"k": "source:file", "v": ",".join(it.source_files)})
ET.SubElement(node, "tag", {"k": "source:file", "v": sanitize_text(",".join(it.source_files))})
write_xml(out_path, osm)
@@ -517,7 +591,7 @@ def write_summary(items: list[Landmark], groups: dict[str, GroupDefinition], out
w.writerow(["group_key", "group_name", "count", "examples"])
for key, value in sorted(counts.items()):
group = groups[key]
w.writerow([key, group.name, value, " || ".join(group.examples)])
w.writerow([sanitize_text(key), sanitize_text(group.name), value, " || ".join(sanitize_text(x) for x in group.examples)])
if out_json:
payload = {
key: {
@@ -546,7 +620,7 @@ def print_groups(groups: dict[str, GroupDefinition]) -> None:
def main(argv: Optional[list[str]] = None) -> int:
ap = argparse.ArgumentParser(
description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping and modular coordinate-based dedupe."
description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping, spread-out sampling, and modular coordinate-based dedupe."
)
ap.add_argument("csv", nargs="+", type=Path, help="Input landmark CSV files")
ap.add_argument("--gpx", type=Path, help="Write OsmAnd-friendly GPX waypoint overlay")
@@ -556,7 +630,7 @@ def main(argv: Optional[list[str]] = None) -> int:
ap.add_argument("--group-by", choices=["auto", "category", "type", "source"], default="auto", help="How GPX waypoint groups should be organized")
ap.add_argument("--show-groups", action="store_true", help="Print detected groups with a few examples")
ap.add_argument("--interactive-group-names", action="store_true", help="Interactively rename auto-detected groups")
ap.add_argument("--group-examples", type=int, default=3, help="How many example rows to show for each group")
ap.add_argument("--group-examples", type=int, default=3, help="How many spread-out example rows to show for each group")
ap.add_argument("--category", help="Keep only one inferred category, e.g. drinking_water, springs, settlements_village")
ap.add_argument("--filter-tag", action="append", default=[], help="Keep only landmarks whose semantic tags contain key=value")
ap.add_argument("--filter-kind", help="Keep only one Garmin kind, e.g. point")

View File

@@ -1,89 +1,59 @@
Done.
Found the two real problems.
First, your crash:
* `minidom.parseString(...)` was reparsing the entire generated XML in memory
* one of the landmark names or tag values contains an XML-illegal control character
* on a huge file, that blows up exactly like your `ExpatError: not well-formed (invalid token)`
Second, the sampling:
* the previous script was taking the first few examples in a group
* now it samples **spread out across the group**, so the examples are taken from different positions in the dataset instead of only the front
Updated script:
[landmarks_csv_to_osmand_v2.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v2.py)
[landmarks_csv_to_osmand_v3.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v3.py)
What changed:
* default dedupe is now **coordinate-based only**
* dedupe is modular:
* removed the fragile `minidom.parseString()` round-trip
* writes XML directly with `ElementTree`
* sanitizes text and tag values to strip XML-invalid control characters
* supports `.gz` outputs correctly
* group examples are now **spread-out samples**
* interactive group renaming still works, but now the shown examples are better distributed
* loader still accepts both raw landmark CSVs and slimmer derived CSVs
* `coord`
* `coord_name`
* `coord_type`
* `coord_name_type`
* `off`
* dedupe uses a meter radius instead of naive decimal rounding
* duplicates are **merged**, not just dropped
* merged records keep:
* best name
* richer semantic tags
* union of GPXSee classes
* source file list
* duplicate count
* grouping is improved:
* `--group-by auto`
* `--group-by category`
* `--group-by type`
* `--group-by source`
* automatic grouping now distinguishes things like:
* `amenity_drinking_water`
* `natural_spring`
* `place_village`
* fallback Garmin or GPXSee groups
* interactive group naming added:
* `--interactive-group-names`
* shows sample examples from each detected group
* lets you keep or rename the proposed group name
* loader is now tolerant of both:
* the raw landmark CSVs
* the slimmer per-category CSVs you are iterating through
* `.gpx.gz`, `.osm.gz`, `.json.gz`, `.csv.gz` now actually get gzip-written correctly
Useful commands:
Show detected groups with examples:
Use it instead of the old one:
```bash
python landmarks_csv_to_osmand_v2.py *.csv --show-groups
python landmarks_csv_to_osmand_v3.py *.csv --osm bgmountains_poi.osm.gz
```
Interactive rename flow:
If you want to inspect grouping before writing:
```bash
python landmarks_csv_to_osmand_v2.py *.csv --show-groups --interactive-group-names --group-by auto --gpx landmarks.gpx
python landmarks_csv_to_osmand_v3.py *.csv --show-groups --group-examples 5
```
Tighter coordinate dedupe:
If you want interactive naming with spread-out examples:
```bash
python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord --dedupe-radius-m 6 --gpx landmarks.gpx
python landmarks_csv_to_osmand_v3.py *.csv --show-groups --interactive-group-names --group-examples 5 --gpx landmarks.gpx
```
Coordinate dedupe, but only merge when type also matches:
If you want POI-only OSM for OsmAndMapCreator:
```bash
python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord_type --dedupe-radius-m 10 --gpx landmarks.gpx
python landmarks_csv_to_osmand_v3.py *.csv --osm landmarks.osm.gz
```
Water-only overlay:
Then:
```bash
python landmarks_csv_to_osmand_v2.py *.csv --filter-tag amenity=drinking_water --gpx drinking_water.gpx
java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" net.osmand.MainUtilities generate-poi C:\path\to\landmarks.osm.gz
```
Springs-only overlay:
The main fix is this: the script now treats dirty text safely instead of trusting all names/tags to already be valid XML.
```bash
python landmarks_csv_to_osmand_v2.py *.csv --filter-tag natural=spring --gpx springs.gpx
```
If you want the next revision, I'd push it further in two directions:
* add **dedupe reports** showing which rows got merged into which canonical landmark
* add **interactive split/merge controls** for ambiguous groups, not just rename controls
If you want, the next refinement should be a `--debug-bad-rows` mode that writes out the original rows whose text had to be sanitized, so you can see exactly which source landmarks were malformed.

6
summary.csv Normal file
View File

@@ -0,0 +1,6 @@
group_key,group_name,count,examples
amenity_drinking_water,Вода,55724,"22.30130,43.70876 | 0x64/0x14 | {""amenity"": ""drinking_water""} || 22.41769,43.77090 | 0x64/0x14 | {""amenity"": ""drinking_water""} || 22.41846,43.77485 | 0x64/0x14 | {""amenity"": ""drinking_water""}"
gpxsee_poi,Убежище,19619,"22.33757,43.92231 | 0x66/0x00 || 22.33117,43.93072 | 0x66/0x00 || 22.27306,43.90484 | 0x64/0x0e"
natural_peak,Връх,26720,"t for sale | 21.89343,43.77614 | 0x66/0x16 | {""natural"": ""peak""} || 21.87644,43.77691 | 0x66/0x16 | {""natural"": ""peak""} || 21.87640,43.77691 | 0x66/0x16 | {""natural"": ""peak""}"
natural_volcano,Волкан,13,",45,60,N3 | 23.29994,42.81578 | 0x66/0x0e | {""natural"": ""volcano""} || дий | 24.51264,43.45196 | 0x66/0x0e | {""natural"": ""volcano""} || дий | 24.51264,43.45200 | 0x66/0x0e | {""natural"": ""volcano""}"
place_locality,Място,65580,"ука | 22.33143,43.90969 | 0x66/0x00 | {""place"": ""locality""} || ин рът | 22.31954,43.91115 | 0x66/0x00 | {""place"": ""locality""} || иткин рът | 22.31134,43.90703 | 0x66/0x00 | {""place"": ""locality""}"
1 group_key group_name count examples
2 amenity_drinking_water Вода 55724 22.30130,43.70876 | 0x64/0x14 | {"amenity": "drinking_water"} || 22.41769,43.77090 | 0x64/0x14 | {"amenity": "drinking_water"} || 22.41846,43.77485 | 0x64/0x14 | {"amenity": "drinking_water"}
3 gpxsee_poi Убежище 19619 22.33757,43.92231 | 0x66/0x00 || 22.33117,43.93072 | 0x66/0x00 || 22.27306,43.90484 | 0x64/0x0e
4 natural_peak Връх 26720 t for sale | 21.89343,43.77614 | 0x66/0x16 | {"natural": "peak"} || 21.87644,43.77691 | 0x66/0x16 | {"natural": "peak"} || 21.87640,43.77691 | 0x66/0x16 | {"natural": "peak"}
5 natural_volcano Волкан 13 ,45,60,N3 | 23.29994,42.81578 | 0x66/0x0e | {"natural": "volcano"} || дий | 24.51264,43.45196 | 0x66/0x0e | {"natural": "volcano"} || дий | 24.51264,43.45200 | 0x66/0x0e | {"natural": "volcano"}
6 place_locality Място 65580 ука | 22.33143,43.90969 | 0x66/0x00 | {"place": "locality"} || ин рът | 22.31954,43.91115 | 0x66/0x00 | {"place": "locality"} || иткин рът | 22.31134,43.90703 | 0x66/0x00 | {"place": "locality"}