From 4be165918c776e66d2050e1c62ed28bdd7492322 Mon Sep 17 00:00:00 2001 From: nq Date: Tue, 14 Apr 2026 14:48:12 -0700 Subject: [PATCH] v1 --- garmin_img_to_osmand.py | 1118 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1118 insertions(+) create mode 100644 garmin_img_to_osmand.py diff --git a/garmin_img_to_osmand.py b/garmin_img_to_osmand.py new file mode 100644 index 0000000..faa0881 --- /dev/null +++ b/garmin_img_to_osmand.py @@ -0,0 +1,1118 @@ +#!/usr/bin/env python3 +""" +Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML. + +What it does well: +- Reads classic Garmin IMG container FAT and extracts subfiles. +- Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where + TRE/RGN/LBL offsets are stored inside the .GMP container. +- Parses TRE levels/subdivisions. +- Parses LBL labels (coding 6, 9, 10) with common codepage handling. +- Parses standard points, extended points, standard polylines/polygons, and + extended polylines/polygons from RGN. +- Exports GeoJSON and/or OSM XML. + +What it does NOT promise: +- Full Garmin NT routing/address semantics. +- Locked/compressed/vendor-obfuscated maps. +- Perfect type-to-OSM semantic translation. The exporter preserves Garmin type + codes as tags instead of inventing OSM semantics. + +This is a practical reverse-engineering tool, not a complete implementation of +all Garmin IMG variants. +""" + +from __future__ import annotations + +import argparse +import io +import json +import math +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, Iterable, Iterator, List, Optional, Tuple +from xml.sax.saxutils import escape as xml_escape + +# ------------------------- +# Low-level helpers +# ------------------------- + +COORD_FACTOR = 360.0 / (1 << 24) +FAT_BLOCK_SIZE = 0x200 +FAT_ENTRY_SIZE = 0x200 +MAX_FAT_BLOCKLIST = 240 +SEG_POINT = 0 +SEG_IPOINT = 1 +SEG_POLYLINE = 2 +SEG_POLYGON = 3 +SEG_EXTPOLYGON = 4 +SEG_EXTPOLYLINE = 5 +SEG_EXTPOINT = 6 + +OBJ_POINT = 0x10 +OBJ_INDEXED_POINT = 0x20 +OBJ_POLYLINE = 0x40 +OBJ_POLYGON = 0x80 +OBJ_EXT_POLYGON = 0x100 +OBJ_EXT_POLYLINE = 0x200 +OBJ_EXT_POINT = 0x400 + + +def warn(msg: str) -> None: + print(f"[warn] {msg}", file=sys.stderr) + + +def info(msg: str) -> None: + print(f"[info] {msg}", file=sys.stderr) + + +def read_u16le(buf: bytes, off: int) -> int: + return int.from_bytes(buf[off:off + 2], "little", signed=False) + + +def read_s16le(buf: bytes, off: int) -> int: + return int.from_bytes(buf[off:off + 2], "little", signed=True) + + +def read_u24le(buf: bytes, off: int) -> int: + return int.from_bytes(buf[off:off + 3], "little", signed=False) + + +def read_s24le(buf: bytes, off: int) -> int: + raw = read_u24le(buf, off) + if raw & 0x800000: + raw -= 1 << 24 + return raw + + +def read_u32le(buf: bytes, off: int) -> int: + return int.from_bytes(buf[off:off + 4], "little", signed=False) + + +def to_deg(coord: int) -> float: + return coord * COORD_FACTOR + + +def decode_ascii_z(data: bytes) -> str: + return data.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip() + + +# ------------------------- +# Container extraction +# ------------------------- + +@dataclass +class FatRecord: + filename: str + ext: str + size: int + blocks: List[int] + offset_in_fat: int + + +class ImgContainer: + def __init__(self, raw: bytes): + self.raw = raw + # Some IMG files are XOR'd by a single byte stored at byte 0. + xor_byte = raw[0] + if xor_byte not in (0x00,): + maybe = bytes(b ^ xor_byte for b in raw) + sig = maybe[0x10:0x17] + ident = maybe[0x41:0x48] + if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"): + info(f"applied XOR decode with byte 0x{xor_byte:02x}") + self.raw = maybe + self.block_size = self._read_block_size() + self.fat_start = self._read_fat_start() + self.files = self._extract_subfiles() + + def _read_block_size(self) -> int: + e1 = self.raw[0x61] + e2 = self.raw[0x62] + return 1 << (e1 + e2) + + def _read_fat_start(self) -> int: + fat_phys_block = self.raw[0x40] + return fat_phys_block * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE + + def _parse_fat_chain(self) -> List[FatRecord]: + records: List[FatRecord] = [] + off = self.fat_start + seen_offsets = set() + while off + FAT_ENTRY_SIZE <= len(self.raw): + if off in seen_offsets: + break + seen_offsets.add(off) + first = self.raw[off] + if first != 0x01: + break + name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00") + ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00") + size = read_u32le(self.raw, off + 12) + next_fat = read_u16le(self.raw, off + 16) + blocks = [] + boff = off + 0x20 + for i in range(MAX_FAT_BLOCKLIST): + blk = read_u16le(self.raw, boff + i * 2) + if blk == 0xFFFF: + break + blocks.append(blk) + if next_fat == 0: + records.append(FatRecord(name, ext, size, blocks, off)) + off += FAT_ENTRY_SIZE + return records + + def _collect_blocks(self, start_record: FatRecord) -> bytes: + data = bytearray() + blocks = list(start_record.blocks) + current_offset = start_record.offset_in_fat + # Follow FAT continuation blocks when next_fat is used. + while True: + next_fat = read_u16le(self.raw, current_offset + 16) + if next_fat == 0: + break + current_offset += FAT_ENTRY_SIZE + if current_offset + FAT_ENTRY_SIZE > len(self.raw): + break + boff = current_offset + 0x20 + for i in range(MAX_FAT_BLOCKLIST): + blk = read_u16le(self.raw, boff + i * 2) + if blk == 0xFFFF: + break + blocks.append(blk) + for blk in blocks: + start = blk * self.block_size + end = start + self.block_size + if end > len(self.raw): + break + data.extend(self.raw[start:end]) + return bytes(data[:start_record.size]) + + def _extract_subfiles(self) -> Dict[str, bytes]: + out: Dict[str, bytes] = {} + for rec in self._parse_fat_chain(): + key = f"{rec.filename}.{rec.ext}".upper() + out[key] = self._collect_blocks(rec) + return out + + +# ------------------------- +# Core format structures +# ------------------------- + +@dataclass +class LevelInfo: + level: int + bits_per_coord: int + inherited: bool + present: bool = True + + +@dataclass +class Subdivision: + index: int + level: int + data_offset: int + object_types: int + lon_center: int + lat_center: int + width: int + height: int + index_next_level: int = 0 + last: bool = False + data_end: int = 0 + data_ext_polygon_offset: int = 0 + data_ext_polygon_end: int = 0 + data_ext_polyline_offset: int = 0 + data_ext_polyline_end: int = 0 + data_ext_poi_offset: int = 0 + data_ext_poi_end: int = 0 + children: List["Subdivision"] = field(default_factory=list) + + def nb_object_types(self) -> int: + count = 0 + cur = 0x10 + for _ in range(4): + if self.object_types & cur: + count += 1 + cur <<= 1 + return count + + +@dataclass +class Feature: + geom_type: str # Point | LineString | Polygon + coords: object + props: Dict[str, object] + + +# ------------------------- +# LBL parser +# ------------------------- + +class LBL: + NORMAL_CHARS = [' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', + 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', + '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] + SYMBOL_CHARS = ['@', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '~', '~', '~', + '~', '~', '~', '~', '~', '~', '~', ':', ';', '<', '=', '>', '?', '~', '~', '~', '~', '~', '~', + '~', '~', '~', '~', '~', '[', '\\', ']', '^', '_'] + SPECIAL_CHARS = ['`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', + 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', + '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] + + def __init__(self, data: Optional[bytes]): + self.data = data or b"" + self.ok = bool(data) + self.data_offset = 0 + self.data_length = 0 + self.data_offset_multiplier = 1 + self.label_coding = 6 + self.codepage = 1252 + if self.ok: + self._parse_header() + + def _parse_header(self) -> None: + header_length = read_u16le(self.data, 0) + self.data_offset = read_u32le(self.data, 0x15) + self.data_length = read_u32le(self.data, 0x19) + self.data_offset_multiplier = 1 << self.data[0x1D] + self.label_coding = self.data[0x1E] + if len(self.data) >= 0xAC: + self.codepage = read_u16le(self.data, 0xAA) + + def get_label(self, offset: int) -> str: + if not self.ok or offset == 0: + return "" + actual = self.data_offset + offset * self.data_offset_multiplier + if actual < 0 or actual >= len(self.data): + return "" + if self.label_coding == 6: + return self._get_label6(actual) + return self._get_label8_10(actual) + + def _get_label8_10(self, off: int) -> str: + end = off + while end < len(self.data) and self.data[end] != 0: + end += 1 + raw = self.data[off:end] + enc = None + cp = self.codepage + if cp in (0, 850): + enc = "cp1252" + elif cp == 65001: + enc = "utf-8" + elif cp == 932: + enc = "cp932" + elif cp == 950: + enc = "big5" + else: + enc = f"cp{cp}" + try: + return raw.decode(enc, errors="replace") + except Exception: + return raw.decode("latin1", errors="replace") + + def _get_label6(self, off: int) -> str: + out: List[str] = [] + charset = "NORMAL" + pos = off + while pos + 3 <= len(self.data): + b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2] + pos += 3 + codes = [ + b1 >> 2, + ((b1 & 0x3) << 4) | (b2 >> 4), + ((b2 & 0xF) << 2) | (b3 >> 6), + b3 & 0x3F, + ] + for c in codes: + if c > 0x2F: + return "".join(out).strip() + if charset == "NORMAL": + if c == 0x1C: + charset = "SYMBOL" + elif c == 0x1B: + charset = "SPECIAL" + elif c == 0x1D: + out.append("|") + elif c in (0x1E, 0x1F): + out.append(" ") + else: + out.append(self.NORMAL_CHARS[c]) + elif charset == "SYMBOL": + out.append(self.SYMBOL_CHARS[c]) + charset = "NORMAL" + else: + out.append(self.SPECIAL_CHARS[c]) + charset = "NORMAL" + return "".join(out).strip() + + +# ------------------------- +# TRE parser +# ------------------------- + +class TRE: + def __init__(self, data: bytes): + self.data = data + self.header_length = read_u16le(data, 0) + self.north = read_s24le(data, 0x15) + self.east = read_s24le(data, 0x18) + self.south = read_s24le(data, 0x1B) + self.west = read_s24le(data, 0x1E) + self.levels: Dict[int, LevelInfo] = {} + self.max_level = 0 + self.min_level = 15 + self.extended_types = False + self.extended_types_offset = 0 + self.extended_types_length = 0 + self.extended_types_size = 0 + self.extended_types_number = 0 + self.decalaje_extended_types = 0 + self.subdivisions_count = 1 + self.root_subdivisions: List[Subdivision] = [] + self.subdivisions_by_index: Dict[int, Subdivision] = {} + self._parse() + + def _parse(self) -> None: + self._parse_levels() + self._parse_tre7() + self._parse_subdivisions() + + def _parse_tre7(self) -> None: + if self.header_length >= 0x7C + 10: + self.extended_types_offset = read_u32le(self.data, 0x7C) + self.extended_types_length = read_u32le(self.data, 0x80) + self.extended_types_size = read_u16le(self.data, 0x84) + if self.extended_types_size > 0: + self.extended_types_number = self.extended_types_length // self.extended_types_size + self.extended_types = self.extended_types_length > 0 + self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number + + def _parse_levels(self) -> None: + levels_offset = read_u32le(self.data, 0x21) + levels_length = read_u32le(self.data, 0x25) + pos = levels_offset + end = levels_offset + levels_length + while pos + 4 <= end and pos + 4 <= len(self.data): + zoom = self.data[pos] + bits = self.data[pos + 1] + count = read_u16le(self.data, pos + 2) + _ = count + level = zoom & 0xF + inherited = bool(zoom & 0x80) + self.levels[level] = LevelInfo(level=level, bits_per_coord=bits, inherited=inherited) + self.max_level = max(self.max_level, level) + self.min_level = min(self.min_level, level) + self.subdivisions_count += count + pos += 4 + + def get_resolution(self, level: int) -> int: + return self.levels[level].bits_per_coord + + def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int: + shift = 24 - self.get_resolution(level) - additional_accuracy + if shift >= 0: + return value << shift + return value >> (-shift) + + def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]: + data_offset = read_u24le(self.data, pos) + object_types = self.data[pos + 3] + if object_types & 0x0F: + data_offset += (object_types & 0x0F) * (1 << 24) + lon_center = read_s24le(self.data, pos + 4) + lat_center = read_s24le(self.data, pos + 7) + width = read_u16le(self.data, pos + 10) + last = False + if width & 0x8000: + width &= 0x7FFF + last = True + height = read_u16le(self.data, pos + 12) + index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0 + sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types, + lon_center=lon_center, lat_center=lat_center, width=width, height=height, + index_next_level=index_next, last=last) + # Extended offsets per subdivision, if present. + if self.extended_types: + indice = index - self.decalaje_extended_types + if indice > 0 and self.extended_types_size >= 8: + p = self.extended_types_offset + (indice - 1) * self.extended_types_size + if p + self.extended_types_size <= len(self.data): + sub.data_ext_polygon_offset = read_u32le(self.data, p) + if self.extended_types_size >= 8: + sub.data_ext_polyline_offset = read_u32le(self.data, p + 4) + if self.extended_types_size >= 12: + sub.data_ext_poi_offset = read_u32le(self.data, p + 8) + return sub, pos + record_size + + def _parse_subdivisions(self) -> None: + sub_offset = read_u32le(self.data, 0x29) + sub_length = read_u32le(self.data, 0x2D) + end = sub_offset + sub_length + if end > len(self.data): + end = len(self.data) + + present_levels = sorted(self.levels.keys(), reverse=True) + if not present_levels: + return + current_root_level = present_levels[0] + index = 1 + pos = sub_offset + roots: List[Subdivision] = [] + # Parse all 16-byte records first until last root. + while pos + 16 <= end: + sub, pos = self._parse_subdiv_record(pos, current_root_level, 16, index) + roots.append(sub) + self.subdivisions_by_index[index] = sub + index += 1 + if sub.last: + break + self.root_subdivisions = roots + # Recursively parse children using the index_next_level scheme. + self._parse_children(self.root_subdivisions, current_root_level - 1, sub_offset, end, index) + # Compute data ends by sorted data offsets. + ordered = sorted(self.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index)) + for i, sub in enumerate(ordered): + if i + 1 < len(ordered): + sub.data_end = ordered[i + 1].data_offset + else: + sub.data_end = 0 + # Extended segment ends. + for attr_start, attr_end in [ + ("data_ext_polygon_offset", "data_ext_polygon_end"), + ("data_ext_polyline_offset", "data_ext_polyline_end"), + ("data_ext_poi_offset", "data_ext_poi_end"), + ]: + items = sorted((s for s in self.subdivisions_by_index.values() if getattr(s, attr_start, 0)), + key=lambda s: getattr(s, attr_start)) + for i, sub in enumerate(items): + if i + 1 < len(items): + setattr(sub, attr_end, getattr(items[i + 1], attr_start)) + else: + setattr(sub, attr_end, 0) + + def _next_present_level(self, level: int) -> int: + while level > 0 and level not in self.levels: + level -= 1 + return level + + def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None: + level = self._next_present_level(level) + if level <= 0: + return + for parent in parents: + if parent.index_next_level <= 0: + continue + idx = parent.index_next_level + if idx <= 0: + continue + # Heuristic matching JGarminImgParser: 16-byte records for non-leaf levels, 14-byte for last level. + record_size = 14 if level == self.min_level else 16 + pos = sub_offset + (idx - 1) * 16 if record_size == 16 else sub_offset + (idx - 1) * 14 + # Fallback for mixed record layout: compute small-record area start after all 16-byte records already parsed. + if record_size == 14 and pos + 14 > end: + pos = min(end, sub_offset + len(self.root_subdivisions) * 16) + children: List[Subdivision] = [] + while pos + record_size <= end: + try: + sub, pos = self._parse_subdiv_record(pos, level, record_size, idx) + except Exception: + break + children.append(sub) + self.subdivisions_by_index[idx] = sub + idx += 1 + if sub.last: + break + parent.children = children + child_level = self._next_present_level(level - 1) + if child_level > 0 and children: + self._parse_children(children, child_level, sub_offset, end, idx) + + +# ------------------------- +# RGN parser +# ------------------------- + +class BitStreamReader: + def __init__(self, data: bytes, start: int, length_bytes: int): + self.data = data + self.pos = start + self.remaining_bytes = length_bytes + self.remaining_bits = 0 + self.cur_byte = 0 + + def has_next(self, nbits: int) -> bool: + return self.remaining_bytes * 8 + self.remaining_bits >= nbits + + def finish(self) -> int: + self.pos += self.remaining_bytes + self.remaining_bytes = 0 + self.remaining_bits = 0 + return self.pos + + def _get_if_needed(self) -> None: + if self.remaining_bits == 0: + if self.remaining_bytes <= 0: + raise EOFError + self.cur_byte = self.data[self.pos] + self.pos += 1 + self.remaining_bytes -= 1 + self.remaining_bits = 8 + + def read_next_bits(self, to_get: int) -> int: + cur_pos = 0 + result = 0 + while cur_pos < to_get: + self._get_if_needed() + remaining_to_get = to_get - cur_pos + if remaining_to_get >= self.remaining_bits: + result |= self.cur_byte << cur_pos + cur_pos += self.remaining_bits + self.remaining_bits = 0 + else: + mask = (1 << remaining_to_get) - 1 + result |= (self.cur_byte & mask) << cur_pos + self.cur_byte >>= remaining_to_get + self.remaining_bits -= remaining_to_get + return result + return result + + def read_coord_offset(self, nbits: int, sign: int, extra_bit: int) -> int: + if sign == 0: + value = self.read_next_bits(nbits) + sign_mask = 1 << (nbits - 1) + if value & sign_mask: + comp = value ^ sign_mask + if extra_bit == 0: + if comp != 0: + return comp - sign_mask + other = self.read_coord_offset(nbits, sign, extra_bit) + if other < 0: + return 1 - value + other + return value - 1 + other + else: + if comp & 0xFFFFFE: + return (comp & 0xFFFFFE) - sign_mask + other = self.read_coord_offset(nbits - 1, sign, 0) + if other < 0: + return 1 - sign_mask + 1 + (other << 1) + return sign_mask - 1 - 1 + (other << 1) + else: + if extra_bit > 0: + return value & 0xFFFFFE + return value + else: + value = self.read_next_bits(nbits) + if extra_bit > 0: + return (((value >> 1) * sign) << 1) + return value * sign + + +class RGN: + def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]): + self.data = data + self.tre = tre + self.lbl = lbl or LBL(None) + self.header_length = read_u16le(data, 0) + self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0 + self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0 + self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0 + self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0 + self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0 + self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0 + self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0 + self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0 + + def data_end(self) -> int: + return self.data_length + + def ext_polygon_end(self) -> int: + return self.ext_poly_length + + def ext_polyline_end(self) -> int: + return self.ext_line_length + + def ext_poi_end(self) -> int: + return self.ext_poi_length + + @staticmethod + def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int: + add = 0 + if sign == 0: + add += 1 + add += extra_bit + if i <= 9: + return i + 2 + add + return 2 * i - 9 + 2 + add + + def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int: + return sub.lon_center + self.tre.convert_map_units(sub.level, delta, add_acc) + + def _subdiv_lat(self, sub: Subdivision, delta: int, add_acc: int) -> int: + return sub.lat_center + self.tre.convert_map_units(sub.level, delta, add_acc) + + def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]: + result: List[Optional[Tuple[int, int]]] = [None] * 7 + offset = sub.data_offset + self.data_offset + end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset + if sub.object_types == 0: + return result + if sub.data_end and end > len(self.data): + end = len(self.data) + if sub.data_end and end > offset and sub.nb_object_types() > 0: + if sub.object_types & OBJ_POINT: + result[SEG_POINT] = (0, 0) + if sub.object_types & OBJ_INDEXED_POINT: + result[SEG_IPOINT] = (0, 0) + if sub.object_types & OBJ_POLYLINE: + result[SEG_POLYLINE] = (0, 0) + if sub.object_types & OBJ_POLYGON: + result[SEG_POLYGON] = (0, 0) + order = [SEG_POINT, SEG_IPOINT, SEG_POLYLINE, SEG_POLYGON] + nb_pointers = sub.nb_object_types() - 1 + if offset + nb_pointers * 2 <= len(self.data): + segment_start = offset + nb_pointers * 2 + cur_idx = 0 + p = offset + for _ in range(nb_pointers): + while cur_idx < 4 and result[order[cur_idx]] is None: + cur_idx += 1 + if cur_idx >= 4: + break + segment_end = read_u16le(self.data, p) + offset + p += 2 + if segment_end > end or segment_end <= segment_start: + result[order[cur_idx]] = None + else: + result[order[cur_idx]] = (segment_start, segment_end) + segment_start = segment_end + cur_idx += 1 + while cur_idx < 4 and result[order[cur_idx]] is None: + cur_idx += 1 + if cur_idx < 4 and result[order[cur_idx]] is not None and segment_start < end: + result[order[cur_idx]] = (segment_start, end) + if sub.data_ext_polygon_offset: + s = self.ext_poly_offset + sub.data_ext_polygon_offset + e = self.ext_poly_offset + (sub.data_ext_polygon_end or self.ext_poly_length) + if e > s: + result[SEG_EXTPOLYGON] = (s, e) + if sub.data_ext_polyline_offset: + s = self.ext_line_offset + sub.data_ext_polyline_offset + e = self.ext_line_offset + (sub.data_ext_polyline_end or self.ext_line_length) + if e > s: + result[SEG_EXTPOLYLINE] = (s, e) + if sub.data_ext_poi_offset: + s = self.ext_poi_offset + sub.data_ext_poi_offset + e = self.ext_poi_offset + (sub.data_ext_poi_end or self.ext_poi_length) + if e > s: + result[SEG_EXTPOINT] = (s, e) + return result + + def parse_features(self) -> List[Feature]: + # Finalize subdivision end markers using RGN section lengths. + ordered = sorted(self.tre.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index)) + for i, sub in enumerate(ordered): + if sub.data_end == 0: + sub.data_end = self.data_length if i + 1 == len(ordered) else ordered[i + 1].data_offset + for attr_start, final_end in [ + ("data_ext_polygon_offset", self.ext_poly_length), + ("data_ext_polyline_offset", self.ext_line_length), + ("data_ext_poi_offset", self.ext_poi_length), + ]: + items = sorted((s for s in self.tre.subdivisions_by_index.values() if getattr(s, attr_start, 0)), + key=lambda s: getattr(s, attr_start)) + for i, sub in enumerate(items): + if attr_start == "data_ext_polygon_offset": + setattr(sub, "data_ext_polygon_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) + elif attr_start == "data_ext_polyline_offset": + setattr(sub, "data_ext_polyline_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) + else: + setattr(sub, "data_ext_poi_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) + + feats: List[Feature] = [] + for sub in sorted(self.tre.subdivisions_by_index.values(), key=lambda s: s.index): + segs = self._segments(sub) + if segs[SEG_POINT]: + feats.extend(self._parse_points(sub, segs[SEG_POINT], indexed=False)) + if segs[SEG_IPOINT]: + feats.extend(self._parse_points(sub, segs[SEG_IPOINT], indexed=True)) + if segs[SEG_EXTPOINT]: + feats.extend(self._parse_ext_points(sub, segs[SEG_EXTPOINT])) + if segs[SEG_POLYLINE]: + feats.extend(self._parse_poly(sub, segs[SEG_POLYLINE], line=True, extended=False)) + if segs[SEG_POLYGON]: + feats.extend(self._parse_poly(sub, segs[SEG_POLYGON], line=False, extended=False)) + if segs[SEG_EXTPOLYLINE]: + feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYLINE], line=True, extended=True)) + if segs[SEG_EXTPOLYGON]: + feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYGON], line=False, extended=True)) + return feats + + def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]: + feats: List[Feature] = [] + pos, end = seg + while pos < end and pos + 8 <= len(self.data): + typ = self.data[pos] + info24 = read_u24le(self.data, pos + 1) + has_subtype = bool(info24 & 0x800000) + is_poi = bool(info24 & 0x400000) + lbl_off = info24 & 0x3FFFFF + lon_delta = read_s16le(self.data, pos + 4) + lat_delta = read_s16le(self.data, pos + 6) + pos += 8 + subtype = 0 + if has_subtype and pos < end: + subtype = self.data[pos] + pos += 1 + name = self.lbl.get_label(lbl_off) if lbl_off else "" + lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) + lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) + feats.append(Feature( + geom_type="Point", + coords=[lon, lat], + props={ + "garmin_kind": "indexed_point" if indexed else "point", + "garmin_type": f"0x{typ:02x}", + "garmin_subtype": f"0x{subtype:02x}", + "garmin_is_poi": is_poi, + "name": name, + }, + )) + return feats + + def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]: + feats: List[Feature] = [] + pos, end = seg + while pos < end and pos + 6 <= len(self.data): + typ = self.data[pos] + subtype_raw = self.data[pos + 1] + has_lbl = bool(subtype_raw & 0x20) + subtype = subtype_raw % 32 + full_type = ((typ + 0x100) << 8) + subtype + lon_delta = read_s16le(self.data, pos + 2) + lat_delta = read_s16le(self.data, pos + 4) + pos += 6 + lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 + if has_lbl: + pos += 3 + name = self.lbl.get_label(lbl_off) if lbl_off else "" + lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) + lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) + feats.append(Feature( + geom_type="Point", + coords=[lon, lat], + props={ + "garmin_kind": "extended_point", + "garmin_type": f"0x{full_type:04x}", + "name": name, + }, + )) + return feats + + def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]: + feats: List[Feature] = [] + pos, end = seg + while pos < end: + try: + if not extended: + if pos + 10 > end: + break + info1 = self.data[pos] + pos += 1 + if line: + typ = info1 & 0x3F + direction = bool(info1 & 0x40) + else: + typ = info1 & 0x7F + direction = False + two_byte_len = bool(info1 & 0x80) + info24 = read_u24le(self.data, pos) + pos += 3 + lbl_off = info24 & 0x3FFFFF + extra_bit = 1 if (info24 & 0x400000) else 0 + data_in_net = bool(info24 & 0x800000) + lon_delta = read_s16le(self.data, pos) + lat_delta = read_s16le(self.data, pos + 2) + pos += 4 + bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos] + pos += 2 if two_byte_len else 1 + bitstream_info = self.data[pos] + pos += 1 + long_sign = 0 + lat_sign = 0 + long_extra_bit = extra_bit + lat_extra_bit = 0 + full_type = typ + else: + if pos + 8 > end: + break + typ = self.data[pos] + subtype_raw = self.data[pos + 1] + has_lbl = bool(subtype_raw & 0x20) + subtype = subtype_raw % 32 + full_type = ((typ + 0x100) << 8) + subtype + lon_delta = read_s16le(self.data, pos + 2) + lat_delta = read_s16le(self.data, pos + 4) + pos += 6 + bitstream_len_byte = self.data[pos] + pos += 1 + if bitstream_len_byte % 2 == 0: + if pos >= end: + break + bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1 + pos += 1 + else: + bitstream_len = bitstream_len_byte // 2 - 1 + bitstream_info = self.data[pos] + pos += 1 + direction = False + data_in_net = False + long_sign = 0 + lat_sign = 0 + long_extra_bit = 0 + lat_extra_bit = 0 + reader = BitStreamReader(self.data, pos, bitstream_len) + if reader.read_next_bits(1) != 0: + long_sign = +1 if reader.read_next_bits(1) == 0 else -1 + if reader.read_next_bits(1) != 0: + lat_sign = +1 if reader.read_next_bits(1) == 0 else -1 + if extended: + long_extra_bit = reader.read_next_bits(1) + long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit) + lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit) + cur_lon = lon_delta + cur_lat = lat_delta + pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]] + cur_lon <<= long_extra_bit + cur_lat <<= lat_extra_bit + while reader.has_next(long_bits + lat_bits): + dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit) + dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit) + cur_lon += dlon + cur_lat += dlat + pts.append([ + to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)), + to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)), + ]) + pos = reader.finish() + lbl_off = 0 if extended else lbl_off + if extended: + lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 + if has_lbl: + pos += 3 + name = self.lbl.get_label(lbl_off) if lbl_off else "" + if not line: + if pts and pts[0] != pts[-1]: + pts.append(pts[0]) + feats.append(Feature( + geom_type="Polygon", + coords=[pts], + props={ + "garmin_kind": "extended_polygon" if extended else "polygon", + "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", + "garmin_direction": direction, + "garmin_data_in_net": data_in_net, + "name": name, + }, + )) + else: + feats.append(Feature( + geom_type="LineString", + coords=pts, + props={ + "garmin_kind": "extended_polyline" if extended else "polyline", + "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", + "garmin_direction": direction, + "garmin_data_in_net": data_in_net, + "name": name, + }, + )) + except Exception: + # Stop current segment on malformed data instead of crashing the whole file. + break + return feats + + +# ------------------------- +# Output writers +# ------------------------- + +def feature_to_geojson(f: Feature) -> Dict[str, object]: + props = {k: v for k, v in f.props.items() if v not in (None, "", [], {})} + return { + "type": "Feature", + "geometry": {"type": f.geom_type, "coordinates": f.coords}, + "properties": props, + } + + +def write_geojson(features: List[Feature], path: Path) -> None: + fc = { + "type": "FeatureCollection", + "features": [feature_to_geojson(f) for f in features], + } + path.write_text(json.dumps(fc, ensure_ascii=False, indent=2), encoding="utf-8") + + +def _osm_escape(v: object) -> str: + return xml_escape(str(v), {'"': '"'}) + + +def write_osm(features: List[Feature], path: Path) -> None: + node_id = -1 + way_id = -1 + lines: List[str] = ['', ''] + + def add_node(lon: float, lat: float, tags: Optional[Dict[str, object]] = None) -> int: + nonlocal node_id + nid = node_id + node_id -= 1 + if tags: + lines.append(f' ') + for k, v in tags.items(): + if v in (None, ""): + continue + lines.append(f' ') + lines.append(' ') + else: + lines.append(f' ') + return nid + + def feature_tags(f: Feature) -> Dict[str, object]: + tags = {} + kind = f.props.get("garmin_kind") + gtype = f.props.get("garmin_type") + if kind: + tags["garmin:kind"] = kind + if gtype: + tags["garmin:type"] = gtype + if f.props.get("garmin_subtype"): + tags["garmin:subtype"] = f.props["garmin_subtype"] + if f.props.get("name"): + tags["name"] = f.props["name"] + return tags + + for f in features: + tags = feature_tags(f) + if f.geom_type == "Point": + lon, lat = f.coords + add_node(lon, lat, tags) + elif f.geom_type in ("LineString", "Polygon"): + coords = f.coords if f.geom_type == "LineString" else f.coords[0] + node_ids = [add_node(lon, lat) for lon, lat in coords] + wid = way_id + way_id -= 1 + lines.append(f' ') + for nid in node_ids: + lines.append(f' ') + if f.geom_type == "Polygon": + tags["area"] = "yes" + for k, v in tags.items(): + lines.append(f' ') + lines.append(' ') + lines.append('') + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +# ------------------------- +# High-level orchestration +# ------------------------- + +def resolve_map_subfiles(files: Dict[str, bytes]) -> Dict[str, bytes]: + names = {k.upper(): v for k, v in files.items()} + # First, classic top-level TRE/RGN/LBL. + classic = {} + for ext in ("TRE", "RGN", "LBL", "NET"): + for k, v in names.items(): + if k.endswith(f".{ext}"): + classic[ext] = v + break + if "TRE" in classic and "RGN" in classic: + return classic + # Then GMP bundle. Many NT maps expose offsets from the GMP header. + gmp = None + for k, v in names.items(): + if k.endswith(".GMP"): + gmp = v + break + if gmp and len(gmp) > 0x29: + try: + tre_of = read_u32le(gmp, 0x19) + rgn_of = read_u32le(gmp, 0x1D) + lbl_of = read_u32le(gmp, 0x21) + net_of = read_u32le(gmp, 0x25) + offsets = [("TRE", tre_of), ("RGN", rgn_of), ("LBL", lbl_of), ("NET", net_of)] + positive = [(n, o) for n, o in offsets if o > 0] + positive_sorted = sorted(positive, key=lambda t: t[1]) + out = {} + for i, (name, start) in enumerate(positive_sorted): + end = positive_sorted[i + 1][1] if i + 1 < len(positive_sorted) else len(gmp) + out[name] = gmp[start:end] + if "TRE" in out and "RGN" in out: + return out + except Exception: + pass + raise RuntimeError("Could not find a usable TRE/RGN pair in IMG container") + + +def load_features_from_img(img_path: Path) -> Tuple[List[Feature], Dict[str, object]]: + raw = img_path.read_bytes() + container = ImgContainer(raw) + subfiles = resolve_map_subfiles(container.files) + tre = TRE(subfiles["TRE"]) + lbl = LBL(subfiles.get("LBL")) + rgn = RGN(subfiles["RGN"], tre=tre, lbl=lbl) + features = rgn.parse_features() + meta = { + "img_file": str(img_path), + "block_size": container.block_size, + "subfiles": sorted(container.files.keys()), + "bounds_garmin": { + "north": tre.north, + "east": tre.east, + "south": tre.south, + "west": tre.west, + }, + "bounds_wgs84": { + "north": to_deg(tre.north), + "east": to_deg(tre.east), + "south": to_deg(tre.south), + "west": to_deg(tre.west), + }, + "levels": {lvl: {"bits_per_coord": li.bits_per_coord, "inherited": li.inherited} for lvl, li in tre.levels.items()}, + "feature_count": len(features), + } + return features, meta + + +def main() -> int: + ap = argparse.ArgumentParser(description="Extract vector features from a Garmin IMG and export GeoJSON / OSM XML.") + ap.add_argument("img", type=Path, help="Input Garmin .img file") + ap.add_argument("--geojson", type=Path, help="Write GeoJSON output") + ap.add_argument("--osm", type=Path, help="Write OSM XML output") + ap.add_argument("--meta-json", type=Path, help="Write parse metadata JSON") + args = ap.parse_args() + + if not args.geojson and not args.osm and not args.meta_json: + ap.error("provide at least one of --geojson, --osm, --meta-json") + + features, meta = load_features_from_img(args.img) + info(f"parsed {len(features)} features") + + if args.geojson: + write_geojson(features, args.geojson) + info(f"wrote GeoJSON: {args.geojson}") + if args.osm: + write_osm(features, args.osm) + info(f"wrote OSM XML: {args.osm}") + if args.meta_json: + args.meta_json.write_text(json.dumps(meta, indent=2), encoding="utf-8") + info(f"wrote metadata: {args.meta_json}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())