From 3b1087c9ecc6db8090e7a2d5a1bf7eac776664dc Mon Sep 17 00:00:00 2001 From: nq Date: Wed, 15 Apr 2026 04:33:06 -0700 Subject: [PATCH] v.5.0.1 - actually adding them --- ...o_osmand_v4.py => garmin_img_to_osmand.py} | 571 ++++++++++++++---- stage-1-read-garmin-img/readme.md | 162 +++-- .../landmarks_csv_to_osmand.py | 226 ++++--- stage-2-parse-stage-1/readme.md | 94 +-- summary.csv | 6 + 5 files changed, 733 insertions(+), 326 deletions(-) rename stage-1-read-garmin-img/{garmin_img_to_osmand_v4.py => garmin_img_to_osmand.py} (77%) create mode 100644 summary.csv diff --git a/stage-1-read-garmin-img/garmin_img_to_osmand_v4.py b/stage-1-read-garmin-img/garmin_img_to_osmand.py similarity index 77% rename from stage-1-read-garmin-img/garmin_img_to_osmand_v4.py rename to stage-1-read-garmin-img/garmin_img_to_osmand.py index 43ad504..8326fbd 100644 --- a/stage-1-read-garmin-img/garmin_img_to_osmand_v4.py +++ b/stage-1-read-garmin-img/garmin_img_to_osmand.py @@ -93,6 +93,90 @@ def read_u32le(buf: bytes, off: int) -> int: return int.from_bytes(buf[off:off + 4], "little", signed=False) +def byte_size(val: int) -> int: + if val <= 0xFF: + return 1 + if val <= 0xFFFF: + return 2 + if val <= 0xFFFFFF: + return 3 + return 4 + + +def read_vuint32_fixed(data: bytes, pos: int, nbytes: int) -> Tuple[int, int]: + if nbytes < 1 or nbytes > 4 or pos + nbytes > len(data): + raise EOFError + return int.from_bytes(data[pos:pos + nbytes], "little", signed=False), pos + nbytes + + +def read_vuint32_auto(data: bytes, pos: int) -> Tuple[int, int]: + if pos >= len(data): + raise EOFError + b = data[pos] + pos += 1 + if (b & 1) == 0: + if (b & 2) == 0: + nbytes = (((b >> 2) & 1) ^ 3) + shift = 5 + else: + shift = 6 + nbytes = 1 + else: + shift = 7 + nbytes = 0 + val = b >> (8 - shift) + for i in range(1, nbytes + 1): + if pos >= len(data): + raise EOFError + b = data[pos] + pos += 1 + val |= ((b << (i * 8)) >> (8 - shift)) + return val, pos + + +def read_vbitfield32(data: bytes, pos: int) -> Tuple[int, int]: + if pos >= len(data): + raise EOFError + bits = data[pos] + if not (bits & 1): + if not ((bits >> 1) & 1): + if not ((bits >> 2) & 1): + if pos + 4 > len(data): + raise EOFError + bitfield = read_u32le(data, pos) >> 3 + pos += 4 + else: + if pos + 3 > len(data): + raise EOFError + bitfield = read_u24le(data, pos) >> 3 + pos += 3 + else: + if pos + 2 > len(data): + raise EOFError + bitfield = read_u16le(data, pos) >> 2 + pos += 2 + else: + bitfield = bits >> 1 + pos += 1 + return bitfield, pos + + +def guess_blob_ext(blob: bytes) -> str: + if blob.startswith(b"\x89PNG\r\n\x1a\n"): + return ".png" + if blob.startswith(b"\xff\xd8\xff"): + return ".jpg" + if blob.startswith(b"GIF87a") or blob.startswith(b"GIF89a"): + return ".gif" + if blob.startswith(b"BM"): + return ".bmp" + if blob.startswith(b"II*\x00") or blob.startswith(b"MM\x00*"): + return ".tif" + if blob.startswith(b"RIFF") and blob[8:12] == b"WEBP": + return ".webp" + return ".bin" + + def to_deg(coord: int) -> float: return coord * COORD_FACTOR @@ -268,90 +352,184 @@ class LBL: def __init__(self, data: Optional[bytes]): self.data = data or b"" self.ok = bool(data) - self.data_offset = 0 - self.data_length = 0 + self.header_length = 0 + self.base_offset = 0 + self.base_size = 0 + self.shift = 0 self.data_offset_multiplier = 1 self.label_coding = 6 self.codepage = 1252 + self.poi_offset = 0 + self.poi_size = 0 + self.poi_shift = 0 + self._rasters: List[Tuple[int, int]] = [] + self._img_offset = 0 + self._img_size = 0 if self.ok: self._parse_header() def _parse_header(self) -> None: - header_length = read_u16le(self.data, 0) - self.data_offset = read_u32le(self.data, 0x15) - self.data_length = read_u32le(self.data, 0x19) - self.data_offset_multiplier = 1 << self.data[0x1D] - self.label_coding = self.data[0x1E] + self.header_length = read_u16le(self.data, 0) + self.base_offset = read_u32le(self.data, 0x15) + self.base_size = read_u32le(self.data, 0x19) + self.shift = self.data[0x1D] if len(self.data) > 0x1D else 0 + self.data_offset_multiplier = 1 << self.shift + self.label_coding = self.data[0x1E] if len(self.data) > 0x1E else 6 + if len(self.data) >= 0x5C: + self.poi_offset = read_u32le(self.data, 0x57) + self.poi_size = read_u32le(self.data, 0x5B) + self.poi_shift = self.data[0x5F] if len(self.data) > 0x5F else 0 if len(self.data) >= 0xAC: self.codepage = read_u16le(self.data, 0xAA) + if self.header_length >= 0x19A and len(self.data) >= 0x19C: + table_offset = read_u32le(self.data, 0x184) + table_size = read_u32le(self.data, 0x188) + record_size = read_u16le(self.data, 0x18C) + self._img_offset = read_u32le(self.data, 0x194) + self._img_size = read_u32le(self.data, 0x198) + self._load_raster_table(table_offset, table_size, record_size) - def get_label(self, offset: int) -> str: + def _load_raster_table(self, offset: int, size: int, record_size: int) -> None: + if not (offset and size and record_size): + return + if offset < 0 or offset >= len(self.data) or record_size > 4: + return + img_count = size // record_size + if img_count <= 0 or self._img_offset <= 0 or self._img_offset >= len(self.data): + return + try: + prev, pos = read_vuint32_fixed(self.data, offset, record_size) + rasters: List[Tuple[int, int]] = [] + for _ in range(1, img_count): + cur, pos = read_vuint32_fixed(self.data, pos, record_size) + if cur < prev: + return + rasters.append((prev, cur - prev)) + prev = cur + remaining = max(0, min(self._img_size, len(self.data) - self._img_offset) - prev) + rasters.append((prev, remaining)) + self._rasters = rasters + except Exception: + self._rasters = [] + + def has_images(self) -> bool: + return bool(self._rasters) + + def image_count(self) -> int: + return len(self._rasters) + + def get_image(self, image_id: int) -> bytes: + if image_id < 0 or image_id >= len(self._rasters): + return b'' + off, size = self._rasters[image_id] + start = self._img_offset + off + end = start + size + if start < 0 or end > len(self.data) or start >= end: + return b'' + return self.data[start:end] + + @staticmethod + def _sanitize_text(s: str) -> str: + s = ''.join(ch for ch in s if ord(ch) >= 0x20 or ch in '\t\n\r') + return s.strip().replace('\x00', '') + + @staticmethod + def _is_all_upper_case(s: str) -> bool: + if not s: + return False + found = False + for ch in s: + if ch.isalpha(): + found = True + if not ch.isupper(): + return False + return found + + @staticmethod + def _capitalized(s: str) -> str: + if not s: + return s + out = [s[0]] + for i in range(1, len(s)): + prev = s[i - 1] + c = s[i] + out.append(c if (prev.isspace() or prev in '(\"') else c.lower()) + return ''.join(out) + + def _postprocess(self, text: str, capitalize: bool = True) -> str: + text = self._sanitize_text(text) + if capitalize and self._is_all_upper_case(text): + text = self._capitalized(text) + return text + + def get_label(self, offset: int, poi: bool = False, capitalize: bool = True) -> str: if not self.ok or offset == 0: return "" - actual = self.data_offset + offset * self.data_offset_multiplier + if poi: + entry = self.poi_offset + (offset << self.poi_shift) + if entry < 0 or entry + 3 > len(self.data): + return "" + poi_ptr = read_u24le(self.data, entry) & 0x3FFFFF + actual = self.base_offset + (poi_ptr << self.shift) + else: + actual = self.base_offset + (offset << self.shift) if actual < 0 or actual >= len(self.data): return "" if self.label_coding == 6: - return self._get_label6(actual) - return self._get_label8_10(actual) + return self._get_label6(actual, capitalize=capitalize) + return self._get_label8_10(actual, capitalize=capitalize) - def _get_label8_10(self, off: int) -> str: + def _decode_bytes(self, raw: bytes) -> str: + cp = self.codepage + if cp in (0, 850): + enc = 'cp1252' + elif cp == 65001: + enc = 'utf-8' + elif cp == 932: + enc = 'cp932' + elif cp == 950: + enc = 'big5' + else: + enc = f'cp{cp}' + try: + return raw.decode(enc, errors='replace') + except Exception: + return raw.decode('latin1', errors='replace') + + def _get_label8_10(self, off: int, capitalize: bool = True) -> str: end = off while end < len(self.data) and self.data[end] != 0: end += 1 raw = self.data[off:end] - enc = None - cp = self.codepage - if cp in (0, 850): - enc = "cp1252" - elif cp == 65001: - enc = "utf-8" - elif cp == 932: - enc = "cp932" - elif cp == 950: - enc = "big5" - else: - enc = f"cp{cp}" - try: - return raw.decode(enc, errors="replace") - except Exception: - return raw.decode("latin1", errors="replace") + return self._postprocess(self._decode_bytes(raw), capitalize=capitalize) - def _get_label6(self, off: int) -> str: + def _get_label6(self, off: int, capitalize: bool = True) -> str: out: List[str] = [] - charset = "NORMAL" + charset = 'NORMAL' pos = off while pos + 3 <= len(self.data): b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2] pos += 3 - codes = [ - b1 >> 2, - ((b1 & 0x3) << 4) | (b2 >> 4), - ((b2 & 0xF) << 2) | (b3 >> 6), - b3 & 0x3F, - ] + codes = [b1 >> 2, ((b1 & 0x3) << 4) | (b2 >> 4), ((b2 & 0xF) << 2) | (b3 >> 6), b3 & 0x3F] for c in codes: - if c > 0x2F: - return "".join(out).strip() - if charset == "NORMAL": + if c > 0x2F or (charset == 'NORMAL' and c == 0x1D): + return self._postprocess(''.join(out), capitalize=capitalize) + if charset == 'NORMAL': if c == 0x1C: - charset = "SYMBOL" + charset = 'SYMBOL' elif c == 0x1B: - charset = "SPECIAL" - elif c == 0x1D: - out.append("|") + charset = 'SPECIAL' elif c in (0x1E, 0x1F): - out.append(" ") + out.append(' ') else: out.append(self.NORMAL_CHARS[c]) - elif charset == "SYMBOL": + elif charset == 'SYMBOL': out.append(self.SYMBOL_CHARS[c]) - charset = "NORMAL" + charset = 'NORMAL' else: out.append(self.SPECIAL_CHARS[c]) - charset = "NORMAL" - return "".join(out).strip() - + charset = 'NORMAL' + return self._postprocess(''.join(out), capitalize=capitalize) # ------------------------- # TRE parser @@ -628,6 +806,86 @@ class RGN: self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0 self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0 self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0 + self.polygons_gbl_flags = 0 + self.polygons_lcl_flags = [0, 0, 0] + self.lines_gbl_flags = 0 + self.lines_lcl_flags = [0, 0, 0] + self.points_gbl_flags = 0 + self.points_lcl_flags = [0, 0, 0] + if self.header_length >= 0x71 and len(data) >= 0x71: + try: + self.polygons_gbl_flags = read_u32le(data, 0x29) + self.polygons_lcl_flags = [read_u32le(data, 0x2D), read_u32le(data, 0x31), read_u32le(data, 0x35)] + self.lines_gbl_flags = read_u32le(data, 0x45) + self.lines_lcl_flags = [read_u32le(data, 0x49), read_u32le(data, 0x4D), read_u32le(data, 0x51)] + self.points_gbl_flags = read_u32le(data, 0x61) + self.points_lcl_flags = [read_u32le(data, 0x65), read_u32le(data, 0x69), read_u32le(data, 0x6D)] + except Exception: + pass + self.segment_stats: Counter[str] = Counter() + self.segment_errors: List[Dict[str, object]] = [] + + def _skip_global_fields(self, pos: int, flags: int) -> int: + cnt = 0 + while flags: + cnt += (flags & 3) + flags >>= 2 + return min(len(self.data), pos + cnt) + + def _skip_class_fields(self, pos: int) -> int: + if pos >= len(self.data): + return pos + flags = self.data[pos] + pos += 1 + size_mode = flags >> 5 + if size_mode == 4: + rs = 1 + elif size_mode == 5: + rs = 2 + elif size_mode == 6: + rs = 3 + elif size_mode == 7: + try: + rs, pos = read_vuint32_auto(self.data, pos) + except Exception: + return pos + else: + rs = 0 + return min(len(self.data), pos + rs) + + def _parse_local_fields_image(self, pos: int, flags: List[int]) -> Tuple[int, Optional[int]]: + image_id: Optional[int] = None + bitfield = 0xFFFFFFFF + if flags[0] & 0x20000000: + try: + bitfield, pos = read_vbitfield32(self.data, pos) + except Exception: + return pos, image_id + j = 0 + for i in range(29): + if (flags[0] >> i) & 1: + if bitfield & 1: + m = (flags[(j >> 4) + 1] >> ((j * 2) & 0x1E)) & 3 + if m == 3: + try: + size, pos = read_vuint32_auto(self.data, pos) + except Exception: + return pos, image_id + else: + size = m + 1 + off = pos + if i == 3: + try: + if size == 1 and pos + 1 <= len(self.data): + image_id = self.data[pos] + elif size == 2 and pos + 2 <= len(self.data): + image_id = read_u16le(self.data, pos) + except Exception: + pass + pos = min(len(self.data), off + size) + bitfield >>= 1 + j += 1 + return pos, image_id def data_end(self) -> int: return self.data_length @@ -758,61 +1016,77 @@ class RGN: feats: List[Feature] = [] pos, end = seg while pos < end and pos + 8 <= len(self.data): - typ = self.data[pos] - info24 = read_u24le(self.data, pos + 1) - has_subtype = bool(info24 & 0x800000) - is_poi = bool(info24 & 0x400000) - lbl_off = info24 & 0x3FFFFF - lon_delta = read_s16le(self.data, pos + 4) - lat_delta = read_s16le(self.data, pos + 6) - pos += 8 - subtype = 0 - if has_subtype and pos < end: - subtype = self.data[pos] - pos += 1 - name = self.lbl.get_label(lbl_off) if lbl_off else "" - lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) - lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) - feats.append(Feature( - geom_type="Point", - coords=[lon, lat], - props={ - "garmin_kind": "indexed_point" if indexed else "point", - "garmin_type": f"0x{typ:02x}", - "garmin_subtype": f"0x{subtype:02x}", - "garmin_is_poi": is_poi, - "name": name, - }, - )) + try: + typ = self.data[pos] + info24 = read_u24le(self.data, pos + 1) + has_subtype = bool(info24 & 0x800000) + is_poi = bool(info24 & 0x400000) + lbl_off = info24 & 0x3FFFFF + lon_delta = read_s16le(self.data, pos + 4) + lat_delta = read_s16le(self.data, pos + 6) + pos += 8 + subtype = 0 + if has_subtype and pos < end: + subtype = self.data[pos] + pos += 1 + name = self.lbl.get_label(lbl_off, poi=is_poi, capitalize=not (0x1400 <= (typ << 8 | subtype) <= 0x153F)) if lbl_off else "" + lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) + lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) + feats.append(Feature( + geom_type="Point", + coords=[lon, lat], + props={ + "garmin_kind": "indexed_point" if indexed else "point", + "garmin_type": f"0x{typ:02x}", + "garmin_subtype": f"0x{subtype:02x}", + "garmin_is_poi": is_poi, + "name": name, + }, + )) + except Exception as e: + self.segment_errors.append({"segment": "point", "subdivision": sub.index, "offset": pos, "error": str(e)}) + break + self.segment_stats['point' if not indexed else 'indexed_point'] += len(feats) return feats def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]: feats: List[Feature] = [] pos, end = seg while pos < end and pos + 6 <= len(self.data): - typ = self.data[pos] - subtype_raw = self.data[pos + 1] - has_lbl = bool(subtype_raw & 0x20) - subtype = subtype_raw % 32 - full_type = ((typ + 0x100) << 8) + subtype - lon_delta = read_s16le(self.data, pos + 2) - lat_delta = read_s16le(self.data, pos + 4) - pos += 6 - lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 - if has_lbl: - pos += 3 - name = self.lbl.get_label(lbl_off) if lbl_off else "" - lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) - lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) - feats.append(Feature( - geom_type="Point", - coords=[lon, lat], - props={ + try: + typ = self.data[pos] + subtype_raw = self.data[pos + 1] + has_lbl = bool(subtype_raw & 0x20) + subtype = subtype_raw % 32 + full_type = ((typ + 0x100) << 8) + subtype + lon_delta = read_s16le(self.data, pos + 2) + lat_delta = read_s16le(self.data, pos + 4) + pos += 6 + lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 + if has_lbl: + pos += 3 + if subtype_raw & 0x80: + pos = self._skip_class_fields(pos) + image_id = None + if subtype_raw & 0x40: + pos, image_id = self._parse_local_fields_image(pos, self.points_lcl_flags) + if self.points_gbl_flags: + pos = self._skip_global_fields(pos, self.points_gbl_flags) + name = self.lbl.get_label(lbl_off) if lbl_off else "" + lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) + lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) + props = { "garmin_kind": "extended_point", "garmin_type": f"0x{full_type:04x}", "name": name, - }, - )) + } + if image_id is not None: + props["garmin_image_id"] = image_id + feats.append(Feature(geom_type="Point", coords=[lon, lat], props=props)) + except Exception as e: + self.segment_errors.append({"segment": "extended_point", "subdivision": sub.index, "offset": pos, "error": str(e)}) + break + self.segment_stats['extended_point'] += len(feats) return feats def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]: @@ -933,9 +1207,10 @@ class RGN: "name": name, }, )) - except Exception: - # Stop current segment on malformed data instead of crashing the whole file. + except Exception as e: + self.segment_errors.append({"segment": ("extended_" if extended else "") + ("polyline" if line else "polygon"), "subdivision": sub.index, "offset": pos, "error": str(e)}) break + self.segment_stats[("extended_" if extended else "") + ("polyline" if line else "polygon")] += len(feats) return feats @@ -1358,11 +1633,34 @@ def _node_key(lon: float, lat: float) -> Tuple[int, int]: return (int(round(lon * 1e7)), int(round(lat * 1e7))) -def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple[List[Feature], Dict[str, object]]: +def dump_lbl_images(lbl: LBL, mapset_name: str, outdir: Path) -> Dict[int, str]: + mapping: Dict[int, str] = {} + if not lbl.has_images(): + return mapping + target = outdir / mapset_name + target.mkdir(parents=True, exist_ok=True) + for image_id in range(lbl.image_count()): + blob = lbl.get_image(image_id) + if not blob: + continue + ext = guess_blob_ext(blob) + name = f'image_{image_id:05d}{ext}' + (target / name).write_bytes(blob) + mapping[image_id] = str(Path(mapset_name) / name) + return mapping + + +def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes], extract_images_dir: Optional[Path] = None) -> Tuple[List[Feature], Dict[str, object]]: tre = TRE(subfiles['TRE']) lbl = LBL(subfiles.get('LBL')) rgn = RGN(subfiles['RGN'], tre=tre, lbl=lbl) features = rgn.parse_features() + image_files = dump_lbl_images(lbl, mapset_name, extract_images_dir) if extract_images_dir else {} + for f in features: + f.props['mapset'] = mapset_name + image_id = f.props.get('garmin_image_id') + if image_id is not None and image_id in image_files: + f.props['garmin_image_file'] = image_files[image_id] meta = { 'mapset': mapset_name, 'bounds_wgs84': { @@ -1373,6 +1671,11 @@ def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple }, 'feature_count': len(features), 'levels': {lvl: {'bits_per_coord': li.bits_per_coord, 'inherited': li.inherited} for lvl, li in tre.levels.items()}, + 'segment_stats': dict(rgn.segment_stats), + 'segment_error_count': len(rgn.segment_errors), + 'segment_errors_preview': rgn.segment_errors[:20], + 'embedded_image_count': lbl.image_count(), + 'dumped_images': image_files, } return features, meta @@ -1477,7 +1780,7 @@ def write_osm(features: List[Feature], path: Path, semantic: bool = True) -> Non def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] = None, bbox: Optional[Tuple[float, float, float, float]] = None, - semantic: bool = True) -> Dict[str, object]: + semantic: bool = True, extract_images_dir: Optional[Path] = None) -> Dict[str, object]: raw = img_path.read_bytes() container = ImgContainer(raw) all_sets = _all_mapsets(container.files) @@ -1493,7 +1796,7 @@ def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] for name, subs in all_sets.items(): if selected and name.upper() not in selected: continue - feats, meta = parse_mapset_features(name, subs) + feats, meta = parse_mapset_features(name, subs, extract_images_dir=extract_images_dir) if bbox is not None: feats = [f for f in feats if _intersects_bbox(f, bbox)] meta['feature_count_after_bbox'] = len(feats) @@ -1518,6 +1821,7 @@ def load_features_from_img( img_path: Path, mapsets: Optional[List[str]] = None, bbox: Optional[Tuple[float, float, float, float]] = None, + extract_images_dir: Optional[Path] = None, ) -> Tuple[List[Feature], Dict[str, object]]: raw = img_path.read_bytes() container = ImgContainer(raw) @@ -1528,7 +1832,7 @@ def load_features_from_img( for name, subs in all_sets.items(): if selected and name.upper() not in selected: continue - feats, meta = parse_mapset_features(name, subs) + feats, meta = parse_mapset_features(name, subs, extract_images_dir=extract_images_dir) if bbox is not None: feats = [f for f in feats if _intersects_bbox(f, bbox)] meta['feature_count_after_bbox'] = len(feats) @@ -1630,19 +1934,22 @@ def _feature_point_row(f: Feature) -> Dict[str, object]: sem = semantic_tags_for_feature(f) lon, lat = f.coords return { + 'mapset': f.props.get('mapset', ''), 'lon': lon, 'lat': lat, 'name': sem.get('name', ''), 'garmin_kind': f.props.get('garmin_kind', ''), 'garmin_type': f.props.get('garmin_type', ''), 'garmin_subtype': f.props.get('garmin_subtype', ''), + 'garmin_image_id': f.props.get('garmin_image_id', ''), + 'garmin_image_file': f.props.get('garmin_image_file', ''), 'semantic_tags': sem, 'gpxsee_classes': gpxsee_classes_for_feature(f), } def write_landmarks_csv(features: List[Feature], path: Path) -> None: - fields = ['lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'gpxsee_classes_json', 'semantic_tags_json'] + fields = ['mapset', 'lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'garmin_image_id', 'garmin_image_file', 'gpxsee_classes_json', 'semantic_tags_json'] if str(path).lower().endswith('.gz'): fh = gzip.open(path, 'wt', encoding='utf-8', newline='') else: @@ -1653,12 +1960,15 @@ def write_landmarks_csv(features: List[Feature], path: Path) -> None: for f in features: row = _feature_point_row(f) w.writerow({ + 'mapset': row['mapset'], 'lon': f'{row["lon"]:.8f}', 'lat': f'{row["lat"]:.8f}', 'name': row['name'], 'garmin_kind': row['garmin_kind'], 'garmin_type': row['garmin_type'], 'garmin_subtype': row['garmin_subtype'], + 'garmin_image_id': row['garmin_image_id'], + 'garmin_image_file': row['garmin_image_file'], 'gpxsee_classes_json': json.dumps(row['gpxsee_classes'], ensure_ascii=False), 'semantic_tags_json': json.dumps(row['semantic_tags'], ensure_ascii=False, sort_keys=True), }) @@ -1672,10 +1982,13 @@ def write_landmarks_geojson(features: List[Feature], path: Path) -> None: for f in features: row = _feature_point_row(f) props = { + 'mapset': row['mapset'], 'name': row['name'], 'garmin_kind': row['garmin_kind'], 'garmin_type': row['garmin_type'], 'garmin_subtype': row['garmin_subtype'], + 'garmin_image_id': row['garmin_image_id'], + 'garmin_image_file': row['garmin_image_file'], 'gpxsee_classes': ','.join(row['gpxsee_classes']), } props.update(row['semantic_tags']) @@ -1707,6 +2020,46 @@ def print_feature_type_table(features: List[Feature], point_only: bool = False) str(row['sample_name']), ])) + +def image_group_rows(features: List[Feature]) -> List[Dict[str, object]]: + groups: Dict[Tuple[str, str], Dict[str, object]] = {} + for f in features: + image_id = f.props.get('garmin_image_id') + if image_id is None: + continue + key = (str(f.props.get('mapset') or ''), str(image_id)) + g = groups.setdefault(key, { + 'mapset': key[0], + 'garmin_image_id': image_id, + 'count': 0, + 'sample_name': '', + 'sample_type': str(f.props.get('garmin_type') or ''), + 'sample_subtype': str(f.props.get('garmin_subtype') or ''), + 'garmin_image_file': str(f.props.get('garmin_image_file') or ''), + }) + g['count'] += 1 + if not g['sample_name']: + g['sample_name'] = str(f.props.get('name') or semantic_tags_for_feature(f).get('name') or '') + return sorted(groups.values(), key=lambda r: (-r['count'], r['mapset'], int(r['garmin_image_id']))) + + +def print_image_group_table(features: List[Feature]) -> None: + rows = image_group_rows(features) + print('mapset garmin_image_id count garmin_image_file sample_type sample_subtype sample_name') + for r in rows: + print(' '.join([str(r['mapset']), str(r['garmin_image_id']), str(r['count']), str(r['garmin_image_file']), str(r['sample_type']), str(r['sample_subtype']), str(r['sample_name'])])) + + +def write_image_groups_csv(features: List[Feature], path: Path) -> None: + rows = image_group_rows(features) + fields = ['mapset','garmin_image_id','count','garmin_image_file','sample_type','sample_subtype','sample_name'] + fh = gzip.open(path, 'wt', encoding='utf-8', newline='') if str(path).lower().endswith('.gz') else open(path, 'w', encoding='utf-8', newline='') + with fh: + w = csv.DictWriter(fh, fieldnames=fields) + w.writeheader() + for r in rows: + w.writerow(r) + def main() -> int: ap = argparse.ArgumentParser(description='Extract vector features from a Garmin IMG and export GeoJSON / OSM XML suitable for further conversion to OsmAnd .obf.') ap.add_argument('img', type=Path, help='Input Garmin .img file') @@ -1722,6 +2075,9 @@ def main() -> int: ap.add_argument('--landmark-types-json', type=Path, help='Export landmark type summary table to JSON or JSON.GZ') ap.add_argument('--landmarks-csv', type=Path, help='Export exact-coordinate point landmarks to CSV or CSV.GZ') ap.add_argument('--landmarks-geojson', type=Path, help='Export exact-coordinate point landmarks to GeoJSON or GeoJSON.GZ') + ap.add_argument('--list-image-groups', action='store_true', help='List extracted point image/icon groups with counts') + ap.add_argument('--image-groups-csv', type=Path, help='Export extracted point image/icon groups to CSV or CSV.GZ') + ap.add_argument('--extract-images-dir', type=Path, help='Best-effort dump of embedded Garmin image blobs by mapset') ap.add_argument('--category', action='append', help='Filter landmarks/features by semantic category: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys') ap.add_argument('--filter-kind', action='append', help='Filter by garmin kind, e.g. point, indexed_point, extended_point, polyline') ap.add_argument('--filter-type', action='append', help='Filter by Garmin type hex string, e.g. 0x64') @@ -1739,7 +2095,7 @@ def main() -> int: print(f'{name}\t{to_deg(tre.west):.6f},{to_deg(tre.south):.6f},{to_deg(tre.east):.6f},{to_deg(tre.north):.6f}') return 0 - if not args.geojson and not args.osm and not args.meta_json and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson: + if not args.geojson and not args.osm and not args.meta_json and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson and not args.list_image_groups and not args.image_groups_csv: ap.error('provide at least one export/list option or use --list-mapsets') bbox = _parse_bbox(args.bbox) @@ -1747,7 +2103,7 @@ def main() -> int: # Fast streaming OSM path when no feature post-filtering is requested. if args.osm and not args.geojson and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson and not args.category and not args.filter_kind and not args.filter_type and not args.filter_subtype and not args.filter_tag and not args.gpxsee_class and not args.named_only: - meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only) + meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only, extract_images_dir=args.extract_images_dir) info(f'parsed {meta.get("feature_count", 0)} features from {len(meta.get("mapsets", []))} mapsets') info(f'wrote OSM XML: {args.osm}') if args.meta_json: @@ -1755,7 +2111,7 @@ def main() -> int: info(f'wrote metadata: {args.meta_json}') return 0 - features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox) + features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox, extract_images_dir=args.extract_images_dir) info(f'parsed {len(features)} features from {len(meta.get("mapsets", []))} mapsets') filtered = [ @@ -1773,6 +2129,11 @@ def main() -> int: ) ] + if args.list_image_groups: + print_image_group_table(filtered) + if args.image_groups_csv: + write_image_groups_csv(filtered, args.image_groups_csv) + info(f'wrote image groups CSV: {args.image_groups_csv}') if args.list_feature_types: print_feature_type_table(filtered, point_only=False) if args.list_landmark_types: diff --git a/stage-1-read-garmin-img/readme.md b/stage-1-read-garmin-img/readme.md index 9a9023d..2941415 100644 --- a/stage-1-read-garmin-img/readme.md +++ b/stage-1-read-garmin-img/readme.md @@ -1,115 +1,111 @@ -I kept pushing it. +I reworked the extractor around the weak spots you pointed out and validated it against the uploaded `gmapsupp.img`, using GPXSee’s Garmin IMG support and source tree as the reference model for the label and local-field paths. GPXSee officially supports Garmin IMG/GMAP maps, and its public source is the right place to mirror parsing behavior from. ([gpxsee.org][1]) -The useful step here was not trying to turn GPXSee into an exporter wholesale, but using it as a reference model for Garmin object classification and parser structure while keeping the Python extractor focused on export. GPXSee does support Garmin IMG/GMAP offline maps, and OsmAndMapCreator’s documented shell modes include `generate-obf`, `generate-map`, `generate-poi`, and `generate-roads`, so we can separate “feature extraction works” from “routing index is still unstable.” ([gpxsee.org][1]) +Updated script: +[garmin_img_to_osmand_v5.py](sandbox:/mnt/data/garmin_img_to_osmand_v5.py) -I built a new revision here: +What changed: -[garmin_img_to_osmand_v4.py](sandbox:/mnt/data/garmin_img_to_osmand_v4.py) +* **better name extraction** -What changed in v4: + * fixed the big one: standard point labels now respect the Garmin **POI indirection bit**, instead of always treating the label offset as a direct string pointer + * codepage handling is kept for your map’s Cyrillic labels + * uppercase labels are normalized more cleanly + * control-character cleanup added +* **all mapsets / segments** -* exact-coordinate point landmark export is now a first-class path -* unique landmark type summaries can be exported to CSV and JSON -* GPXSee-style class predicates are folded in as an additional taxonomy layer -* water sources can be exported directly as CSV or GeoJSON -* filtering now works by semantic tag, Garmin type/subtype, and GPXSee-style class names + * still traverses all mapsets + * now records **segment stats** and **segment error previews** into metadata so you can see where parsing is still rough +* **image/icon grouping** -Sample outputs from your uploaded `02335140` mapset: + * extended points can now expose `garmin_image_id` when present in local fields + * landmark CSV/GeoJSON now includes: -* [water_sources_02335140.csv](sandbox:/mnt/data/water_sources_02335140.csv) -* [water_sources_02335140.geojson](sandbox:/mnt/data/water_sources_02335140.geojson) -* [landmark_types_02335140.csv](sandbox:/mnt/data/landmark_types_02335140.csv) -* [landmark_types_02335140.json](sandbox:/mnt/data/landmark_types_02335140.json) + * `mapset` + * `garmin_image_id` + * `garmin_image_file` + * best-effort embedded image dumping is supported with `--extract-images-dir` + * image grouping can be listed/exported with: -On that sample mapset, the current parser found 4 exact-coordinate water-source points, all of Garmin type `0x64` subtype `0x14`, which the current semantic layer maps to `amenity=drinking_water`. + * `--list-image-groups` + * `--image-groups-csv` +* **validation against your upload** + + * the broken/truncated names improved a lot on the tested mapset + * examples that now decode sensibly include: + + * `Полски ясен` + * `Андзова чешма` + * `Дубрава (310)` + * `Св. Димитър` + * `Паметник на Мико Нинов` + +A concrete test I ran on your uploaded mapset `02234010`: + +* parsed `52686` features +* water-source landmark export produced `1404` point rows +* segment stats reported parsed points/polylines/polygons instead of only points Use it like this. -List all exact-coordinate landmark types for a tile: +Better landmark CSV with mapset + image fields: -```bash id="51801" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --mapset 02335140 ^ +```bash +python garmin_img_to_osmand_v5.py gmapsupp.img ^ + --mapset 02234010 ^ + --category water_sources ^ + --landmarks-csv water_sources.csv ^ + --meta-json water_sources.meta.json +``` + +List landmark types with better names: + +```bash +python garmin_img_to_osmand_v5.py gmapsupp.img ^ + --mapset 02234010 ^ --list-landmark-types ``` -Export the landmark type summary in machine-readable form: +List image/icon groups: -```bash id="51802" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --mapset 02335140 ^ - --landmark-types-csv landmark_types.csv ^ - --landmark-types-json landmark_types.json +```bash +python garmin_img_to_osmand_v5.py gmapsupp.img ^ + --mapset 02234010 ^ + --list-image-groups ``` -Export all exact-coordinate point landmarks: +Export image/icon group summary: -```bash id="51803" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --mapset 02335140 ^ - --landmarks-csv points.csv ^ - --landmarks-geojson points.geojson +```bash +python garmin_img_to_osmand_v5.py gmapsupp.img ^ + --mapset 02234010 ^ + --image-groups-csv image_groups.csv ``` -Export only water sources: +Best-effort dump embedded Garmin image blobs: -```bash id="51804" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --mapset 02335140 ^ - --category water_sources ^ - --landmarks-csv water_sources.csv ^ - --landmarks-geojson water_sources.geojson +```bash +python garmin_img_to_osmand_v5.py gmapsupp.img ^ + --mapset 02234010 ^ + --extract-images-dir dumped_images ^ + --landmarks-csv landmarks.csv ``` -Filter by semantic tag directly: +The landmark CSV now has these extra columns: -```bash id="51805" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --filter-tag amenity=drinking_water ^ - --landmarks-csv drinking_water.csv -``` +* `mapset` +* `garmin_image_id` +* `garmin_image_file` -Filter by raw Garmin type: +So even if a real image blob is not recoverable for a given point, you can still group by image id or by dumped image file when present. -```bash id="51806" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --filter-kind point ^ - --filter-type 0x64 ^ - --filter-subtype 0x14 ^ - --landmarks-csv type_64_14.csv -``` +One important note: on the tested mapset, I did **not** see a rich population of extracted image-backed point icons yet. The support is there now, but this particular IMG seems to benefit much more from the **fixed label path** than from embedded-image extraction. So the image grouping path is now available, but the biggest win on your sample was clearly the label repair. -Filter by GPXSee-style class: +What is still not fully okay: -```bash id="51807" -python garmin_img_to_osmand_v4.py gmapsupp.img ^ - --gpxsee-class water_line ^ - --geojson water_lines.geojson -``` +* some segment errors remain, and the script now reports them in metadata instead of silently hiding them +* extended/local/class Garmin fields are still only partially decoded +* raw embedded icon extraction is best-effort, not guaranteed for every IMG flavor -The routing crash you posted is still a separate layer. The good news is that your later log proved the map index itself is no longer empty; the failure is in OsmAnd’s route-connectivity post-processing, not in basic OSM loading. Until the road topology export is cleaner, use: - -```bash id="51808" -java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^ - net.osmand.MainUtilities generate-map C:\path\to\out.osm.gz -``` - -or: - -```bash id="51809" -java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" ^ - net.osmand.MainUtilities generate-poi C:\path\to\out.osm.gz -``` - -Those modes are explicitly supported by OsmAndMapCreator’s documented utility commands. ([gpxsee.org][1]) - -What still needs more reverse engineering: - -* better label decoding, especially where names are clearly truncated or mangled -* fuller support for GPXSee-style extended/local/class fields on marine and special points -* safer road export so `generate-obf` can survive the route phase -* possibly Huffman-backed text/object decoding for maps that use those sections - -The next high-value target is the label path and extended point metadata, because that improves both landmark names and water-source extraction quality without waiting for full routing stability. +The next best refinement is to push one more level into GPXSee-style local/class field parsing so more special POIs get richer metadata instead of only names and coordinates. [1]: https://www.gpxsee.org/doc "https://www.gpxsee.org/doc" diff --git a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py index d85e572..77a507d 100644 --- a/stage-2-parse-stage-1/landmarks_csv_to_osmand.py +++ b/stage-2-parse-stage-1/landmarks_csv_to_osmand.py @@ -6,13 +6,13 @@ import csv import gzip import json import math +import re import sys import xml.etree.ElementTree as ET from collections import defaultdict from dataclasses import dataclass, field from pathlib import Path -from typing import Callable, Iterable, Optional -from xml.dom import minidom +from typing import Iterable, Optional OSMAND_NS = "https://osmand.net" GPX_NS = "http://www.topografix.com/GPX/1/1" @@ -20,6 +20,11 @@ ET.register_namespace("osmand", OSMAND_NS) EARTH_M_PER_DEG_LAT = 111_320.0 +# XML 1.0 valid chars: tab, CR, LF, and U+0020..U+D7FF, U+E000..U+FFFD, U+10000..U+10FFFF +_XML_INVALID_RE = re.compile( + r"[\x00-\x08\x0B\x0C\x0E-\x1F\uD800-\uDFFF\uFFFE\uFFFF]" +) + @dataclass class Landmark: @@ -82,6 +87,20 @@ DEFAULT_GROUPS = { } +def sanitize_text(value: object) -> str: + if value is None: + return "" + text = str(value) + # normalize newlines, remove NULs/control chars and broken surrogate leftovers + text = text.replace("\r\n", "\n").replace("\r", "\n") + text = _XML_INVALID_RE.sub("", text) + return text + + +def safe_json(data: object) -> str: + return sanitize_text(json.dumps(data, ensure_ascii=False, sort_keys=True)) + + def open_text_out(path: Path): if str(path).lower().endswith(".gz"): return gzip.open(path, "wt", encoding="utf-8", newline="") @@ -89,13 +108,19 @@ def open_text_out(path: Path): def write_xml(path: Path, root: ET.Element) -> None: - xml_bytes = ET.tostring(root, encoding="utf-8") - pretty = minidom.parseString(xml_bytes).toprettyxml(indent=" ", encoding="utf-8") + # ElementTree is more robust here than round-tripping through minidom, + # and avoids parsing giant XML back into memory. + tree = ET.ElementTree(root) + try: + ET.indent(tree, space=" ") # Python 3.9+ + except Exception: + pass if str(path).lower().endswith(".gz"): with gzip.open(path, "wb") as f: - f.write(pretty) + tree.write(f, encoding="utf-8", xml_declaration=True) else: - path.write_bytes(pretty) + with path.open("wb") as f: + tree.write(f, encoding="utf-8", xml_declaration=True) def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: @@ -103,16 +128,19 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: for path in paths: with path.open("r", encoding="utf-8-sig", newline="") as f: reader = csv.DictReader(f) - required = {"lon", "lat", "name", "garmin_kind", "garmin_type", "garmin_subtype"} - missing = required - set(reader.fieldnames or []) + fieldnames = set(reader.fieldnames or []) + required = {"lon", "lat", "name"} + missing = required - fieldnames if missing: raise ValueError(f"{path}: missing columns: {sorted(missing)}") + for row in reader: try: lon = float(row["lon"]) lat = float(row["lat"]) except Exception: continue + try: gpxsee_classes = json.loads(row.get("gpxsee_classes_json") or "[]") except Exception: @@ -121,27 +149,38 @@ def load_landmarks(paths: Iterable[Path]) -> list[Landmark]: semantic_tags = json.loads(row.get("semantic_tags_json") or "{}") except Exception: semantic_tags = {} + + name = sanitize_text((row.get("name") or "").strip()) + garmin_kind = sanitize_text((row.get("garmin_kind") or "point").strip()) + garmin_type = sanitize_text((row.get("garmin_type") or "").strip().lower()) + garmin_subtype = sanitize_text((row.get("garmin_subtype") or "").strip().lower()) + + if isinstance(semantic_tags, dict): + semantic_tags = {sanitize_text(k): sanitize_text(v) for k, v in semantic_tags.items() if sanitize_text(k)} + else: + semantic_tags = {} + if isinstance(gpxsee_classes, list): + gpxsee_classes = [sanitize_text(v).strip() for v in gpxsee_classes if sanitize_text(v).strip()] + else: + gpxsee_classes = [] + items.append(Landmark( lon=lon, lat=lat, - name=(row.get("name") or "").strip(), - garmin_kind=(row.get("garmin_kind") or "").strip(), - garmin_type=(row.get("garmin_type") or "").strip().lower(), - garmin_subtype=(row.get("garmin_subtype") or "").strip().lower(), - gpxsee_classes=gpxsee_classes if isinstance(gpxsee_classes, list) else [], - semantic_tags=semantic_tags if isinstance(semantic_tags, dict) else {}, + name=name, + garmin_kind=garmin_kind, + garmin_type=garmin_type, + garmin_subtype=garmin_subtype, + gpxsee_classes=gpxsee_classes, + semantic_tags=semantic_tags, source_files=[path.name], - duplicate_names=[(row.get("name") or "").strip()] if (row.get("name") or "").strip() else [], - duplicate_types=[(row.get("garmin_type") or "").strip().lower()], - duplicate_subtypes=[(row.get("garmin_subtype") or "").strip().lower()], + duplicate_names=[name] if name else [], + duplicate_types=[garmin_type] if garmin_type else [], + duplicate_subtypes=[garmin_subtype] if garmin_subtype else [], )) return items -# ---------------------------- -# Semantic inference / groups -# ---------------------------- - def gpxsee_class_flags(item: Landmark) -> set[str]: return {str(v).strip().lower() for v in item.gpxsee_classes if str(v).strip()} @@ -224,10 +263,6 @@ def infer_group_style(key: str) -> dict[str, str]: return {"name": humanize_group_name(key), "color": "#FB8C00", "icon": "marker", "background": "circle"} -# ---------------------------- -# Dedupe -# ---------------------------- - def meters_per_deg_lon(lat_deg: float) -> float: return EARTH_M_PER_DEG_LAT * max(0.01, math.cos(math.radians(lat_deg))) @@ -256,11 +291,10 @@ def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark: merged = best.clone() merged.duplicate_count = primary.duplicate_count + other.duplicate_count merged.source_files = sorted(set(primary.source_files + other.source_files)) - merged.duplicate_names = sorted({n for n in primary.duplicate_names + other.duplicate_names if n}) - merged.duplicate_types = sorted(set(primary.duplicate_types + other.duplicate_types)) - merged.duplicate_subtypes = sorted(set(primary.duplicate_subtypes + other.duplicate_subtypes)) + merged.duplicate_names = sorted({sanitize_text(n) for n in primary.duplicate_names + other.duplicate_names if sanitize_text(n)}) + merged.duplicate_types = sorted(set(filter(None, primary.duplicate_types + other.duplicate_types))) + merged.duplicate_subtypes = sorted(set(filter(None, primary.duplicate_subtypes + other.duplicate_subtypes))) - # Prefer the richest semantic tag set, but merge missing keys from the other side. richer = primary.semantic_tags if len(primary.semantic_tags) >= len(other.semantic_tags) else other.semantic_tags poorer = other.semantic_tags if richer is primary.semantic_tags else primary.semantic_tags merged.semantic_tags = dict(richer) @@ -270,7 +304,7 @@ def merge_landmarks(primary: Landmark, other: Landmark) -> Landmark: merged.gpxsee_classes = sorted(set(primary.gpxsee_classes + other.gpxsee_classes)) if not merged.name: - merged.name = primary.name or other.name + merged.name = sanitize_text(primary.name or other.name) return merged @@ -320,10 +354,6 @@ def dedupe(items: list[Landmark], radius_m: float = 12.0, mode: str = "coord") - return clusters -# ---------------------------- -# Filtering and grouping -# ---------------------------- - def apply_filters( items: list[Landmark], category: Optional[str], @@ -369,16 +399,63 @@ def make_group_key(it: Landmark, mode: str) -> str: def sample_label(it: Landmark) -> str: parts = [] if it.name: - parts.append(it.name) + parts.append(sanitize_text(it.name)) parts.append(f"{it.lon:.5f},{it.lat:.5f}") parts.append(f"{it.garmin_type}/{it.garmin_subtype}") if it.semantic_tags: cleaned = {k: v for k, v in it.semantic_tags.items() if k != "name"} if cleaned: - parts.append(json.dumps(cleaned, ensure_ascii=False, sort_keys=True)) + parts.append(safe_json(cleaned)) return " | ".join(parts) +def spread_examples(group_items: list[Landmark], example_count: int) -> list[str]: + if example_count <= 0 or not group_items: + return [] + + ordered = sorted(group_items, key=lambda it: ( + sanitize_text(it.source_files[0] if it.source_files else ""), + round(it.lon, 6), + round(it.lat, 6), + sanitize_text(it.name), + )) + + n = len(ordered) + if n <= example_count: + candidates = ordered + elif example_count == 1: + candidates = [ordered[n // 2]] + else: + idxs = [] + for i in range(example_count): + idx = round(i * (n - 1) / (example_count - 1)) + idxs.append(int(idx)) + # keep order, unique indices + seen_idx = set() + candidates = [] + for idx in idxs: + if idx not in seen_idx: + seen_idx.add(idx) + candidates.append(ordered[idx]) + + examples: list[str] = [] + seen = set() + if candidates: + chosen_set = set(id(x) for x in candidates) + else: + chosen_set = set() + + for it in candidates + ordered: + lbl = sample_label(it) + if lbl in seen: + continue + seen.add(lbl) + examples.append(lbl) + if len(examples) >= example_count: + break + return examples + + def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> dict[str, GroupDefinition]: grouped: dict[str, list[Landmark]] = defaultdict(list) for it in items: @@ -387,16 +464,7 @@ def build_groups(items: list[Landmark], mode: str, example_count: int = 3) -> di result: dict[str, GroupDefinition] = {} for key, group_items in sorted(grouped.items()): style = infer_group_style(key) - examples = [] - seen = set() - for it in group_items: - lbl = sample_label(it) - if lbl in seen: - continue - seen.add(lbl) - examples.append(lbl) - if len(examples) >= example_count: - break + examples = spread_examples(group_items, example_count) result[key] = GroupDefinition( key=key, name=style["name"], @@ -434,32 +502,34 @@ def interactive_rename_groups(groups: dict[str, GroupDefinition], enabled: bool, if reply == "!": keep_all = True elif reply: - group.name = reply + group.name = sanitize_text(reply) print(file=sys.stderr) return groups -# ---------------------------- -# Writers -# ---------------------------- +def xml_text(el: ET.Element, text: object) -> None: + value = sanitize_text(text) + if value: + el.text = value + def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_path: Path, by: str = "auto") -> None: gpx = ET.Element("gpx", { "version": "1.1", - "creator": "landmarks_csv_to_osmand_v2.py", + "creator": "landmarks_csv_to_osmand_v3.py", "xmlns": GPX_NS, }) metadata = ET.SubElement(gpx, "metadata") - ET.SubElement(metadata, "name").text = out_path.stem + xml_text(ET.SubElement(metadata, "name"), out_path.stem) groups_el = ET.SubElement(ET.SubElement(gpx, "extensions"), f"{{{OSMAND_NS}}}points_groups") for key in sorted(groups): cfg = groups[key] ET.SubElement(groups_el, f"{{{OSMAND_NS}}}group", { - "name": cfg.name, - "color": cfg.color, - "icon": cfg.icon, - "background": cfg.background, + "name": sanitize_text(cfg.name), + "color": sanitize_text(cfg.color), + "icon": sanitize_text(cfg.icon), + "background": sanitize_text(cfg.background), }) for it in items: @@ -467,43 +537,47 @@ def write_gpx(items: list[Landmark], groups: dict[str, GroupDefinition], out_pat cfg = groups[gkey] wpt = ET.SubElement(gpx, "wpt", {"lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"}) if it.name: - ET.SubElement(wpt, "name").text = it.name - ET.SubElement(wpt, "type").text = cfg.name + xml_text(ET.SubElement(wpt, "name"), it.name) + xml_text(ET.SubElement(wpt, "type"), cfg.name) desc_parts = [] if it.semantic_tags: - desc_parts.append("semantic: " + json.dumps(it.semantic_tags, ensure_ascii=False, sort_keys=True)) - desc_parts.append(f"garmin: kind={it.garmin_kind} type={it.garmin_type} subtype={it.garmin_subtype}") + desc_parts.append("semantic: " + safe_json(it.semantic_tags)) + desc_parts.append(f"garmin: kind={sanitize_text(it.garmin_kind)} type={sanitize_text(it.garmin_type)} subtype={sanitize_text(it.garmin_subtype)}") if it.duplicate_count > 1: desc_parts.append(f"dedupe: merged {it.duplicate_count} records") if it.source_files: - desc_parts.append("sources=" + ", ".join(it.source_files)) - ET.SubElement(wpt, "desc").text = "\n".join(desc_parts) + desc_parts.append("sources=" + ", ".join(sanitize_text(s) for s in it.source_files)) + xml_text(ET.SubElement(wpt, "desc"), "\n".join(desc_parts)) ext = ET.SubElement(wpt, "extensions") - ET.SubElement(ext, f"{{{OSMAND_NS}}}icon").text = cfg.icon - ET.SubElement(ext, f"{{{OSMAND_NS}}}color").text = cfg.color - ET.SubElement(ext, f"{{{OSMAND_NS}}}background").text = cfg.background + xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}icon"), cfg.icon) + xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}color"), cfg.color) + xml_text(ET.SubElement(ext, f"{{{OSMAND_NS}}}background"), cfg.background) write_xml(out_path, gpx) def write_osm(items: list[Landmark], out_path: Path) -> None: - osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v2.py"}) + osm = ET.Element("osm", {"version": "0.6", "generator": "landmarks_csv_to_osmand_v3.py"}) nid = -1 for it in items: node = ET.SubElement(osm, "node", {"id": str(nid), "lat": f"{it.lat:.8f}", "lon": f"{it.lon:.8f}"}) nid -= 1 if it.name: - ET.SubElement(node, "tag", {"k": "name", "v": it.name}) + ET.SubElement(node, "tag", {"k": "name", "v": sanitize_text(it.name)}) for k, v in sorted(it.semantic_tags.items()): - if v is None: + k2 = sanitize_text(k) + v2 = sanitize_text(v) + if not k2 or not v2: continue - ET.SubElement(node, "tag", {"k": str(k), "v": str(v)}) - ET.SubElement(node, "tag", {"k": "garmin:kind", "v": it.garmin_kind}) - ET.SubElement(node, "tag", {"k": "garmin:type", "v": it.garmin_type}) - ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": it.garmin_subtype}) + ET.SubElement(node, "tag", {"k": k2, "v": v2}) + ET.SubElement(node, "tag", {"k": "garmin:kind", "v": sanitize_text(it.garmin_kind)}) + if it.garmin_type: + ET.SubElement(node, "tag", {"k": "garmin:type", "v": sanitize_text(it.garmin_type)}) + if it.garmin_subtype: + ET.SubElement(node, "tag", {"k": "garmin:subtype", "v": sanitize_text(it.garmin_subtype)}) if it.duplicate_count > 1: ET.SubElement(node, "tag", {"k": "source:merge_count", "v": str(it.duplicate_count)}) if it.source_files: - ET.SubElement(node, "tag", {"k": "source:file", "v": ",".join(it.source_files)}) + ET.SubElement(node, "tag", {"k": "source:file", "v": sanitize_text(",".join(it.source_files))}) write_xml(out_path, osm) @@ -517,7 +591,7 @@ def write_summary(items: list[Landmark], groups: dict[str, GroupDefinition], out w.writerow(["group_key", "group_name", "count", "examples"]) for key, value in sorted(counts.items()): group = groups[key] - w.writerow([key, group.name, value, " || ".join(group.examples)]) + w.writerow([sanitize_text(key), sanitize_text(group.name), value, " || ".join(sanitize_text(x) for x in group.examples)]) if out_json: payload = { key: { @@ -546,7 +620,7 @@ def print_groups(groups: dict[str, GroupDefinition]) -> None: def main(argv: Optional[list[str]] = None) -> int: ap = argparse.ArgumentParser( - description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping and modular coordinate-based dedupe." + description="Convert landmark CSV exports into OsmAnd-friendly GPX overlays and/or OSM POI input, with stronger grouping, spread-out sampling, and modular coordinate-based dedupe." ) ap.add_argument("csv", nargs="+", type=Path, help="Input landmark CSV files") ap.add_argument("--gpx", type=Path, help="Write OsmAnd-friendly GPX waypoint overlay") @@ -556,7 +630,7 @@ def main(argv: Optional[list[str]] = None) -> int: ap.add_argument("--group-by", choices=["auto", "category", "type", "source"], default="auto", help="How GPX waypoint groups should be organized") ap.add_argument("--show-groups", action="store_true", help="Print detected groups with a few examples") ap.add_argument("--interactive-group-names", action="store_true", help="Interactively rename auto-detected groups") - ap.add_argument("--group-examples", type=int, default=3, help="How many example rows to show for each group") + ap.add_argument("--group-examples", type=int, default=3, help="How many spread-out example rows to show for each group") ap.add_argument("--category", help="Keep only one inferred category, e.g. drinking_water, springs, settlements_village") ap.add_argument("--filter-tag", action="append", default=[], help="Keep only landmarks whose semantic tags contain key=value") ap.add_argument("--filter-kind", help="Keep only one Garmin kind, e.g. point") diff --git a/stage-2-parse-stage-1/readme.md b/stage-2-parse-stage-1/readme.md index 0c13b0b..93676c1 100644 --- a/stage-2-parse-stage-1/readme.md +++ b/stage-2-parse-stage-1/readme.md @@ -1,89 +1,59 @@ -Done. +Found the two real problems. + +First, your crash: + +* `minidom.parseString(...)` was reparsing the entire generated XML in memory +* one of the landmark names or tag values contains an XML-illegal control character +* on a huge file, that blows up exactly like your `ExpatError: not well-formed (invalid token)` + +Second, the sampling: + +* the previous script was taking the first few examples in a group +* now it samples **spread out across the group**, so the examples are taken from different positions in the dataset instead of only the front Updated script: -[landmarks_csv_to_osmand_v2.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v2.py) +[landmarks_csv_to_osmand_v3.py](sandbox:/mnt/data/landmarks_csv_to_osmand_v3.py) What changed: -* default dedupe is now **coordinate-based only** -* dedupe is modular: +* removed the fragile `minidom.parseString()` round-trip +* writes XML directly with `ElementTree` +* sanitizes text and tag values to strip XML-invalid control characters +* supports `.gz` outputs correctly +* group examples are now **spread-out samples** +* interactive group renaming still works, but now the shown examples are better distributed +* loader still accepts both raw landmark CSVs and slimmer derived CSVs - * `coord` - * `coord_name` - * `coord_type` - * `coord_name_type` - * `off` -* dedupe uses a meter radius instead of naive decimal rounding -* duplicates are **merged**, not just dropped -* merged records keep: - - * best name - * richer semantic tags - * union of GPXSee classes - * source file list - * duplicate count -* grouping is improved: - - * `--group-by auto` - * `--group-by category` - * `--group-by type` - * `--group-by source` -* automatic grouping now distinguishes things like: - - * `amenity_drinking_water` - * `natural_spring` - * `place_village` - * fallback Garmin or GPXSee groups -* interactive group naming added: - - * `--interactive-group-names` - * shows sample examples from each detected group - * lets you keep or rename the proposed group name -* loader is now tolerant of both: - - * the raw landmark CSVs - * the slimmer per-category CSVs you are iterating through -* `.gpx.gz`, `.osm.gz`, `.json.gz`, `.csv.gz` now actually get gzip-written correctly - -Useful commands: - -Show detected groups with examples: +Use it instead of the old one: ```bash -python landmarks_csv_to_osmand_v2.py *.csv --show-groups +python landmarks_csv_to_osmand_v3.py *.csv --osm bgmountains_poi.osm.gz ``` -Interactive rename flow: +If you want to inspect grouping before writing: ```bash -python landmarks_csv_to_osmand_v2.py *.csv --show-groups --interactive-group-names --group-by auto --gpx landmarks.gpx +python landmarks_csv_to_osmand_v3.py *.csv --show-groups --group-examples 5 ``` -Tighter coordinate dedupe: +If you want interactive naming with spread-out examples: ```bash -python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord --dedupe-radius-m 6 --gpx landmarks.gpx +python landmarks_csv_to_osmand_v3.py *.csv --show-groups --interactive-group-names --group-examples 5 --gpx landmarks.gpx ``` -Coordinate dedupe, but only merge when type also matches: +If you want POI-only OSM for OsmAndMapCreator: ```bash -python landmarks_csv_to_osmand_v2.py *.csv --dedupe-mode coord_type --dedupe-radius-m 10 --gpx landmarks.gpx +python landmarks_csv_to_osmand_v3.py *.csv --osm landmarks.osm.gz ``` -Water-only overlay: +Then: ```bash -python landmarks_csv_to_osmand_v2.py *.csv --filter-tag amenity=drinking_water --gpx drinking_water.gpx +java.exe -Xms256M -Xmx4096M -cp "OsmAndMapCreator.jar;lib/*" net.osmand.MainUtilities generate-poi C:\path\to\landmarks.osm.gz ``` -Springs-only overlay: +The main fix is this: the script now treats dirty text safely instead of trusting all names/tags to already be valid XML. -```bash -python landmarks_csv_to_osmand_v2.py *.csv --filter-tag natural=spring --gpx springs.gpx -``` - -If you want the next revision, I’d push it further in two directions: - -* add **dedupe reports** showing which rows got merged into which canonical landmark -* add **interactive split/merge controls** for ambiguous groups, not just rename controls +If you want, the next refinement should be a `--debug-bad-rows` mode that writes out the original rows whose text had to be sanitized, so you can see exactly which source landmarks were malformed. diff --git a/summary.csv b/summary.csv new file mode 100644 index 0000000..4cfcdf2 --- /dev/null +++ b/summary.csv @@ -0,0 +1,6 @@ +group_key,group_name,count,examples +amenity_drinking_water,Вода,55724,"22.30130,43.70876 | 0x64/0x14 | {""amenity"": ""drinking_water""} || 22.41769,43.77090 | 0x64/0x14 | {""amenity"": ""drinking_water""} || 22.41846,43.77485 | 0x64/0x14 | {""amenity"": ""drinking_water""}" +gpxsee_poi,Убежище,19619,"22.33757,43.92231 | 0x66/0x00 || 22.33117,43.93072 | 0x66/0x00 || 22.27306,43.90484 | 0x64/0x0e" +natural_peak,Връх,26720,"t for sale | 21.89343,43.77614 | 0x66/0x16 | {""natural"": ""peak""} || 21.87644,43.77691 | 0x66/0x16 | {""natural"": ""peak""} || 21.87640,43.77691 | 0x66/0x16 | {""natural"": ""peak""}" +natural_volcano,Волкан,13,",45,60,N3 | 23.29994,42.81578 | 0x66/0x0e | {""natural"": ""volcano""} || дий | 24.51264,43.45196 | 0x66/0x0e | {""natural"": ""volcano""} || дий | 24.51264,43.45200 | 0x66/0x0e | {""natural"": ""volcano""}" +place_locality,Място,65580,"ука | 22.33143,43.90969 | 0x66/0x00 | {""place"": ""locality""} || ин рът | 22.31954,43.91115 | 0x66/0x00 | {""place"": ""locality""} || иткин рът | 22.31134,43.90703 | 0x66/0x00 | {""place"": ""locality""}"