#!/usr/bin/env python3 """ Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML. What it does well: - Reads classic Garmin IMG container FAT and extracts subfiles. - Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where TRE/RGN/LBL offsets are stored inside the .GMP container. - Parses TRE levels/subdivisions. - Parses LBL labels (coding 6, 9, 10) with common codepage handling. - Parses standard points, extended points, standard polylines/polygons, and extended polylines/polygons from RGN. - Exports GeoJSON and/or OSM XML. What it does NOT promise: - Full Garmin NT routing/address semantics. - Locked/compressed/vendor-obfuscated maps. - Perfect type-to-OSM semantic translation. The exporter preserves Garmin type codes as tags instead of inventing OSM semantics. This is a practical reverse-engineering tool, not a complete implementation of all Garmin IMG variants. """ from __future__ import annotations import argparse import csv import io import json import math import sys import gzip from collections import Counter, defaultdict from dataclasses import dataclass, field from pathlib import Path from typing import Dict, Iterable, Iterator, List, Optional, Tuple from xml.sax.saxutils import escape as xml_escape # ------------------------- # Low-level helpers # ------------------------- COORD_FACTOR = 360.0 / (1 << 24) FAT_BLOCK_SIZE = 0x200 FAT_ENTRY_SIZE = 0x200 MAX_FAT_BLOCKLIST = 240 SEG_POINT = 0 SEG_IPOINT = 1 SEG_POLYLINE = 2 SEG_POLYGON = 3 SEG_EXTPOLYGON = 4 SEG_EXTPOLYLINE = 5 SEG_EXTPOINT = 6 OBJ_POINT = 0x10 OBJ_INDEXED_POINT = 0x20 OBJ_POLYLINE = 0x40 OBJ_POLYGON = 0x80 OBJ_EXT_POLYGON = 0x100 OBJ_EXT_POLYLINE = 0x200 OBJ_EXT_POINT = 0x400 def warn(msg: str) -> None: print(f"[warn] {msg}", file=sys.stderr) def info(msg: str) -> None: print(f"[info] {msg}", file=sys.stderr) def read_u16le(buf: bytes, off: int) -> int: return int.from_bytes(buf[off:off + 2], "little", signed=False) def read_s16le(buf: bytes, off: int) -> int: return int.from_bytes(buf[off:off + 2], "little", signed=True) def read_u24le(buf: bytes, off: int) -> int: return int.from_bytes(buf[off:off + 3], "little", signed=False) def read_s24le(buf: bytes, off: int) -> int: raw = read_u24le(buf, off) if raw & 0x800000: raw -= 1 << 24 return raw def read_u32le(buf: bytes, off: int) -> int: return int.from_bytes(buf[off:off + 4], "little", signed=False) def to_deg(coord: int) -> float: return coord * COORD_FACTOR def decode_ascii_z(data: bytes) -> str: return data.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip() # ------------------------- # Container extraction # ------------------------- @dataclass class FatRecord: filename: str ext: str size: int blocks: List[int] offset_in_fat: int class ImgContainer: def __init__(self, raw: bytes): self.raw = raw # Some IMG files are XOR'd by a single byte stored at byte 0. xor_byte = raw[0] if xor_byte not in (0x00,): maybe = bytes(b ^ xor_byte for b in raw) sig = maybe[0x10:0x17] ident = maybe[0x41:0x48] if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"): info(f"applied XOR decode with byte 0x{xor_byte:02x}") self.raw = maybe self.block_size = self._read_block_size() self.fat_start = self._read_fat_start() self.files = self._extract_subfiles() def _read_block_size(self) -> int: e1 = self.raw[0x61] e2 = self.raw[0x62] return 1 << (e1 + e2) def _read_fat_start(self) -> int: fat_phys_block = self.raw[0x40] return fat_phys_block * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE def _parse_fat_chain(self) -> List[FatRecord]: records: List[FatRecord] = [] off = self.fat_start seen_offsets = set() while off + FAT_ENTRY_SIZE <= len(self.raw): if off in seen_offsets: break seen_offsets.add(off) first = self.raw[off] if first != 0x01: break name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00") ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00") size = read_u32le(self.raw, off + 12) next_fat = read_u16le(self.raw, off + 16) blocks = [] boff = off + 0x20 for i in range(MAX_FAT_BLOCKLIST): blk = read_u16le(self.raw, boff + i * 2) if blk == 0xFFFF: break blocks.append(blk) if next_fat == 0: records.append(FatRecord(name, ext, size, blocks, off)) off += FAT_ENTRY_SIZE return records def _collect_blocks(self, start_record: FatRecord) -> bytes: data = bytearray() blocks = list(start_record.blocks) current_offset = start_record.offset_in_fat # Follow FAT continuation blocks when next_fat is used. while True: next_fat = read_u16le(self.raw, current_offset + 16) if next_fat == 0: break current_offset += FAT_ENTRY_SIZE if current_offset + FAT_ENTRY_SIZE > len(self.raw): break boff = current_offset + 0x20 for i in range(MAX_FAT_BLOCKLIST): blk = read_u16le(self.raw, boff + i * 2) if blk == 0xFFFF: break blocks.append(blk) for blk in blocks: start = blk * self.block_size end = start + self.block_size if end > len(self.raw): break data.extend(self.raw[start:end]) return bytes(data[:start_record.size]) def _extract_subfiles(self) -> Dict[str, bytes]: out: Dict[str, bytes] = {} for rec in self._parse_fat_chain(): key = f"{rec.filename}.{rec.ext}".upper() out[key] = self._collect_blocks(rec) return out # ------------------------- # Core format structures # ------------------------- @dataclass class LevelInfo: level: int bits_per_coord: int inherited: bool present: bool = True @dataclass class Subdivision: index: int level: int data_offset: int object_types: int lon_center: int lat_center: int width: int height: int index_next_level: int = 0 last: bool = False data_end: int = 0 data_ext_polygon_offset: int = 0 data_ext_polygon_end: int = 0 data_ext_polyline_offset: int = 0 data_ext_polyline_end: int = 0 data_ext_poi_offset: int = 0 data_ext_poi_end: int = 0 children: List["Subdivision"] = field(default_factory=list) def nb_object_types(self) -> int: count = 0 cur = 0x10 for _ in range(4): if self.object_types & cur: count += 1 cur <<= 1 return count @dataclass class Feature: geom_type: str # Point | LineString | Polygon coords: object props: Dict[str, object] # ------------------------- # LBL parser # ------------------------- class LBL: NORMAL_CHARS = [' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] SYMBOL_CHARS = ['@', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '~', '~', '~', '~', '~', '~', '~', '~', '~', '~', ':', ';', '<', '=', '>', '?', '~', '~', '~', '~', '~', '~', '~', '~', '~', '~', '~', '[', '\\', ']', '^', '_'] SPECIAL_CHARS = ['`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] def __init__(self, data: Optional[bytes]): self.data = data or b"" self.ok = bool(data) self.data_offset = 0 self.data_length = 0 self.data_offset_multiplier = 1 self.label_coding = 6 self.codepage = 1252 if self.ok: self._parse_header() def _parse_header(self) -> None: header_length = read_u16le(self.data, 0) self.data_offset = read_u32le(self.data, 0x15) self.data_length = read_u32le(self.data, 0x19) self.data_offset_multiplier = 1 << self.data[0x1D] self.label_coding = self.data[0x1E] if len(self.data) >= 0xAC: self.codepage = read_u16le(self.data, 0xAA) def get_label(self, offset: int) -> str: if not self.ok or offset == 0: return "" actual = self.data_offset + offset * self.data_offset_multiplier if actual < 0 or actual >= len(self.data): return "" if self.label_coding == 6: return self._get_label6(actual) return self._get_label8_10(actual) def _get_label8_10(self, off: int) -> str: end = off while end < len(self.data) and self.data[end] != 0: end += 1 raw = self.data[off:end] enc = None cp = self.codepage if cp in (0, 850): enc = "cp1252" elif cp == 65001: enc = "utf-8" elif cp == 932: enc = "cp932" elif cp == 950: enc = "big5" else: enc = f"cp{cp}" try: return raw.decode(enc, errors="replace") except Exception: return raw.decode("latin1", errors="replace") def _get_label6(self, off: int) -> str: out: List[str] = [] charset = "NORMAL" pos = off while pos + 3 <= len(self.data): b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2] pos += 3 codes = [ b1 >> 2, ((b1 & 0x3) << 4) | (b2 >> 4), ((b2 & 0xF) << 2) | (b3 >> 6), b3 & 0x3F, ] for c in codes: if c > 0x2F: return "".join(out).strip() if charset == "NORMAL": if c == 0x1C: charset = "SYMBOL" elif c == 0x1B: charset = "SPECIAL" elif c == 0x1D: out.append("|") elif c in (0x1E, 0x1F): out.append(" ") else: out.append(self.NORMAL_CHARS[c]) elif charset == "SYMBOL": out.append(self.SYMBOL_CHARS[c]) charset = "NORMAL" else: out.append(self.SPECIAL_CHARS[c]) charset = "NORMAL" return "".join(out).strip() # ------------------------- # TRE parser # ------------------------- class TRE: def __init__(self, data: bytes): self.data = data self.header_length = read_u16le(data, 0) self.north = read_s24le(data, 0x15) self.east = read_s24le(data, 0x18) self.south = read_s24le(data, 0x1B) self.west = read_s24le(data, 0x1E) self.levels: Dict[int, LevelInfo] = {} self.max_level = 0 self.min_level = 15 self.extended_types = False self.extended_types_offset = 0 self.extended_types_length = 0 self.extended_types_size = 0 self.extended_types_number = 0 self.decalaje_extended_types = 0 self.subdivisions_count = 1 self.root_subdivisions: List[Subdivision] = [] self.subdivisions_by_index: Dict[int, Subdivision] = {} self._parse() def _parse(self) -> None: self._parse_levels() self._parse_tre7() self._parse_subdivisions() def _parse_tre7(self) -> None: if self.header_length >= 0x7C + 10: self.extended_types_offset = read_u32le(self.data, 0x7C) self.extended_types_length = read_u32le(self.data, 0x80) self.extended_types_size = read_u16le(self.data, 0x84) if self.extended_types_size > 0: self.extended_types_number = self.extended_types_length // self.extended_types_size self.extended_types = self.extended_types_length > 0 self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number def _parse_levels(self) -> None: levels_offset = read_u32le(self.data, 0x21) levels_length = read_u32le(self.data, 0x25) pos = levels_offset end = levels_offset + levels_length while pos + 4 <= end and pos + 4 <= len(self.data): zoom = self.data[pos] bits = self.data[pos + 1] count = read_u16le(self.data, pos + 2) _ = count level = zoom & 0xF inherited = bool(zoom & 0x80) self.levels[level] = LevelInfo(level=level, bits_per_coord=bits, inherited=inherited) self.max_level = max(self.max_level, level) self.min_level = min(self.min_level, level) self.subdivisions_count += count pos += 4 def get_resolution(self, level: int) -> int: return self.levels[level].bits_per_coord def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int: shift = 24 - self.get_resolution(level) - additional_accuracy if shift >= 0: return value << shift return value >> (-shift) def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]: data_offset = read_u24le(self.data, pos) object_types = self.data[pos + 3] if object_types & 0x0F: data_offset += (object_types & 0x0F) * (1 << 24) lon_center = read_s24le(self.data, pos + 4) lat_center = read_s24le(self.data, pos + 7) width = read_u16le(self.data, pos + 10) last = False if width & 0x8000: width &= 0x7FFF last = True height = read_u16le(self.data, pos + 12) index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0 sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types, lon_center=lon_center, lat_center=lat_center, width=width, height=height, index_next_level=index_next, last=last) # Extended offsets per subdivision, if present. if self.extended_types: indice = index - self.decalaje_extended_types if indice > 0 and self.extended_types_size >= 8: p = self.extended_types_offset + (indice - 1) * self.extended_types_size if p + self.extended_types_size <= len(self.data): sub.data_ext_polygon_offset = read_u32le(self.data, p) if self.extended_types_size >= 8: sub.data_ext_polyline_offset = read_u32le(self.data, p + 4) if self.extended_types_size >= 12: sub.data_ext_poi_offset = read_u32le(self.data, p + 8) return sub, pos + record_size def _parse_subdivisions(self) -> None: sub_offset = read_u32le(self.data, 0x29) sub_length = read_u32le(self.data, 0x2D) end = sub_offset + sub_length if end > len(self.data): end = len(self.data) present_levels = sorted(self.levels.keys(), reverse=True) if not present_levels: return current_root_level = present_levels[0] index = 1 pos = sub_offset roots: List[Subdivision] = [] # Parse all 16-byte records first until last root. while pos + 16 <= end: sub, pos = self._parse_subdiv_record(pos, current_root_level, 16, index) roots.append(sub) self.subdivisions_by_index[index] = sub index += 1 if sub.last: break self.root_subdivisions = roots # Recursively parse children using the index_next_level scheme. self._parse_children(self.root_subdivisions, current_root_level - 1, sub_offset, end, index) # Compute data ends by sorted data offsets. ordered = sorted(self.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index)) for i, sub in enumerate(ordered): if i + 1 < len(ordered): sub.data_end = ordered[i + 1].data_offset else: sub.data_end = 0 # Extended segment ends. for attr_start, attr_end in [ ("data_ext_polygon_offset", "data_ext_polygon_end"), ("data_ext_polyline_offset", "data_ext_polyline_end"), ("data_ext_poi_offset", "data_ext_poi_end"), ]: items = sorted((s for s in self.subdivisions_by_index.values() if getattr(s, attr_start, 0)), key=lambda s: getattr(s, attr_start)) for i, sub in enumerate(items): if i + 1 < len(items): setattr(sub, attr_end, getattr(items[i + 1], attr_start)) else: setattr(sub, attr_end, 0) def _next_present_level(self, level: int) -> int: while level > 0 and level not in self.levels: level -= 1 return level def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None: level = self._next_present_level(level) if level <= 0: return for parent in parents: if parent.index_next_level <= 0: continue idx = parent.index_next_level if idx <= 0: continue # Heuristic matching JGarminImgParser: 16-byte records for non-leaf levels, 14-byte for last level. record_size = 14 if level == self.min_level else 16 pos = sub_offset + (idx - 1) * 16 if record_size == 16 else sub_offset + (idx - 1) * 14 # Fallback for mixed record layout: compute small-record area start after all 16-byte records already parsed. if record_size == 14 and pos + 14 > end: pos = min(end, sub_offset + len(self.root_subdivisions) * 16) children: List[Subdivision] = [] while pos + record_size <= end: try: sub, pos = self._parse_subdiv_record(pos, level, record_size, idx) except Exception: break children.append(sub) self.subdivisions_by_index[idx] = sub idx += 1 if sub.last: break parent.children = children child_level = self._next_present_level(level - 1) if child_level > 0 and children: self._parse_children(children, child_level, sub_offset, end, idx) # ------------------------- # RGN parser # ------------------------- class BitStreamReader: def __init__(self, data: bytes, start: int, length_bytes: int): self.data = data self.pos = start self.remaining_bytes = length_bytes self.remaining_bits = 0 self.cur_byte = 0 def has_next(self, nbits: int) -> bool: return self.remaining_bytes * 8 + self.remaining_bits >= nbits def finish(self) -> int: self.pos += self.remaining_bytes self.remaining_bytes = 0 self.remaining_bits = 0 return self.pos def _get_if_needed(self) -> None: if self.remaining_bits == 0: if self.remaining_bytes <= 0: raise EOFError self.cur_byte = self.data[self.pos] self.pos += 1 self.remaining_bytes -= 1 self.remaining_bits = 8 def read_next_bits(self, to_get: int) -> int: cur_pos = 0 result = 0 while cur_pos < to_get: self._get_if_needed() remaining_to_get = to_get - cur_pos if remaining_to_get >= self.remaining_bits: result |= self.cur_byte << cur_pos cur_pos += self.remaining_bits self.remaining_bits = 0 else: mask = (1 << remaining_to_get) - 1 result |= (self.cur_byte & mask) << cur_pos self.cur_byte >>= remaining_to_get self.remaining_bits -= remaining_to_get return result return result def read_coord_offset(self, nbits: int, sign: int, extra_bit: int) -> int: if sign == 0: value = self.read_next_bits(nbits) sign_mask = 1 << (nbits - 1) if value & sign_mask: comp = value ^ sign_mask if extra_bit == 0: if comp != 0: return comp - sign_mask other = self.read_coord_offset(nbits, sign, extra_bit) if other < 0: return 1 - value + other return value - 1 + other else: if comp & 0xFFFFFE: return (comp & 0xFFFFFE) - sign_mask other = self.read_coord_offset(nbits - 1, sign, 0) if other < 0: return 1 - sign_mask + 1 + (other << 1) return sign_mask - 1 - 1 + (other << 1) else: if extra_bit > 0: return value & 0xFFFFFE return value else: value = self.read_next_bits(nbits) if extra_bit > 0: return (((value >> 1) * sign) << 1) return value * sign class RGN: def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]): self.data = data self.tre = tre self.lbl = lbl or LBL(None) self.header_length = read_u16le(data, 0) self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0 self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0 self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0 self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0 self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0 self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0 self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0 self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0 def data_end(self) -> int: return self.data_length def ext_polygon_end(self) -> int: return self.ext_poly_length def ext_polyline_end(self) -> int: return self.ext_line_length def ext_poi_end(self) -> int: return self.ext_poi_length @staticmethod def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int: add = 0 if sign == 0: add += 1 add += extra_bit if i <= 9: return i + 2 + add return 2 * i - 9 + 2 + add def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int: return sub.lon_center + self.tre.convert_map_units(sub.level, delta, add_acc) def _subdiv_lat(self, sub: Subdivision, delta: int, add_acc: int) -> int: return sub.lat_center + self.tre.convert_map_units(sub.level, delta, add_acc) def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]: result: List[Optional[Tuple[int, int]]] = [None] * 7 offset = sub.data_offset + self.data_offset end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset if sub.object_types == 0: return result if sub.data_end and end > len(self.data): end = len(self.data) if sub.data_end and end > offset and sub.nb_object_types() > 0: if sub.object_types & OBJ_POINT: result[SEG_POINT] = (0, 0) if sub.object_types & OBJ_INDEXED_POINT: result[SEG_IPOINT] = (0, 0) if sub.object_types & OBJ_POLYLINE: result[SEG_POLYLINE] = (0, 0) if sub.object_types & OBJ_POLYGON: result[SEG_POLYGON] = (0, 0) order = [SEG_POINT, SEG_IPOINT, SEG_POLYLINE, SEG_POLYGON] nb_pointers = sub.nb_object_types() - 1 if offset + nb_pointers * 2 <= len(self.data): segment_start = offset + nb_pointers * 2 cur_idx = 0 p = offset for _ in range(nb_pointers): while cur_idx < 4 and result[order[cur_idx]] is None: cur_idx += 1 if cur_idx >= 4: break segment_end = read_u16le(self.data, p) + offset p += 2 if segment_end > end or segment_end <= segment_start: result[order[cur_idx]] = None else: result[order[cur_idx]] = (segment_start, segment_end) segment_start = segment_end cur_idx += 1 while cur_idx < 4 and result[order[cur_idx]] is None: cur_idx += 1 if cur_idx < 4 and result[order[cur_idx]] is not None and segment_start < end: result[order[cur_idx]] = (segment_start, end) if sub.data_ext_polygon_offset: s = self.ext_poly_offset + sub.data_ext_polygon_offset e = self.ext_poly_offset + (sub.data_ext_polygon_end or self.ext_poly_length) if e > s: result[SEG_EXTPOLYGON] = (s, e) if sub.data_ext_polyline_offset: s = self.ext_line_offset + sub.data_ext_polyline_offset e = self.ext_line_offset + (sub.data_ext_polyline_end or self.ext_line_length) if e > s: result[SEG_EXTPOLYLINE] = (s, e) if sub.data_ext_poi_offset: s = self.ext_poi_offset + sub.data_ext_poi_offset e = self.ext_poi_offset + (sub.data_ext_poi_end or self.ext_poi_length) if e > s: result[SEG_EXTPOINT] = (s, e) return result def parse_features(self) -> List[Feature]: # Finalize subdivision end markers using RGN section lengths. ordered = sorted(self.tre.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index)) for i, sub in enumerate(ordered): if sub.data_end == 0: sub.data_end = self.data_length if i + 1 == len(ordered) else ordered[i + 1].data_offset for attr_start, final_end in [ ("data_ext_polygon_offset", self.ext_poly_length), ("data_ext_polyline_offset", self.ext_line_length), ("data_ext_poi_offset", self.ext_poi_length), ]: items = sorted((s for s in self.tre.subdivisions_by_index.values() if getattr(s, attr_start, 0)), key=lambda s: getattr(s, attr_start)) for i, sub in enumerate(items): if attr_start == "data_ext_polygon_offset": setattr(sub, "data_ext_polygon_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) elif attr_start == "data_ext_polyline_offset": setattr(sub, "data_ext_polyline_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) else: setattr(sub, "data_ext_poi_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) feats: List[Feature] = [] for sub in sorted(self.tre.subdivisions_by_index.values(), key=lambda s: s.index): segs = self._segments(sub) if segs[SEG_POINT]: feats.extend(self._parse_points(sub, segs[SEG_POINT], indexed=False)) if segs[SEG_IPOINT]: feats.extend(self._parse_points(sub, segs[SEG_IPOINT], indexed=True)) if segs[SEG_EXTPOINT]: feats.extend(self._parse_ext_points(sub, segs[SEG_EXTPOINT])) if segs[SEG_POLYLINE]: feats.extend(self._parse_poly(sub, segs[SEG_POLYLINE], line=True, extended=False)) if segs[SEG_POLYGON]: feats.extend(self._parse_poly(sub, segs[SEG_POLYGON], line=False, extended=False)) if segs[SEG_EXTPOLYLINE]: feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYLINE], line=True, extended=True)) if segs[SEG_EXTPOLYGON]: feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYGON], line=False, extended=True)) return feats def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]: feats: List[Feature] = [] pos, end = seg while pos < end and pos + 8 <= len(self.data): typ = self.data[pos] info24 = read_u24le(self.data, pos + 1) has_subtype = bool(info24 & 0x800000) is_poi = bool(info24 & 0x400000) lbl_off = info24 & 0x3FFFFF lon_delta = read_s16le(self.data, pos + 4) lat_delta = read_s16le(self.data, pos + 6) pos += 8 subtype = 0 if has_subtype and pos < end: subtype = self.data[pos] pos += 1 name = self.lbl.get_label(lbl_off) if lbl_off else "" lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) feats.append(Feature( geom_type="Point", coords=[lon, lat], props={ "garmin_kind": "indexed_point" if indexed else "point", "garmin_type": f"0x{typ:02x}", "garmin_subtype": f"0x{subtype:02x}", "garmin_is_poi": is_poi, "name": name, }, )) return feats def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]: feats: List[Feature] = [] pos, end = seg while pos < end and pos + 6 <= len(self.data): typ = self.data[pos] subtype_raw = self.data[pos + 1] has_lbl = bool(subtype_raw & 0x20) subtype = subtype_raw % 32 full_type = ((typ + 0x100) << 8) + subtype lon_delta = read_s16le(self.data, pos + 2) lat_delta = read_s16le(self.data, pos + 4) pos += 6 lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 if has_lbl: pos += 3 name = self.lbl.get_label(lbl_off) if lbl_off else "" lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) feats.append(Feature( geom_type="Point", coords=[lon, lat], props={ "garmin_kind": "extended_point", "garmin_type": f"0x{full_type:04x}", "name": name, }, )) return feats def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]: feats: List[Feature] = [] pos, end = seg while pos < end: try: if not extended: if pos + 10 > end: break info1 = self.data[pos] pos += 1 if line: typ = info1 & 0x3F direction = bool(info1 & 0x40) else: typ = info1 & 0x7F direction = False two_byte_len = bool(info1 & 0x80) info24 = read_u24le(self.data, pos) pos += 3 lbl_off = info24 & 0x3FFFFF extra_bit = 1 if (info24 & 0x400000) else 0 data_in_net = bool(info24 & 0x800000) lon_delta = read_s16le(self.data, pos) lat_delta = read_s16le(self.data, pos + 2) pos += 4 bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos] pos += 2 if two_byte_len else 1 bitstream_info = self.data[pos] pos += 1 long_sign = 0 lat_sign = 0 long_extra_bit = extra_bit lat_extra_bit = 0 full_type = typ else: if pos + 8 > end: break typ = self.data[pos] subtype_raw = self.data[pos + 1] has_lbl = bool(subtype_raw & 0x20) subtype = subtype_raw % 32 full_type = ((typ + 0x100) << 8) + subtype lon_delta = read_s16le(self.data, pos + 2) lat_delta = read_s16le(self.data, pos + 4) pos += 6 bitstream_len_byte = self.data[pos] pos += 1 if bitstream_len_byte % 2 == 0: if pos >= end: break bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1 pos += 1 else: bitstream_len = bitstream_len_byte // 2 - 1 bitstream_info = self.data[pos] pos += 1 direction = False data_in_net = False long_sign = 0 lat_sign = 0 long_extra_bit = 0 lat_extra_bit = 0 reader = BitStreamReader(self.data, pos, bitstream_len) if reader.read_next_bits(1) != 0: long_sign = +1 if reader.read_next_bits(1) == 0 else -1 if reader.read_next_bits(1) != 0: lat_sign = +1 if reader.read_next_bits(1) == 0 else -1 if extended: long_extra_bit = reader.read_next_bits(1) long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit) lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit) cur_lon = lon_delta cur_lat = lat_delta pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]] cur_lon <<= long_extra_bit cur_lat <<= lat_extra_bit while reader.has_next(long_bits + lat_bits): dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit) dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit) cur_lon += dlon cur_lat += dlat pts.append([ to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)), to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)), ]) pos = reader.finish() lbl_off = 0 if extended else lbl_off if extended: lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 if has_lbl: pos += 3 name = self.lbl.get_label(lbl_off) if lbl_off else "" if not line: if pts and pts[0] != pts[-1]: pts.append(pts[0]) feats.append(Feature( geom_type="Polygon", coords=[pts], props={ "garmin_kind": "extended_polygon" if extended else "polygon", "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", "garmin_direction": direction, "garmin_data_in_net": data_in_net, "name": name, }, )) else: feats.append(Feature( geom_type="LineString", coords=pts, props={ "garmin_kind": "extended_polyline" if extended else "polyline", "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", "garmin_direction": direction, "garmin_data_in_net": data_in_net, "name": name, }, )) except Exception: # Stop current segment on malformed data instead of crashing the whole file. break return feats # ------------------------- # Output writers and semantic mapping # ------------------------- def feature_to_geojson(f: Feature) -> Dict[str, object]: props = {k: v for k, v in f.props.items() if v not in (None, "", [], {})} return { "type": "Feature", "geometry": {"type": f.geom_type, "coordinates": f.coords}, "properties": props, } def _osm_escape(v: object) -> str: return xml_escape(str(v), {'"': '"'}) def _maybe_open_text(path: Path): if str(path).lower().endswith('.gz'): return gzip.open(path, 'wt', encoding='utf-8', newline='\n') return open(path, 'w', encoding='utf-8', newline='\n') def _parse_bbox(text: Optional[str]) -> Optional[Tuple[float, float, float, float]]: if not text: return None parts = [p.strip() for p in text.split(',')] if len(parts) != 4: raise ValueError('bbox must be west,south,east,north') west, south, east, north = map(float, parts) if west > east or south > north: raise ValueError('invalid bbox ordering') return west, south, east, north def _feature_bounds(f: Feature) -> Tuple[float, float, float, float]: if f.geom_type == 'Point': lon, lat = f.coords return lon, lat, lon, lat if f.geom_type == 'LineString': pts = f.coords else: pts = f.coords[0] xs = [p[0] for p in pts] ys = [p[1] for p in pts] return min(xs), min(ys), max(xs), max(ys) def _intersects_bbox(f: Feature, bbox: Optional[Tuple[float, float, float, float]]) -> bool: if bbox is None: return True west, south, east, north = bbox a_w, a_s, a_e, a_n = _feature_bounds(f) return not (a_w > east or a_e < west or a_s > north or a_n < south) def _all_mapsets(files: Dict[str, bytes]) -> Dict[str, Dict[str, bytes]]: groups: Dict[str, Dict[str, bytes]] = defaultdict(dict) for key, data in files.items(): if '.' not in key: continue base, ext = key.rsplit('.', 1) groups[base.upper()][ext.upper()] = data out: Dict[str, Dict[str, bytes]] = {} for base, subs in groups.items(): if 'TRE' in subs and 'RGN' in subs: out[base] = subs return dict(sorted(out.items())) # Default semantic mapping. These are based on common Garmin/mkgmap conventions, # plus a few heuristics for map labels commonly found in topographic IMG files. LINE_TAGS: Dict[str, Dict[str, str]] = { '0x01': {'highway': 'motorway'}, '0x02': {'highway': 'primary'}, '0x03': {'highway': 'secondary'}, '0x04': {'highway': 'tertiary'}, '0x05': {'highway': 'unclassified'}, '0x06': {'highway': 'residential'}, '0x07': {'highway': 'service'}, '0x08': {'highway': 'construction'}, '0x09': {'highway': 'road'}, '0x0a': {'highway': 'track', 'surface': 'unpaved'}, '0x0c': {'highway': 'road', 'junction': 'roundabout'}, '0x0d': {'highway': 'path'}, '0x0e': {'highway': 'track', 'tracktype': 'grade1'}, '0x0f': {'highway': 'track', 'tracktype': 'grade2'}, '0x10': {'highway': 'track', 'tracktype': 'grade3'}, '0x11': {'highway': 'track', 'tracktype': 'grade4'}, '0x12': {'highway': 'track', 'tracktype': 'grade5'}, '0x13': {'highway': 'steps'}, '0x14': {'railway': 'rail'}, '0x15': {'natural': 'coastline'}, '0x16': {'highway': 'cycleway'}, '0x17': {'highway': 'bridleway'}, '0x18': {'waterway': 'stream'}, '0x1a': {'route': 'ferry'}, '0x1f': {'waterway': 'river'}, '0x27': {'aeroway': 'runway'}, '0x28': {'man_made': 'pipeline'}, '0x29': {'power': 'line'}, '0x31': {'natural': 'cliff'}, '0x32': {'barrier': 'wall'}, '0x33': {'barrier': 'fence'}, '0x34': {'barrier': 'hedge'}, '0x38': {'aerialway': 'cable_car'}, '0x39': {'railway': 'tram'}, } POLYGON_TAGS: Dict[str, Dict[str, str]] = { '0x03': {'landuse': 'residential'}, '0x05': {'amenity': 'parking'}, '0x09': {'leisure': 'marina'}, '0x0b': {'amenity': 'hospital'}, '0x0c': {'landuse': 'industrial'}, '0x14': {'natural': 'heath'}, '0x15': {'natural': 'wood'}, '0x16': {'leisure': 'nature_reserve'}, '0x17': {'leisure': 'park'}, '0x18': {'leisure': 'golf_course'}, '0x19': {'leisure': 'sports_centre'}, '0x1a': {'landuse': 'cemetery'}, '0x2a': {'landuse': 'farmland'}, '0x2b': {'landuse': 'farmyard'}, '0x2c': {'landuse': 'vineyard'}, '0x2d': {'landuse': 'quarry'}, '0x2e': {'tourism': 'camp_site'}, '0x32': {'natural': 'water', 'water': 'sea'}, '0x35': {'landuse': 'meadow'}, '0x3c': {'natural': 'water'}, '0x3d': {'natural': 'beach'}, '0x3e': {'natural': 'water'}, '0x3f': {'landuse': 'reservoir'}, '0x40': {'natural': 'water'}, '0x41': {'natural': 'water'}, '0x46': {'waterway': 'riverbank'}, '0x4c': {'natural': 'water', 'intermittent': 'yes'}, '0x4d': {'natural': 'glacier'}, '0x4e': {'landuse': 'orchard'}, '0x4f': {'natural': 'scrub'}, '0x50': {'natural': 'wood'}, '0x51': {'natural': 'wetland'}, '0x52': {'natural': 'heath'}, # heuristic: Garmin default "Tundra" '0x53': {'natural': 'bare_rock'}, # heuristic: Garmin default "Flat" } POINT_TAGS: Dict[Tuple[str, Optional[str]], Dict[str, str]] = { ('0x04', '0x00'): {'place': 'city'}, ('0x08', '0x00'): {'place': 'town'}, ('0x0a', '0x00'): {'place': 'suburb'}, ('0x0b', '0x00'): {'place': 'village'}, ('0x0d', '0x00'): {'place': 'village'}, # heuristic for this sample topo IMG ('0x11', '0x00'): {'place': 'hamlet'}, ('0x28', '0x00'): {'place': 'locality'}, # heuristic: local named spot labels in sample ('0x64', '0x03'): {'amenity': 'grave_yard'}, ('0x64', '0x06'): {'highway': 'crossing'}, ('0x64', '0x11'): {'man_made': 'tower'}, ('0x64', '0x14'): {'amenity': 'drinking_water'}, ('0x64', '0x17'): {'amenity': 'hunting_stand'}, ('0x64', '0x18'): {'amenity': 'grit_bin'}, ('0x65', '0x0a'): {'natural': 'glacier'}, ('0x65', '0x0c'): {'place': 'island'}, ('0x65', '0x11'): {'natural': 'spring'}, ('0x66', '0x04'): {'natural': 'beach'}, ('0x66', '0x07'): {'natural': 'cliff'}, ('0x66', '0x0e'): {'natural': 'volcano'}, ('0x66', '0x16'): {'natural': 'peak'}, ('0x66', '0x19'): {'natural': 'cave_entrance'}, } def _garmin_type_int(value: Optional[str]) -> Optional[int]: if value is None: return None s = str(value).strip().lower() if not s: return None try: return int(s, 16) if s.startswith('0x') else int(s, 0) except ValueError: return None def gpxsee_classes_for_feature(f: Feature) -> List[str]: """Classify a Garmin object using GPXSee-style type predicates from style_img.h. GPXSee stores classic Garmin object ids as type<<8 (and standard points as type<<8|subtype). Extended objects already carry their expanded ids. """ gt = _garmin_type_int(f.props.get('garmin_type')) if gt is None: return [] kind = str(f.props.get('garmin_kind') or '') st = _garmin_type_int(f.props.get('garmin_subtype')) or 0 if gt < 0x10000: if kind in ('point', 'indexed_point'): gt = (gt << 8) | st else: gt = gt << 8 classes: List[str] = [] # GPXSee Style:: static predicates (ported from style_img.h). if not ((0x0100 <= gt <= 0x1F00) or (0x11400 <= gt < 0x11500)): classes.append('poi') if (0x2000 <= gt <= 0x2500) or ((gt & 0xFFFF00) == 0x10900): classes.append('contour_line') if (0x3C00 <= gt <= 0x4400) or ((gt & 0xFFFF00) == 0x10B00): classes.append('water_area') if gt in (0x2600, 0x1800, 0x1F00): classes.append('water_line') if gt in (0x0400, 0x10901): classes.append('military_area') if gt in (0x1600, 0x10A03): classes.append('nature_reserve') if gt in (0x6200, 0x6300): classes.append('spot') if gt == 0x6616: classes.append('summit') if gt <= 0x0400: classes.append('major_road') if 0x1400 <= gt <= 0x153F: classes.append('country') if gt == 0x1E00: classes.append('state') if gt == 0x10703: classes.append('marina') if gt == 0x10613: classes.append('raster') if 0x10301 <= gt <= 0x10302: classes.append('depth_point') if 0x10400 <= gt <= 0x10401: classes.append('obstruction_point') if 0x10200 <= gt < 0x10300: classes.append('buoy') if 0x10100 <= gt < 0x10200: classes.append('light') if gt == 0x10500: classes.append('label_point') if gt == 0x10300: classes.append('dh_point') if 0x10100 <= gt < 0x10A00: classes.append('marine_point') if 0x10400 <= gt < 0x10700: classes.append('styled_line') if gt == 0x10601: classes.append('cartographic_line') if gt == 0x10108: classes.append('recommended_route') return classes def _feature_type_rows(features: List[Feature], point_only: bool = False) -> List[Dict[str, object]]: groups: Dict[Tuple[str, str, str, str], Dict[str, object]] = {} for f in features: if point_only and not _point_feature(f): continue sem = semantic_tags_for_feature(f) classes = gpxsee_classes_for_feature(f) key = ( f.geom_type, str(f.props.get('garmin_kind') or ''), str(f.props.get('garmin_type') or ''), str(f.props.get('garmin_subtype') or ''), ) g = groups.setdefault(key, { 'geom_type': key[0], 'garmin_kind': key[1], 'garmin_type': key[2], 'garmin_subtype': key[3], 'count': 0, 'named_count': 0, 'sample_name': '', 'semantic': {}, 'gpxsee_classes': set(), }) g['count'] += 1 if sem.get('name'): g['named_count'] += 1 if not g['sample_name']: g['sample_name'] = sem['name'] if not g['semantic']: g['semantic'] = {k: v for k, v in sem.items() if k != 'name'} for c in classes: g['gpxsee_classes'].add(c) rows = [] for (_, _, _, _), meta in sorted(groups.items(), key=lambda kv: (-kv[1]['count'], kv[0])): row = dict(meta) row['gpxsee_classes'] = sorted(row['gpxsee_classes']) rows.append(row) return rows def write_type_summary_csv(rows: List[Dict[str, object]], path: Path) -> None: fields = [ 'geom_type', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'count', 'named_count', 'gpxsee_classes_json', 'semantic_tags_json', 'sample_name' ] if str(path).lower().endswith('.gz'): fh = gzip.open(path, 'wt', encoding='utf-8', newline='') else: fh = open(path, 'w', encoding='utf-8', newline='') with fh: w = csv.DictWriter(fh, fieldnames=fields) w.writeheader() for row in rows: w.writerow({ 'geom_type': row['geom_type'], 'garmin_kind': row['garmin_kind'], 'garmin_type': row['garmin_type'], 'garmin_subtype': row['garmin_subtype'], 'count': row['count'], 'named_count': row['named_count'], 'gpxsee_classes_json': json.dumps(row['gpxsee_classes'], ensure_ascii=False), 'semantic_tags_json': json.dumps(row['semantic'], ensure_ascii=False, sort_keys=True), 'sample_name': row['sample_name'], }) def write_type_summary_json(rows: List[Dict[str, object]], path: Path) -> None: payload = {'rows': rows} if str(path).lower().endswith('.gz'): with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as fh: json.dump(payload, fh, ensure_ascii=False) else: path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8') def _parse_ele_from_name(name: str) -> Optional[str]: if not name: return None t = name.strip().replace(',', '.') if not t: return None try: v = float(t) except ValueError: return None if abs(v) < 20000: if v.is_integer(): return str(int(v)) return str(v) return None def semantic_tags_for_feature(f: Feature) -> Dict[str, str]: kind = f.props.get('garmin_kind', '') gtype = f.props.get('garmin_type') subtype = f.props.get('garmin_subtype') name = f.props.get('name') or '' sem: Dict[str, str] = {} if kind in ('polyline', 'extended_polyline'): if gtype in ('0x20', '0x21', '0x22'): sem['contour'] = 'elevation' sem['contour_ext'] = { '0x20': 'elevation_minor', '0x21': 'elevation_medium', '0x22': 'elevation_major', }[gtype] ele = _parse_ele_from_name(name) if ele is not None: sem['ele'] = ele elif gtype in LINE_TAGS: sem.update(LINE_TAGS[gtype]) elif kind == 'extended_polyline': # Fallback heuristic for common topo extended trail/path style objects. if gtype in ('0x10e11', '0x10e12', '0x10e13', '0x10e14', '0x10e1c', '0x10e1d', '0x10e1f', '0x10f12', '0x10f14', '0x10f16'): sem['highway'] = 'path' elif kind in ('polygon', 'extended_polygon'): if gtype in POLYGON_TAGS: sem.update(POLYGON_TAGS[gtype]) elif kind in ('point', 'indexed_point', 'extended_point'): key = (gtype, subtype) if key in POINT_TAGS: sem.update(POINT_TAGS[key]) elif gtype == '0x66' and subtype == '0x18': sem['natural'] = 'hill' # heuristic fallback elif gtype == '0x65' and subtype == '0x00' and name: sem['place'] = 'locality' elif gtype == '0x66' and name: sem['place'] = 'locality' if name: sem['name'] = name return sem def tags_for_feature(f: Feature, semantic: bool = True) -> Dict[str, str]: tags: Dict[str, str] = {} if semantic: tags.update(semantic_tags_for_feature(f)) kind = f.props.get('garmin_kind') gtype = f.props.get('garmin_type') if kind: tags['garmin:kind'] = str(kind) if gtype: tags['garmin:type'] = str(gtype) if f.props.get('garmin_subtype'): tags['garmin:subtype'] = str(f.props['garmin_subtype']) if f.props.get('garmin_is_poi'): tags['garmin:is_poi'] = 'yes' return tags def _is_useful_feature(tags: Dict[str, str]) -> bool: # Keep only features with at least one semantic tag or a name. for k in tags: if not k.startswith('garmin:'): return True return 'name' in tags def _node_key(lon: float, lat: float) -> Tuple[int, int]: # Quantized key for shared way node reuse. return (int(round(lon * 1e7)), int(round(lat * 1e7))) def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple[List[Feature], Dict[str, object]]: tre = TRE(subfiles['TRE']) lbl = LBL(subfiles.get('LBL')) rgn = RGN(subfiles['RGN'], tre=tre, lbl=lbl) features = rgn.parse_features() meta = { 'mapset': mapset_name, 'bounds_wgs84': { 'north': to_deg(tre.north), 'east': to_deg(tre.east), 'south': to_deg(tre.south), 'west': to_deg(tre.west), }, 'feature_count': len(features), 'levels': {lvl: {'bits_per_coord': li.bits_per_coord, 'inherited': li.inherited} for lvl, li in tre.levels.items()}, } return features, meta def collect_type_stats(features: Iterable[Feature]) -> Dict[str, object]: by_kind = Counter() by_type = Counter() by_type_sub = Counter() for f in features: kind = f.props.get('garmin_kind') or 'unknown' typ = f.props.get('garmin_type') or 'unknown' sub = f.props.get('garmin_subtype') or '' by_kind[kind] += 1 by_type[f'{kind}:{typ}'] += 1 if sub: by_type_sub[f'{kind}:{typ}:{sub}'] += 1 return { 'by_kind': dict(by_kind.most_common()), 'by_type': dict(by_type.most_common()), 'by_type_subtype': dict(by_type_sub.most_common()), } def write_geojson(features: List[Feature], path: Path) -> None: if str(path).lower().endswith('.gz'): with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as fh: json.dump({ 'type': 'FeatureCollection', 'features': [feature_to_geojson(f) for f in features], }, fh, ensure_ascii=False) else: path.write_text(json.dumps({ 'type': 'FeatureCollection', 'features': [feature_to_geojson(f) for f in features], }, ensure_ascii=False, indent=2), encoding='utf-8') def _serialize_osm_chunk(fh, features: List[Feature], node_id: int, way_id: int, semantic: bool = True) -> Tuple[int, int]: line_nodes: Dict[Tuple[int, int], int] = {} plain_nodes: Dict[int, Tuple[float, float]] = {} point_nodes: List[str] = [] ways: List[Tuple[int, List[int], Dict[str, str]]] = [] def alloc_node(lon: float, lat: float) -> int: nonlocal node_id key = _node_key(lon, lat) if key in line_nodes: return line_nodes[key] nid = node_id node_id -= 1 line_nodes[key] = nid plain_nodes[nid] = (lon, lat) return nid for f in features: tags = tags_for_feature(f, semantic=semantic) if not _is_useful_feature(tags): continue if f.geom_type == 'Point': lon, lat = f.coords nid = node_id node_id -= 1 node_lines = [f' '] for k, v in tags.items(): node_lines.append(f' ') node_lines.append(' ') point_nodes.append('\n'.join(node_lines)) else: coords = f.coords if f.geom_type == 'LineString' else f.coords[0] node_ids = [alloc_node(lon, lat) for lon, lat in coords] if len(node_ids) < 2: continue wid = way_id way_id -= 1 if f.geom_type == 'Polygon': tags['area'] = 'yes' ways.append((wid, node_ids, tags)) for nid in sorted(plain_nodes.keys(), reverse=True): lon, lat = plain_nodes[nid] fh.write(f' \n') for chunk in point_nodes: fh.write(chunk) fh.write('\n') for wid, node_ids, tags in ways: fh.write(f' \n') for nid in node_ids: fh.write(f' \n') for k, v in tags.items(): fh.write(f' \n') fh.write(' \n') return node_id, way_id def write_osm(features: List[Feature], path: Path, semantic: bool = True) -> None: with _maybe_open_text(path) as fh: fh.write('\n') fh.write('\n') _serialize_osm_chunk(fh, features, node_id=-1, way_id=-1, semantic=semantic) fh.write('\n') def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] = None, bbox: Optional[Tuple[float, float, float, float]] = None, semantic: bool = True) -> Dict[str, object]: raw = img_path.read_bytes() container = ImgContainer(raw) all_sets = _all_mapsets(container.files) selected = set(s.upper() for s in mapsets) if mapsets else None total_kind_counter = Counter() total_features = 0 mapset_meta: List[Dict[str, object]] = [] node_id = -1 way_id = -1 with _maybe_open_text(path) as fh: fh.write('\n') fh.write('\n') for name, subs in all_sets.items(): if selected and name.upper() not in selected: continue feats, meta = parse_mapset_features(name, subs) if bbox is not None: feats = [f for f in feats if _intersects_bbox(f, bbox)] meta['feature_count_after_bbox'] = len(feats) total_features += len(feats) for f in feats: total_kind_counter[f.props.get('garmin_kind') or 'unknown'] += 1 node_id, way_id = _serialize_osm_chunk(fh, feats, node_id=node_id, way_id=way_id, semantic=semantic) mapset_meta.append(meta) fh.write('\n') return { 'img_file': str(img_path), 'block_size': container.block_size, 'mapset_count': len(all_sets), 'selected_mapsets': mapsets or sorted(all_sets.keys()), 'mapsets': mapset_meta, 'feature_count': total_features, 'kind_counts': dict(total_kind_counter), } def load_features_from_img( img_path: Path, mapsets: Optional[List[str]] = None, bbox: Optional[Tuple[float, float, float, float]] = None, ) -> Tuple[List[Feature], Dict[str, object]]: raw = img_path.read_bytes() container = ImgContainer(raw) all_sets = _all_mapsets(container.files) selected = set(s.upper() for s in mapsets) if mapsets else None features: List[Feature] = [] mapset_meta: List[Dict[str, object]] = [] for name, subs in all_sets.items(): if selected and name.upper() not in selected: continue feats, meta = parse_mapset_features(name, subs) if bbox is not None: feats = [f for f in feats if _intersects_bbox(f, bbox)] meta['feature_count_after_bbox'] = len(feats) features.extend(feats) mapset_meta.append(meta) meta = { 'img_file': str(img_path), 'block_size': container.block_size, 'mapset_count': len(all_sets), 'selected_mapsets': mapsets or sorted(all_sets.keys()), 'mapsets': mapset_meta, 'feature_count': len(features), 'type_stats': collect_type_stats(features), } return features, meta def _point_feature(f: Feature) -> bool: return f.geom_type == 'Point' and (f.props.get('garmin_kind') in ('point', 'indexed_point', 'extended_point')) def _parse_kv_filters(values: Optional[List[str]]) -> List[Tuple[str, str]]: out: List[Tuple[str, str]] = [] for v in values or []: if '=' not in v: raise SystemExit(f'invalid --filter-tag value {v!r}, expected key=value') k, val = v.split('=', 1) out.append((k.strip(), val.strip())) return out def _category_match(f: Feature, sem: Dict[str, str], category: Optional[str]) -> bool: if not category: return True c = category.strip().lower() classes = set(gpxsee_classes_for_feature(f)) if c == 'water_sources': return sem.get('amenity') == 'drinking_water' or sem.get('natural') == 'spring' if c == 'peaks': return sem.get('natural') == 'peak' or 'summit' in classes if c == 'caves': return sem.get('natural') == 'cave_entrance' if c == 'settlements': return 'place' in sem if c == 'water_landmarks': return sem.get('amenity') == 'drinking_water' or sem.get('natural') == 'spring' or sem.get('natural') == 'water' or sem.get('waterway') in ('stream', 'river') or 'water_area' in classes or 'water_line' in classes if c == 'marine_points': return 'marine_point' in classes or 'light' in classes or 'buoy' in classes if c == 'depth_points': return 'depth_point' in classes if c == 'lights': return 'light' in classes if c == 'buoys': return 'buoy' in classes raise SystemExit(f'unknown --category {category!r}; supported: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys') def _feature_matches( f: Feature, *, point_only: bool = False, categories: Optional[List[str]] = None, filter_kind: Optional[List[str]] = None, filter_type: Optional[List[str]] = None, filter_subtype: Optional[List[str]] = None, filter_tags: Optional[List[Tuple[str, str]]] = None, gpxsee_classes: Optional[List[str]] = None, named_only: bool = False, ) -> bool: if point_only and not _point_feature(f): return False kind = str(f.props.get('garmin_kind') or '') gtype = str(f.props.get('garmin_type') or '') subtype = str(f.props.get('garmin_subtype') or '') if filter_kind and kind not in set(filter_kind): return False if filter_type and gtype not in set(filter_type): return False if filter_subtype and subtype not in set(filter_subtype): return False sem = semantic_tags_for_feature(f) if categories: if not any(_category_match(f, sem, c) for c in categories): return False for k, v in (filter_tags or []): if sem.get(k) != v: return False if gpxsee_classes: classes = set(gpxsee_classes_for_feature(f)) wanted = {c.strip() for c in gpxsee_classes if c.strip()} if not (classes & wanted): return False if named_only and not sem.get('name'): return False return True def _feature_point_row(f: Feature) -> Dict[str, object]: sem = semantic_tags_for_feature(f) lon, lat = f.coords return { 'lon': lon, 'lat': lat, 'name': sem.get('name', ''), 'garmin_kind': f.props.get('garmin_kind', ''), 'garmin_type': f.props.get('garmin_type', ''), 'garmin_subtype': f.props.get('garmin_subtype', ''), 'semantic_tags': sem, 'gpxsee_classes': gpxsee_classes_for_feature(f), } def write_landmarks_csv(features: List[Feature], path: Path) -> None: fields = ['lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'gpxsee_classes_json', 'semantic_tags_json'] if str(path).lower().endswith('.gz'): fh = gzip.open(path, 'wt', encoding='utf-8', newline='') else: fh = open(path, 'w', encoding='utf-8', newline='') with fh: w = csv.DictWriter(fh, fieldnames=fields) w.writeheader() for f in features: row = _feature_point_row(f) w.writerow({ 'lon': f'{row["lon"]:.8f}', 'lat': f'{row["lat"]:.8f}', 'name': row['name'], 'garmin_kind': row['garmin_kind'], 'garmin_type': row['garmin_type'], 'garmin_subtype': row['garmin_subtype'], 'gpxsee_classes_json': json.dumps(row['gpxsee_classes'], ensure_ascii=False), 'semantic_tags_json': json.dumps(row['semantic_tags'], ensure_ascii=False, sort_keys=True), }) def write_landmarks_geojson(features: List[Feature], path: Path) -> None: fc = { 'type': 'FeatureCollection', 'features': [], } for f in features: row = _feature_point_row(f) props = { 'name': row['name'], 'garmin_kind': row['garmin_kind'], 'garmin_type': row['garmin_type'], 'garmin_subtype': row['garmin_subtype'], 'gpxsee_classes': ','.join(row['gpxsee_classes']), } props.update(row['semantic_tags']) fc['features'].append({ 'type': 'Feature', 'geometry': {'type': 'Point', 'coordinates': [row['lon'], row['lat']]}, 'properties': props, }) if str(path).lower().endswith('.gz'): with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as fh: json.dump(fc, fh, ensure_ascii=False) else: path.write_text(json.dumps(fc, ensure_ascii=False, indent=2), encoding='utf-8') def print_feature_type_table(features: List[Feature], point_only: bool = False) -> None: rows = _feature_type_rows(features, point_only=point_only) print('geom_type garmin_kind garmin_type garmin_subtype count named_count gpxsee_classes semantic_tags sample_name') for row in rows: print('\t'.join([ row['geom_type'], row['garmin_kind'], row['garmin_type'], row['garmin_subtype'], str(row['count']), str(row['named_count']), json.dumps(row['gpxsee_classes'], ensure_ascii=False), json.dumps(row['semantic'], ensure_ascii=False, sort_keys=True), str(row['sample_name']), ])) def main() -> int: ap = argparse.ArgumentParser(description='Extract vector features from a Garmin IMG and export GeoJSON / OSM XML suitable for further conversion to OsmAnd .obf.') ap.add_argument('img', type=Path, help='Input Garmin .img file') ap.add_argument('--geojson', type=Path, help='Write GeoJSON or .geojson.gz output') ap.add_argument('--osm', type=Path, help='Write OSM XML or .osm.gz output') ap.add_argument('--meta-json', type=Path, help='Write parse metadata JSON') ap.add_argument('--mapset', action='append', help='Process only this TRE/RGN family id (repeatable), e.g. 02234008') ap.add_argument('--bbox', help='Clip by WGS84 bbox: west,south,east,north') ap.add_argument('--list-mapsets', action='store_true', help='List available mapsets and exit') ap.add_argument('--list-feature-types', action='store_true', help='List unique parsed Garmin feature types with counts') ap.add_argument('--list-landmark-types', action='store_true', help='List unique point landmark types with counts') ap.add_argument('--landmark-types-csv', type=Path, help='Export landmark type summary table to CSV or CSV.GZ') ap.add_argument('--landmark-types-json', type=Path, help='Export landmark type summary table to JSON or JSON.GZ') ap.add_argument('--landmarks-csv', type=Path, help='Export exact-coordinate point landmarks to CSV or CSV.GZ') ap.add_argument('--landmarks-geojson', type=Path, help='Export exact-coordinate point landmarks to GeoJSON or GeoJSON.GZ') ap.add_argument('--category', action='append', help='Filter landmarks/features by semantic category: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys') ap.add_argument('--filter-kind', action='append', help='Filter by garmin kind, e.g. point, indexed_point, extended_point, polyline') ap.add_argument('--filter-type', action='append', help='Filter by Garmin type hex string, e.g. 0x64') ap.add_argument('--filter-subtype', action='append', help='Filter by Garmin subtype hex string, e.g. 0x14') ap.add_argument('--filter-tag', action='append', help='Filter by semantic tag key=value, e.g. natural=spring') ap.add_argument('--gpxsee-class', action='append', help='Filter by GPXSee-style class predicate, e.g. water_line, water_area, summit, marine_point, buoy, light') ap.add_argument('--named-only', action='store_true', help='Keep only features with a decoded name') ap.add_argument('--raw-only', action='store_true', help='Do not add semantic OSM tags; only preserve raw garmin:* tags') args = ap.parse_args() if args.list_mapsets: container = ImgContainer(args.img.read_bytes()) for name, subs in _all_mapsets(container.files).items(): tre = TRE(subs['TRE']) print(f'{name}\t{to_deg(tre.west):.6f},{to_deg(tre.south):.6f},{to_deg(tre.east):.6f},{to_deg(tre.north):.6f}') return 0 if not args.geojson and not args.osm and not args.meta_json and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson: ap.error('provide at least one export/list option or use --list-mapsets') bbox = _parse_bbox(args.bbox) filter_tags = _parse_kv_filters(args.filter_tag) # Fast streaming OSM path when no feature post-filtering is requested. if args.osm and not args.geojson and not args.list_feature_types and not args.list_landmark_types and not args.landmark_types_csv and not args.landmark_types_json and not args.landmarks_csv and not args.landmarks_geojson and not args.category and not args.filter_kind and not args.filter_type and not args.filter_subtype and not args.filter_tag and not args.gpxsee_class and not args.named_only: meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only) info(f'parsed {meta.get("feature_count", 0)} features from {len(meta.get("mapsets", []))} mapsets') info(f'wrote OSM XML: {args.osm}') if args.meta_json: args.meta_json.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8') info(f'wrote metadata: {args.meta_json}') return 0 features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox) info(f'parsed {len(features)} features from {len(meta.get("mapsets", []))} mapsets') filtered = [ f for f in features if _feature_matches( f, point_only=bool(args.landmarks_csv or args.landmarks_geojson or args.list_landmark_types), categories=args.category, filter_kind=args.filter_kind, filter_type=args.filter_type, filter_subtype=args.filter_subtype, filter_tags=filter_tags, gpxsee_classes=args.gpxsee_class, named_only=args.named_only, ) ] if args.list_feature_types: print_feature_type_table(filtered, point_only=False) if args.list_landmark_types: print_feature_type_table(filtered, point_only=True) if args.landmark_types_csv or args.landmark_types_json: rows = _feature_type_rows(filtered, point_only=True) if args.landmark_types_csv: write_type_summary_csv(rows, args.landmark_types_csv) info(f'wrote landmark type CSV: {args.landmark_types_csv}') if args.landmark_types_json: write_type_summary_json(rows, args.landmark_types_json) info(f'wrote landmark type JSON: {args.landmark_types_json}') if args.landmarks_csv: point_features = [f for f in filtered if _point_feature(f)] write_landmarks_csv(point_features, args.landmarks_csv) info(f'wrote landmark CSV: {args.landmarks_csv}') if args.landmarks_geojson: point_features = [f for f in filtered if _point_feature(f)] write_landmarks_geojson(point_features, args.landmarks_geojson) info(f'wrote landmark GeoJSON: {args.landmarks_geojson}') if args.geojson: write_geojson(filtered, args.geojson) info(f'wrote GeoJSON: {args.geojson}') if args.osm: write_osm(filtered, args.osm, semantic=not args.raw_only) info(f'wrote OSM XML: {args.osm}') if args.meta_json: meta2 = dict(meta) meta2['feature_count_after_filters'] = len(filtered) args.meta_json.write_text(json.dumps(meta2, ensure_ascii=False, indent=2), encoding='utf-8') info(f'wrote metadata: {args.meta_json}') return 0 if __name__ == '__main__': raise SystemExit(main())