diff --git a/garmin_img_to_osmand.py b/garmin_img_to_osmand.py deleted file mode 100644 index faa0881..0000000 --- a/garmin_img_to_osmand.py +++ /dev/null @@ -1,1118 +0,0 @@ -#!/usr/bin/env python3 -""" -Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML. - -What it does well: -- Reads classic Garmin IMG container FAT and extracts subfiles. -- Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where - TRE/RGN/LBL offsets are stored inside the .GMP container. -- Parses TRE levels/subdivisions. -- Parses LBL labels (coding 6, 9, 10) with common codepage handling. -- Parses standard points, extended points, standard polylines/polygons, and - extended polylines/polygons from RGN. -- Exports GeoJSON and/or OSM XML. - -What it does NOT promise: -- Full Garmin NT routing/address semantics. -- Locked/compressed/vendor-obfuscated maps. -- Perfect type-to-OSM semantic translation. The exporter preserves Garmin type - codes as tags instead of inventing OSM semantics. - -This is a practical reverse-engineering tool, not a complete implementation of -all Garmin IMG variants. 
class ImgContainer:
    """In-memory view of a Garmin IMG container and its extracted subfiles.

    Construction undoes the optional single-byte XOR obfuscation, reads the
    block size and FAT start from the image header, and fills ``files`` with
    ``"NAME.EXT"`` (upper-cased) -> subfile bytes.

    Raises:
        ValueError: if ``raw`` is too short to contain the header bytes read
            here (the highest one is at offset 0x62).
    """

    _FAT_FLAG_USED = 0x01         # first byte of a FAT entry that is in use
    _FAT_PART_OFFSET = 0x10       # u16 part number; 0 on the first entry of a file
    _FAT_BLOCKLIST_OFFSET = 0x20  # start of the u16 block-number list
    _MIN_HEADER_SIZE = 0x63

    def __init__(self, raw: bytes):
        if len(raw) < self._MIN_HEADER_SIZE:
            raise ValueError(
                f"IMG data too short ({len(raw)} bytes) to contain a header"
            )
        self.raw = raw
        # Some IMG files are XOR'd by a single byte stored at byte 0.
        xor_byte = raw[0]
        if xor_byte:
            # Decode only the two signature windows to decide; decoding the
            # whole image in a Python-level loop is very slow on real maps.
            sig = bytes(b ^ xor_byte for b in raw[0x10:0x17])
            ident = bytes(b ^ xor_byte for b in raw[0x41:0x48])
            if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"):
                info(f"applied XOR decode with byte 0x{xor_byte:02x}")
                table = bytes(i ^ xor_byte for i in range(256))
                self.raw = raw.translate(table)
        self.block_size = self._read_block_size()
        self.fat_start = self._read_fat_start()
        self.files = self._extract_subfiles()

    def _read_block_size(self) -> int:
        """Return ``2 ** (E1 + E2)`` from the exponent bytes at 0x61 and 0x62."""
        return 1 << (self.raw[0x61] + self.raw[0x62])

    def _read_fat_start(self) -> int:
        """Return the byte offset at which FAT entries are scanned.

        This is one FAT block past the physical block number stored at 0x40.
        """
        return self.raw[0x40] * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE

    def _is_used_entry(self, off: int) -> bool:
        """Tell whether a complete, in-use FAT entry starts at ``off``."""
        return (
            off + FAT_ENTRY_SIZE <= len(self.raw)
            and self.raw[off] == self._FAT_FLAG_USED
        )

    def _read_block_list(self, entry_off: int) -> List[int]:
        """Return the block numbers of one FAT entry, up to the 0xFFFF marker."""
        blocks: List[int] = []
        base = entry_off + self._FAT_BLOCKLIST_OFFSET
        for i in range(MAX_FAT_BLOCKLIST):
            blk = read_u16le(self.raw, base + i * 2)
            if blk == 0xFFFF:
                break
            blocks.append(blk)
        return blocks

    def _parse_fat_chain(self) -> List[FatRecord]:
        """Return one ``FatRecord`` per subfile (entries whose part number is 0).

        Scanning stops at the first entry that is not flagged as used or that
        would run past the end of the image.
        """
        records: List[FatRecord] = []
        off = self.fat_start
        while self._is_used_entry(off):
            if read_u16le(self.raw, off + self._FAT_PART_OFFSET) == 0:
                name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00")
                ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00")
                size = read_u32le(self.raw, off + 12)
                records.append(FatRecord(name, ext, size, self._read_block_list(off), off))
            off += FAT_ENTRY_SIZE
        return records

    def _collect_blocks(self, start_record: FatRecord) -> bytes:
        """Return the bytes of one subfile, following its continuation entries.

        The previous version tested the part number of the *first* entry,
        which is 0 by construction, so continuation entries were never read
        and any subfile needing more than one FAT entry was cut short.

        NOTE(review): continuation entries are assumed to follow the first
        entry directly, repeat its name/extension and carry a non-zero part
        number -- confirm against a multi-entry image.
        """
        first = start_record.offset_in_fat
        identity = self.raw[first + 1:first + 12]  # 8-byte name + 3-byte extension
        blocks = list(start_record.blocks)
        off = first + FAT_ENTRY_SIZE
        while self._is_used_entry(off):
            if read_u16le(self.raw, off + self._FAT_PART_OFFSET) == 0:
                break  # first entry of the next subfile
            if self.raw[off + 1:off + 12] != identity:
                break
            blocks.extend(self._read_block_list(off))
            off += FAT_ENTRY_SIZE

        chunks: List[bytes] = []
        for blk in blocks:
            start = blk * self.block_size
            if start >= len(self.raw):
                break
            # Slicing clamps at the end of the image, so a final partial
            # block is kept instead of being dropped.
            chunks.append(self.raw[start:start + self.block_size])
        data = b"".join(chunks)[:start_record.size]
        if len(data) < start_record.size:
            warn(
                f"subfile {start_record.filename}.{start_record.ext} truncated: "
                f"{len(data)} of {start_record.size} bytes available"
            )
        return data

    def _extract_subfiles(self) -> Dict[str, bytes]:
        """Map ``"NAME.EXT"`` (upper-cased) to the bytes of each FAT record."""
        return {
            f"{rec.filename}.{rec.ext}".upper(): self._collect_blocks(rec)
            for rec in self._parse_fat_chain()
        }
def _parse_header(self) -> None:
    """Read label-section location, offset multiplier, coding and codepage.

    Sets ``data_offset``, ``data_length``, ``data_offset_multiplier`` and
    ``label_coding``. ``codepage`` keeps its default unless the header
    actually contains the codepage field.

    If the data is too short for the fixed fields, the instance is marked
    unusable (``ok = False``) instead of raising ``IndexError``, so
    ``get_label`` simply returns empty names.
    """
    if len(self.data) < 0x1F:
        warn("LBL subfile too short for a header; labels disabled")
        self.ok = False
        return
    header_length = read_u16le(self.data, 0)
    self.data_offset = read_u32le(self.data, 0x15)
    self.data_length = read_u32le(self.data, 0x19)
    self.data_offset_multiplier = 1 << self.data[0x1D]
    self.label_coding = self.data[0x1E]
    # The codepage lives at 0xAA..0xAB. When the declared header is shorter
    # than that, those bytes are section data, not a codepage, so the test
    # must use the header length and not just the file length.
    if header_length >= 0xAC and len(self.data) >= 0xAC:
        self.codepage = read_u16le(self.data, 0xAA)
class TRE:
    """Parser for the TRE subfile: bounds, zoom levels and the subdivision tree.

    After construction:

    * ``levels`` maps level number -> ``LevelInfo``;
    * ``level_counts`` maps level number -> subdivision count read from the
      level table;
    * ``root_subdivisions`` holds the subdivisions of the highest level;
    * ``subdivisions_by_index`` maps the 1-based subdivision number to its
      ``Subdivision``, children included.
    """

    WIDE_RECORD = 16    # record with a trailing "first child index" field
    NARROW_RECORD = 14  # record of the lowest level (no child index)

    def __init__(self, data: bytes):
        self.data = data
        self.header_length = read_u16le(data, 0)
        self.north = read_s24le(data, 0x15)
        self.east = read_s24le(data, 0x18)
        self.south = read_s24le(data, 0x1B)
        self.west = read_s24le(data, 0x1E)
        self.levels: Dict[int, LevelInfo] = {}
        self.level_counts: Dict[int, int] = {}
        self.max_level = 0
        self.min_level = 15
        self.extended_types = False
        self.extended_types_offset = 0
        self.extended_types_length = 0
        self.extended_types_size = 0
        self.extended_types_number = 0
        self.decalaje_extended_types = 0
        self.subdivisions_count = 1
        self.root_subdivisions: List[Subdivision] = []
        self.subdivisions_by_index: Dict[int, Subdivision] = {}
        self._parse()

    def _parse(self) -> None:
        # Order matters: _parse_tre7 uses subdivisions_count from the level
        # table, and subdivision records use the fields _parse_tre7 sets.
        self._parse_levels()
        self._parse_tre7()
        self._parse_subdivisions()

    def _parse_tre7(self) -> None:
        """Read the per-subdivision extended-offset table description."""
        if self.header_length >= 0x7C + 10:
            self.extended_types_offset = read_u32le(self.data, 0x7C)
            self.extended_types_length = read_u32le(self.data, 0x80)
            self.extended_types_size = read_u16le(self.data, 0x84)
            if self.extended_types_size > 0:
                self.extended_types_number = self.extended_types_length // self.extended_types_size
            self.extended_types = self.extended_types_length > 0
            # Offset between a subdivision number and its 1-based position
            # in the extended-offset table.
            self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number

    def _parse_levels(self) -> None:
        """Read the 4-byte level records: zoom byte, bits per coord, count."""
        levels_offset = read_u32le(self.data, 0x21)
        levels_length = read_u32le(self.data, 0x25)
        end = min(levels_offset + levels_length, len(self.data))
        pos = levels_offset
        while pos + 4 <= end:
            zoom = self.data[pos]
            bits = self.data[pos + 1]
            count = read_u16le(self.data, pos + 2)
            level = zoom & 0xF
            self.levels[level] = LevelInfo(
                level=level, bits_per_coord=bits, inherited=bool(zoom & 0x80)
            )
            self.level_counts[level] = self.level_counts.get(level, 0) + count
            self.max_level = max(self.max_level, level)
            self.min_level = min(self.min_level, level)
            self.subdivisions_count += count
            pos += 4

    def get_resolution(self, level: int) -> int:
        """Return the bits-per-coordinate of ``level`` (KeyError if unknown)."""
        return self.levels[level].bits_per_coord

    def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int:
        """Scale a level-relative delta to 24-bit map units."""
        shift = 24 - self.get_resolution(level) - additional_accuracy
        if shift >= 0:
            return value << shift
        return value >> (-shift)

    def _record_size(self, level: int) -> int:
        """Heuristic matching JGarminImgParser: 14 bytes at the last level, else 16."""
        return self.NARROW_RECORD if level == self.min_level else self.WIDE_RECORD

    def _wide_record_count(self) -> int:
        """Number of 16-byte records, i.e. subdivisions above the lowest level."""
        total = self.subdivisions_count - 1  # the counter starts at 1
        return max(0, total - self.level_counts.get(self.min_level, 0))

    def _record_pos(self, sub_offset: int, index: int, record_size: int) -> int:
        """Return the byte position of subdivision number ``index`` (1-based).

        NOTE(review): assumes records are stored in subdivision-number order
        with all 16-byte records ahead of the 14-byte ones -- confirm.
        """
        ordinal = index - 1
        if record_size == self.WIDE_RECORD:
            return sub_offset + ordinal * self.WIDE_RECORD
        # A narrow record sits after every wide record, so the wide area has
        # to be skipped at 16 bytes per record before the 14-byte stride.
        wide = min(self._wide_record_count(), ordinal)
        return sub_offset + wide * self.WIDE_RECORD + (ordinal - wide) * self.NARROW_RECORD

    def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]:
        """Decode one subdivision record; return it and the next position."""
        data_offset = read_u24le(self.data, pos)
        object_types = self.data[pos + 3]
        if object_types & 0x0F:
            # Low nibble is treated as bits 24..27 of the data offset.
            data_offset += (object_types & 0x0F) * (1 << 24)
        lon_center = read_s24le(self.data, pos + 4)
        lat_center = read_s24le(self.data, pos + 7)
        width = read_u16le(self.data, pos + 10)
        # Top bit of the width marks the last subdivision of a sibling run.
        last = bool(width & 0x8000)
        width &= 0x7FFF
        height = read_u16le(self.data, pos + 12)
        index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0
        sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types,
                          lon_center=lon_center, lat_center=lat_center, width=width, height=height,
                          index_next_level=index_next, last=last)
        # Extended offsets per subdivision, if present.
        if self.extended_types and self.extended_types_size >= 8:
            indice = index - self.decalaje_extended_types
            if indice > 0:
                p = self.extended_types_offset + (indice - 1) * self.extended_types_size
                if p + self.extended_types_size <= len(self.data):
                    sub.data_ext_polygon_offset = read_u32le(self.data, p)
                    sub.data_ext_polyline_offset = read_u32le(self.data, p + 4)
                    if self.extended_types_size >= 12:
                        sub.data_ext_poi_offset = read_u32le(self.data, p + 8)
        return sub, pos + record_size

    def _parse_subdivisions(self) -> None:
        """Build the subdivision tree and provisional end offsets."""
        sub_offset = read_u32le(self.data, 0x29)
        sub_length = read_u32le(self.data, 0x2D)
        end = min(sub_offset + sub_length, len(self.data))
        if not self.levels:
            return

        root_level = max(self.levels)
        root_size = self._record_size(root_level)
        index = 1
        pos = sub_offset
        roots: List[Subdivision] = []
        while pos + root_size <= end:
            sub, pos = self._parse_subdiv_record(pos, root_level, root_size, index)
            roots.append(sub)
            self.subdivisions_by_index[index] = sub
            index += 1
            if sub.last:
                break
        self.root_subdivisions = roots
        # Recursively parse children using the index_next_level scheme.
        self._parse_children(roots, root_level - 1, sub_offset, end, index)

        # A subdivision's data ends where the next one (by offset) begins;
        # 0 means "not known here" and is resolved later by the RGN parser.
        ordered = sorted(self.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index))
        for cur, nxt in zip(ordered, ordered[1:]):
            cur.data_end = nxt.data_offset
        if ordered:
            ordered[-1].data_end = 0

        # Same rule for each extended section, over non-zero offsets only.
        for attr_start, attr_end in (
            ("data_ext_polygon_offset", "data_ext_polygon_end"),
            ("data_ext_polyline_offset", "data_ext_polyline_end"),
            ("data_ext_poi_offset", "data_ext_poi_end"),
        ):
            items = sorted(
                (s for s in self.subdivisions_by_index.values() if getattr(s, attr_start, 0)),
                key=lambda s: getattr(s, attr_start),
            )
            for cur, nxt in zip(items, items[1:]):
                setattr(cur, attr_end, getattr(nxt, attr_start))
            if items:
                setattr(items[-1], attr_end, 0)

    def _next_present_level(self, level: int) -> int:
        """Return the highest defined level <= ``level``, or -1 if none.

        Level 0 is a valid answer, so "nothing left" needs its own value.
        """
        while level >= 0 and level not in self.levels:
            level -= 1
        return level

    def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None:
        """Attach to each parent its children at the next defined level.

        Children start at ``parent.index_next_level`` and run until a record
        with the ``last`` flag. ``next_index_hint`` is accepted for
        compatibility and not used.

        Two defects of the earlier version are fixed here: level 0 was
        skipped altogether (the guard was ``level <= 0``), and 14-byte
        records were located as ``(idx - 1) * 14`` from the section start,
        which lands inside the 16-byte records.
        """
        level = self._next_present_level(level)
        if level < 0:
            return
        record_size = self._record_size(level)
        child_level = self._next_present_level(level - 1)
        for parent in parents:
            idx = parent.index_next_level
            if idx <= 0:
                continue
            pos = self._record_pos(sub_offset, idx, record_size)
            children: List[Subdivision] = []
            while pos + record_size <= end:
                sub, pos = self._parse_subdiv_record(pos, level, record_size, idx)
                children.append(sub)
                self.subdivisions_by_index[idx] = sub
                idx += 1
                if sub.last:
                    break
            parent.children = children
            if child_level >= 0 and children:
                self._parse_children(children, child_level, sub_offset, end, idx)
value & sign_mask: - comp = value ^ sign_mask - if extra_bit == 0: - if comp != 0: - return comp - sign_mask - other = self.read_coord_offset(nbits, sign, extra_bit) - if other < 0: - return 1 - value + other - return value - 1 + other - else: - if comp & 0xFFFFFE: - return (comp & 0xFFFFFE) - sign_mask - other = self.read_coord_offset(nbits - 1, sign, 0) - if other < 0: - return 1 - sign_mask + 1 + (other << 1) - return sign_mask - 1 - 1 + (other << 1) - else: - if extra_bit > 0: - return value & 0xFFFFFE - return value - else: - value = self.read_next_bits(nbits) - if extra_bit > 0: - return (((value >> 1) * sign) << 1) - return value * sign - - -class RGN: - def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]): - self.data = data - self.tre = tre - self.lbl = lbl or LBL(None) - self.header_length = read_u16le(data, 0) - self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0 - self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0 - self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0 - self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0 - self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0 - self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0 - self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0 - self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0 - - def data_end(self) -> int: - return self.data_length - - def ext_polygon_end(self) -> int: - return self.ext_poly_length - - def ext_polyline_end(self) -> int: - return self.ext_line_length - - def ext_poi_end(self) -> int: - return self.ext_poi_length - - @staticmethod - def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int: - add = 0 - if sign == 0: - add += 1 - add += extra_bit - if i <= 9: - return i + 2 + add - return 2 * i - 9 + 2 + add - - def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int: - return 
def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]:
    """Return the ``(start, end)`` byte range of each object segment of ``sub``.

    The result is indexed by the ``SEG_*`` constants; an entry is ``None``
    when the subdivision has no usable segment of that kind. Offsets are
    absolute positions in ``self.data``.

    Standard segments: the subdivision data starts with one 2-byte pointer
    per present object kind except the first; each pointer is the end of the
    current segment relative to the subdivision start, and the last segment
    runs to the end of the subdivision data.

    Extended segments are looked up even when ``object_types`` is 0. The
    earlier version returned before reaching them, which dropped every
    subdivision that holds only extended objects.
    """
    result: List[Optional[Tuple[int, int]]] = [None] * 7

    offset = sub.data_offset + self.data_offset
    end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset
    if sub.data_end and end > len(self.data):
        end = len(self.data)

    present = [
        seg
        for seg, flag in (
            (SEG_POINT, OBJ_POINT),
            (SEG_IPOINT, OBJ_INDEXED_POINT),
            (SEG_POLYLINE, OBJ_POLYLINE),
            (SEG_POLYGON, OBJ_POLYGON),
        )
        if sub.object_types & flag
    ]
    pointer_bytes = (len(present) - 1) * 2
    if present and sub.data_end and end > offset and offset + pointer_bytes <= len(self.data):
        segment_start = offset + pointer_bytes
        p = offset
        for seg in present[:-1]:
            segment_end = read_u16le(self.data, p) + offset
            p += 2
            # An implausible pointer drops that segment only; the next one
            # then starts where this one should have started.
            if segment_start < segment_end <= end:
                result[seg] = (segment_start, segment_end)
                segment_start = segment_end
        if segment_start < end:
            result[present[-1]] = (segment_start, end)

    for seg, base, length, rel_start, rel_end in (
        (SEG_EXTPOLYGON, self.ext_poly_offset, self.ext_poly_length,
         sub.data_ext_polygon_offset, sub.data_ext_polygon_end),
        (SEG_EXTPOLYLINE, self.ext_line_offset, self.ext_line_length,
         sub.data_ext_polyline_offset, sub.data_ext_polyline_end),
        (SEG_EXTPOINT, self.ext_poi_offset, self.ext_poi_length,
         sub.data_ext_poi_offset, sub.data_ext_poi_end),
    ):
        if not rel_start:
            continue
        s = base + rel_start
        e = base + (rel_end or length)
        if e > s:
            result[seg] = (s, e)
    return result
def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]:
    """Decode the standard point records in ``seg`` into Point features.

    Each record is 8 bytes: type, a 24-bit info word (bit 23 = subtype byte
    follows, bit 22 = POI flag, low 22 bits = label offset) and two signed
    16-bit deltas relative to the subdivision centre. An optional subtype
    byte follows the record.

    Args:
        sub: subdivision the segment belongs to.
        seg: absolute ``(start, end)`` byte range in ``self.data``.
        indexed: whether this is the indexed-point segment; only affects
            the ``garmin_kind`` property.

    A record is decoded only if it fits entirely inside the segment. The
    earlier bound tested the buffer length, so a trailing partial record
    was read across into the following segment.
    """
    feats: List[Feature] = []
    pos, end = seg
    end = min(end, len(self.data))
    kind = "indexed_point" if indexed else "point"
    while pos + 8 <= end:
        typ = self.data[pos]
        info24 = read_u24le(self.data, pos + 1)
        has_subtype = bool(info24 & 0x800000)
        is_poi = bool(info24 & 0x400000)
        lbl_off = info24 & 0x3FFFFF
        lon_delta = read_s16le(self.data, pos + 4)
        lat_delta = read_s16le(self.data, pos + 6)
        pos += 8
        subtype = 0
        if has_subtype and pos < end:
            subtype = self.data[pos]
            pos += 1
        # NOTE(review): the offset is looked up as a plain label even when
        # the POI flag is set -- confirm that is the intended meaning.
        name = self.lbl.get_label(lbl_off) if lbl_off else ""
        lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
        lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
        feats.append(Feature(
            geom_type="Point",
            coords=[lon, lat],
            props={
                "garmin_kind": kind,
                "garmin_type": f"0x{typ:02x}",
                "garmin_subtype": f"0x{subtype:02x}",
                "garmin_is_poi": is_poi,
                "name": name,
            },
        ))
    return feats
name = self.lbl.get_label(lbl_off) if lbl_off else "" - lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) - lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) - feats.append(Feature( - geom_type="Point", - coords=[lon, lat], - props={ - "garmin_kind": "extended_point", - "garmin_type": f"0x{full_type:04x}", - "name": name, - }, - )) - return feats - - def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]: - feats: List[Feature] = [] - pos, end = seg - while pos < end: - try: - if not extended: - if pos + 10 > end: - break - info1 = self.data[pos] - pos += 1 - if line: - typ = info1 & 0x3F - direction = bool(info1 & 0x40) - else: - typ = info1 & 0x7F - direction = False - two_byte_len = bool(info1 & 0x80) - info24 = read_u24le(self.data, pos) - pos += 3 - lbl_off = info24 & 0x3FFFFF - extra_bit = 1 if (info24 & 0x400000) else 0 - data_in_net = bool(info24 & 0x800000) - lon_delta = read_s16le(self.data, pos) - lat_delta = read_s16le(self.data, pos + 2) - pos += 4 - bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos] - pos += 2 if two_byte_len else 1 - bitstream_info = self.data[pos] - pos += 1 - long_sign = 0 - lat_sign = 0 - long_extra_bit = extra_bit - lat_extra_bit = 0 - full_type = typ - else: - if pos + 8 > end: - break - typ = self.data[pos] - subtype_raw = self.data[pos + 1] - has_lbl = bool(subtype_raw & 0x20) - subtype = subtype_raw % 32 - full_type = ((typ + 0x100) << 8) + subtype - lon_delta = read_s16le(self.data, pos + 2) - lat_delta = read_s16le(self.data, pos + 4) - pos += 6 - bitstream_len_byte = self.data[pos] - pos += 1 - if bitstream_len_byte % 2 == 0: - if pos >= end: - break - bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1 - pos += 1 - else: - bitstream_len = bitstream_len_byte // 2 - 1 - bitstream_info = self.data[pos] - pos += 1 - direction = False - data_in_net = False - long_sign = 0 - lat_sign = 0 - long_extra_bit = 0 - lat_extra_bit = 0 - 
reader = BitStreamReader(self.data, pos, bitstream_len) - if reader.read_next_bits(1) != 0: - long_sign = +1 if reader.read_next_bits(1) == 0 else -1 - if reader.read_next_bits(1) != 0: - lat_sign = +1 if reader.read_next_bits(1) == 0 else -1 - if extended: - long_extra_bit = reader.read_next_bits(1) - long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit) - lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit) - cur_lon = lon_delta - cur_lat = lat_delta - pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]] - cur_lon <<= long_extra_bit - cur_lat <<= lat_extra_bit - while reader.has_next(long_bits + lat_bits): - dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit) - dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit) - cur_lon += dlon - cur_lat += dlat - pts.append([ - to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)), - to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)), - ]) - pos = reader.finish() - lbl_off = 0 if extended else lbl_off - if extended: - lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 - if has_lbl: - pos += 3 - name = self.lbl.get_label(lbl_off) if lbl_off else "" - if not line: - if pts and pts[0] != pts[-1]: - pts.append(pts[0]) - feats.append(Feature( - geom_type="Polygon", - coords=[pts], - props={ - "garmin_kind": "extended_polygon" if extended else "polygon", - "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", - "garmin_direction": direction, - "garmin_data_in_net": data_in_net, - "name": name, - }, - )) - else: - feats.append(Feature( - geom_type="LineString", - coords=pts, - props={ - "garmin_kind": "extended_polyline" if extended else "polyline", - "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", - "garmin_direction": direction, - "garmin_data_in_net": data_in_net, - "name": name, - }, - )) - except Exception: - # Stop current 
# Control characters that XML 1.0 does not allow at all (tab, LF and CR are
# the only ones below 0x20 that are legal). Decoded labels can contain them.
_XML_FORBIDDEN = dict.fromkeys(c for c in range(0x20) if c not in (0x09, 0x0A, 0x0D))


def _osm_escape(v: object) -> str:
    """Return ``str(v)`` made safe for a double-quoted XML attribute value.

    Escapes ``&``, ``<``, ``>`` and ``"`` and removes characters that XML 1.0
    forbids. The earlier mapping replaced ``"`` with itself, so a quote in a
    label ended the attribute early.
    """
    return xml_escape(str(v).translate(_XML_FORBIDDEN), {'"': "&quot;"})


def write_osm(features: List[Feature], path: Path) -> None:
    """Write ``features`` to ``path`` as an OSM XML (API 0.6 style) document.

    Points become tagged nodes; LineStrings and Polygons become ways over
    untagged nodes, and polygons also get ``area=yes``. New objects receive
    negative, decreasing ids. Garmin kind/type/subtype are written as
    ``garmin:*`` tags and the label as ``name``.

    A polygon ring whose last coordinate repeats the first is closed by
    referencing the first node again instead of creating a duplicate node.
    """
    next_node_id = -1
    next_way_id = -1
    lines: List[str] = [
        "<?xml version='1.0' encoding='UTF-8'?>",
        '<osm version="0.6" generator="garmin_img_to_osmand">',
    ]

    def tag_line(k: object, v: object) -> str:
        return f'    <tag k="{_osm_escape(k)}" v="{_osm_escape(v)}"/>'

    def add_node(lon: float, lat: float, tags: Optional[Dict[str, object]] = None) -> int:
        nonlocal next_node_id
        nid = next_node_id
        next_node_id -= 1
        head = f'  <node id="{nid}" visible="true" version="1" lat="{lat:.7f}" lon="{lon:.7f}"'
        kept = {k: v for k, v in (tags or {}).items() if v not in (None, "")}
        if kept:
            lines.append(head + ">")
            lines.extend(tag_line(k, v) for k, v in kept.items())
            lines.append("  </node>")
        else:
            lines.append(head + "/>")
        return nid

    def feature_tags(f: Feature) -> Dict[str, object]:
        tags: Dict[str, object] = {}
        kind = f.props.get("garmin_kind")
        gtype = f.props.get("garmin_type")
        if kind:
            tags["garmin:kind"] = kind
        if gtype:
            tags["garmin:type"] = gtype
        if f.props.get("garmin_subtype"):
            tags["garmin:subtype"] = f.props["garmin_subtype"]
        if f.props.get("name"):
            tags["name"] = f.props["name"]
        return tags

    for f in features:
        tags = feature_tags(f)
        if f.geom_type == "Point":
            lon, lat = f.coords
            add_node(lon, lat, tags)
        elif f.geom_type in ("LineString", "Polygon"):
            is_polygon = f.geom_type == "Polygon"
            coords = f.coords[0] if is_polygon else f.coords
            closed = is_polygon and len(coords) > 1 and list(coords[0]) == list(coords[-1])
            node_ids = [add_node(lon, lat) for lon, lat in (coords[:-1] if closed else coords)]
            if closed and node_ids:
                node_ids.append(node_ids[0])
            wid = next_way_id
            next_way_id -= 1
            lines.append(f'  <way id="{wid}" visible="true" version="1">')
            lines.extend(f'    <nd ref="{nid}"/>' for nid in node_ids)
            if is_polygon:
                tags["area"] = "yes"
            lines.extend(tag_line(k, v) for k, v in tags.items())
            lines.append("  </way>")
    lines.append("</osm>")
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
- "east": tre.east, - "south": tre.south, - "west": tre.west, - }, - "bounds_wgs84": { - "north": to_deg(tre.north), - "east": to_deg(tre.east), - "south": to_deg(tre.south), - "west": to_deg(tre.west), - }, - "levels": {lvl: {"bits_per_coord": li.bits_per_coord, "inherited": li.inherited} for lvl, li in tre.levels.items()}, - "feature_count": len(features), - } - return features, meta - - -def main() -> int: - ap = argparse.ArgumentParser(description="Extract vector features from a Garmin IMG and export GeoJSON / OSM XML.") - ap.add_argument("img", type=Path, help="Input Garmin .img file") - ap.add_argument("--geojson", type=Path, help="Write GeoJSON output") - ap.add_argument("--osm", type=Path, help="Write OSM XML output") - ap.add_argument("--meta-json", type=Path, help="Write parse metadata JSON") - args = ap.parse_args() - - if not args.geojson and not args.osm and not args.meta_json: - ap.error("provide at least one of --geojson, --osm, --meta-json") - - features, meta = load_features_from_img(args.img) - info(f"parsed {len(features)} features") - - if args.geojson: - write_geojson(features, args.geojson) - info(f"wrote GeoJSON: {args.geojson}") - if args.osm: - write_osm(features, args.osm) - info(f"wrote OSM XML: {args.osm}") - if args.meta_json: - args.meta_json.write_text(json.dumps(meta, indent=2), encoding="utf-8") - info(f"wrote metadata: {args.meta_json}") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/garmin_img_to_osmand_v2.py b/garmin_img_to_osmand_v2.py deleted file mode 100644 index 3d9cc65..0000000 --- a/garmin_img_to_osmand_v2.py +++ /dev/null @@ -1,1439 +0,0 @@ -#!/usr/bin/env python3 -""" -Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML. - -What it does well: -- Reads classic Garmin IMG container FAT and extracts subfiles. -- Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where - TRE/RGN/LBL offsets are stored inside the .GMP container. -- Parses TRE levels/subdivisions. 
-- Parses LBL labels (coding 6, 9, 10) with common codepage handling. -- Parses standard points, extended points, standard polylines/polygons, and - extended polylines/polygons from RGN. -- Exports GeoJSON and/or OSM XML. - -What it does NOT promise: -- Full Garmin NT routing/address semantics. -- Locked/compressed/vendor-obfuscated maps. -- Perfect type-to-OSM semantic translation. The exporter preserves Garmin type - codes as tags instead of inventing OSM semantics. - -This is a practical reverse-engineering tool, not a complete implementation of -all Garmin IMG variants. -""" - -from __future__ import annotations - -import argparse -import io -import json -import math -import sys -import gzip -from collections import Counter, defaultdict -from dataclasses import dataclass, field -from pathlib import Path -from typing import Dict, Iterable, Iterator, List, Optional, Tuple -from xml.sax.saxutils import escape as xml_escape - -# ------------------------- -# Low-level helpers -# ------------------------- - -COORD_FACTOR = 360.0 / (1 << 24) -FAT_BLOCK_SIZE = 0x200 -FAT_ENTRY_SIZE = 0x200 -MAX_FAT_BLOCKLIST = 240 -SEG_POINT = 0 -SEG_IPOINT = 1 -SEG_POLYLINE = 2 -SEG_POLYGON = 3 -SEG_EXTPOLYGON = 4 -SEG_EXTPOLYLINE = 5 -SEG_EXTPOINT = 6 - -OBJ_POINT = 0x10 -OBJ_INDEXED_POINT = 0x20 -OBJ_POLYLINE = 0x40 -OBJ_POLYGON = 0x80 -OBJ_EXT_POLYGON = 0x100 -OBJ_EXT_POLYLINE = 0x200 -OBJ_EXT_POINT = 0x400 - - -def warn(msg: str) -> None: - print(f"[warn] {msg}", file=sys.stderr) - - -def info(msg: str) -> None: - print(f"[info] {msg}", file=sys.stderr) - - -def read_u16le(buf: bytes, off: int) -> int: - return int.from_bytes(buf[off:off + 2], "little", signed=False) - - -def read_s16le(buf: bytes, off: int) -> int: - return int.from_bytes(buf[off:off + 2], "little", signed=True) - - -def read_u24le(buf: bytes, off: int) -> int: - return int.from_bytes(buf[off:off + 3], "little", signed=False) - - -def read_s24le(buf: bytes, off: int) -> int: - raw = read_u24le(buf, off) - if raw 
class ImgContainer:
    """In-memory Garmin IMG container that exposes its subfiles.

    The constructor undoes the optional single-byte XOR mask, reads the block
    size and the FAT position from the header, and extracts every subfile into
    ``self.files`` (upper-cased ``NAME.EXT`` -> contents).

    Raises:
        RuntimeError: If ``raw`` is too short to contain the header fields
            that are read (up to offset 0x62).
    """

    def __init__(self, raw: bytes):
        if len(raw) <= 0x62:
            raise RuntimeError("IMG data is too short to hold a container header")
        self.raw = raw
        # Some IMG files are XOR'd by a single byte stored at byte 0.
        xor_byte = raw[0]
        if xor_byte != 0x00:
            table = bytes(value ^ xor_byte for value in range(256))
            # Decode only the two signature windows first; the full image is
            # translated (at C speed) only when one of them matches.
            sig = raw[0x10:0x17].translate(table)
            ident = raw[0x41:0x48].translate(table)
            if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"):
                info(f"applied XOR decode with byte 0x{xor_byte:02x}")
                self.raw = raw.translate(table)
        self.block_size = self._read_block_size()
        self.fat_start = self._read_fat_start()
        self.files = self._extract_subfiles()

    def _read_block_size(self) -> int:
        """Return the data block size, 2 ** (byte 0x61 + byte 0x62)."""
        return 1 << (self.raw[0x61] + self.raw[0x62])

    def _read_fat_start(self) -> int:
        """Return the byte offset of the first FAT entry (derived from byte 0x40)."""
        return self.raw[0x40] * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE

    def _read_block_list(self, entry_off: int) -> List[int]:
        """Return the block numbers of one FAT entry, up to the 0xFFFF terminator."""
        blocks: List[int] = []
        base = entry_off + 0x20
        for i in range(MAX_FAT_BLOCKLIST):
            blk = read_u16le(self.raw, base + i * 2)
            if blk == 0xFFFF:
                break
            blocks.append(blk)
        return blocks

    def _parse_fat_chain(self) -> List[FatRecord]:
        """Return one record per subfile, built from its first FAT entry.

        Scanning stops at the first entry whose flag byte is not 0x01.  Only
        entries whose 16-bit field at +0x10 is zero are recorded; the others
        are continuation parts picked up by ``_collect_blocks``.
        """
        records: List[FatRecord] = []
        off = self.fat_start
        while off + FAT_ENTRY_SIZE <= len(self.raw) and self.raw[off] == 0x01:
            if read_u16le(self.raw, off + 16) == 0:
                name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00")
                ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00")
                size = read_u32le(self.raw, off + 12)
                records.append(FatRecord(name, ext, size, self._read_block_list(off), off))
            off += FAT_ENTRY_SIZE
        return records

    def _collect_blocks(self, start_record: FatRecord) -> bytes:
        """Return the contents of one subfile, following its continuation entries.

        The entries directly after the first one are appended while they are
        in use (flag 0x01), carry a non-zero field at +0x10 and repeat the
        same 11-byte name/extension.
        NOTE(review): this matches the published IMG directory layout, where
        that field numbers the parts of a subfile -- confirm against a real
        image whose subfiles need more than one FAT entry.
        """
        first = start_record.offset_in_fat
        name_field = self.raw[first + 1:first + 12]
        blocks = list(start_record.blocks)
        off = first + FAT_ENTRY_SIZE
        while off + FAT_ENTRY_SIZE <= len(self.raw):
            if self.raw[off] != 0x01:
                break
            if read_u16le(self.raw, off + 16) == 0:
                break  # first entry of the next subfile
            if self.raw[off + 1:off + 12] != name_field:
                break
            blocks.extend(self._read_block_list(off))
            off += FAT_ENTRY_SIZE

        data = bytearray()
        for blk in blocks:
            if len(data) >= start_record.size:
                break  # everything past the declared size would be cut off anyway
            start = blk * self.block_size
            end = start + self.block_size
            if end > len(self.raw):
                break
            data.extend(self.raw[start:end])
        return bytes(data[:start_record.size])

    def _extract_subfiles(self) -> Dict[str, bytes]:
        """Return every subfile keyed by its upper-cased ``NAME.EXT``."""
        out: Dict[str, bytes] = {}
        for rec in self._parse_fat_chain():
            out[f"{rec.filename}.{rec.ext}".upper()] = self._collect_blocks(rec)
        return out
-@dataclass -class Feature: - geom_type: str # Point | LineString | Polygon - coords: object - props: Dict[str, object] - - -# ------------------------- -# LBL parser -# ------------------------- - -class LBL: - NORMAL_CHARS = [' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', - 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', - '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] - SYMBOL_CHARS = ['@', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '~', '~', '~', - '~', '~', '~', '~', '~', '~', '~', ':', ';', '<', '=', '>', '?', '~', '~', '~', '~', '~', '~', - '~', '~', '~', '~', '~', '[', '\\', ']', '^', '_'] - SPECIAL_CHARS = ['`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', - 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5', - '6', '7', '8', '9', '~', '~', '~', '~', '~', '~'] - - def __init__(self, data: Optional[bytes]): - self.data = data or b"" - self.ok = bool(data) - self.data_offset = 0 - self.data_length = 0 - self.data_offset_multiplier = 1 - self.label_coding = 6 - self.codepage = 1252 - if self.ok: - self._parse_header() - - def _parse_header(self) -> None: - header_length = read_u16le(self.data, 0) - self.data_offset = read_u32le(self.data, 0x15) - self.data_length = read_u32le(self.data, 0x19) - self.data_offset_multiplier = 1 << self.data[0x1D] - self.label_coding = self.data[0x1E] - if len(self.data) >= 0xAC: - self.codepage = read_u16le(self.data, 0xAA) - - def get_label(self, offset: int) -> str: - if not self.ok or offset == 0: - return "" - actual = self.data_offset + offset * self.data_offset_multiplier - if actual < 0 or actual >= len(self.data): - return "" - if self.label_coding == 6: - return self._get_label6(actual) - return self._get_label8_10(actual) - - def _get_label8_10(self, off: int) -> str: - end = off - while end < 
len(self.data) and self.data[end] != 0: - end += 1 - raw = self.data[off:end] - enc = None - cp = self.codepage - if cp in (0, 850): - enc = "cp1252" - elif cp == 65001: - enc = "utf-8" - elif cp == 932: - enc = "cp932" - elif cp == 950: - enc = "big5" - else: - enc = f"cp{cp}" - try: - return raw.decode(enc, errors="replace") - except Exception: - return raw.decode("latin1", errors="replace") - - def _get_label6(self, off: int) -> str: - out: List[str] = [] - charset = "NORMAL" - pos = off - while pos + 3 <= len(self.data): - b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2] - pos += 3 - codes = [ - b1 >> 2, - ((b1 & 0x3) << 4) | (b2 >> 4), - ((b2 & 0xF) << 2) | (b3 >> 6), - b3 & 0x3F, - ] - for c in codes: - if c > 0x2F: - return "".join(out).strip() - if charset == "NORMAL": - if c == 0x1C: - charset = "SYMBOL" - elif c == 0x1B: - charset = "SPECIAL" - elif c == 0x1D: - out.append("|") - elif c in (0x1E, 0x1F): - out.append(" ") - else: - out.append(self.NORMAL_CHARS[c]) - elif charset == "SYMBOL": - out.append(self.SYMBOL_CHARS[c]) - charset = "NORMAL" - else: - out.append(self.SPECIAL_CHARS[c]) - charset = "NORMAL" - return "".join(out).strip() - - -# ------------------------- -# TRE parser -# ------------------------- - -class TRE: - def __init__(self, data: bytes): - self.data = data - self.header_length = read_u16le(data, 0) - self.north = read_s24le(data, 0x15) - self.east = read_s24le(data, 0x18) - self.south = read_s24le(data, 0x1B) - self.west = read_s24le(data, 0x1E) - self.levels: Dict[int, LevelInfo] = {} - self.max_level = 0 - self.min_level = 15 - self.extended_types = False - self.extended_types_offset = 0 - self.extended_types_length = 0 - self.extended_types_size = 0 - self.extended_types_number = 0 - self.decalaje_extended_types = 0 - self.subdivisions_count = 1 - self.root_subdivisions: List[Subdivision] = [] - self.subdivisions_by_index: Dict[int, Subdivision] = {} - self._parse() - - def _parse(self) -> None: - 
self._parse_levels() - self._parse_tre7() - self._parse_subdivisions() - - def _parse_tre7(self) -> None: - if self.header_length >= 0x7C + 10: - self.extended_types_offset = read_u32le(self.data, 0x7C) - self.extended_types_length = read_u32le(self.data, 0x80) - self.extended_types_size = read_u16le(self.data, 0x84) - if self.extended_types_size > 0: - self.extended_types_number = self.extended_types_length // self.extended_types_size - self.extended_types = self.extended_types_length > 0 - self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number - - def _parse_levels(self) -> None: - levels_offset = read_u32le(self.data, 0x21) - levels_length = read_u32le(self.data, 0x25) - pos = levels_offset - end = levels_offset + levels_length - while pos + 4 <= end and pos + 4 <= len(self.data): - zoom = self.data[pos] - bits = self.data[pos + 1] - count = read_u16le(self.data, pos + 2) - _ = count - level = zoom & 0xF - inherited = bool(zoom & 0x80) - self.levels[level] = LevelInfo(level=level, bits_per_coord=bits, inherited=inherited) - self.max_level = max(self.max_level, level) - self.min_level = min(self.min_level, level) - self.subdivisions_count += count - pos += 4 - - def get_resolution(self, level: int) -> int: - return self.levels[level].bits_per_coord - - def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int: - shift = 24 - self.get_resolution(level) - additional_accuracy - if shift >= 0: - return value << shift - return value >> (-shift) - - def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]: - data_offset = read_u24le(self.data, pos) - object_types = self.data[pos + 3] - if object_types & 0x0F: - data_offset += (object_types & 0x0F) * (1 << 24) - lon_center = read_s24le(self.data, pos + 4) - lat_center = read_s24le(self.data, pos + 7) - width = read_u16le(self.data, pos + 10) - last = False - if width & 0x8000: - width &= 0x7FFF - last = True - 
height = read_u16le(self.data, pos + 12) - index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0 - sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types, - lon_center=lon_center, lat_center=lat_center, width=width, height=height, - index_next_level=index_next, last=last) - # Extended offsets per subdivision, if present. - if self.extended_types: - indice = index - self.decalaje_extended_types - if indice > 0 and self.extended_types_size >= 8: - p = self.extended_types_offset + (indice - 1) * self.extended_types_size - if p + self.extended_types_size <= len(self.data): - sub.data_ext_polygon_offset = read_u32le(self.data, p) - if self.extended_types_size >= 8: - sub.data_ext_polyline_offset = read_u32le(self.data, p + 4) - if self.extended_types_size >= 12: - sub.data_ext_poi_offset = read_u32le(self.data, p + 8) - return sub, pos + record_size - - def _parse_subdivisions(self) -> None: - sub_offset = read_u32le(self.data, 0x29) - sub_length = read_u32le(self.data, 0x2D) - end = sub_offset + sub_length - if end > len(self.data): - end = len(self.data) - - present_levels = sorted(self.levels.keys(), reverse=True) - if not present_levels: - return - current_root_level = present_levels[0] - index = 1 - pos = sub_offset - roots: List[Subdivision] = [] - # Parse all 16-byte records first until last root. - while pos + 16 <= end: - sub, pos = self._parse_subdiv_record(pos, current_root_level, 16, index) - roots.append(sub) - self.subdivisions_by_index[index] = sub - index += 1 - if sub.last: - break - self.root_subdivisions = roots - # Recursively parse children using the index_next_level scheme. - self._parse_children(self.root_subdivisions, current_root_level - 1, sub_offset, end, index) - # Compute data ends by sorted data offsets. 
def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None:
    """Parse the child subdivisions of ``parents`` and recurse into lower levels.

    Args:
        parents: Subdivisions whose ``index_next_level`` names their first child.
        level: Level to parse; resolved through ``_next_present_level`` first.
        sub_offset: Byte offset of the subdivision table inside ``self.data``.
        end: Byte offset one past the subdivision table.
        next_index_hint: Never read in this method.

    Each parsed child is appended to its parent's ``children`` list and
    registered in ``self.subdivisions_by_index``.
    """
    level = self._next_present_level(level)
    # NOTE(review): a resolved level of 0 returns here and the recursion below
    # requires child_level > 0, so subdivisions at level 0 are never parsed.
    # If level 0 is a real (most detailed) map level, that data is skipped --
    # confirm against the format description.
    if level <= 0:
        return
    for parent in parents:
        if parent.index_next_level <= 0:
            continue
        idx = parent.index_next_level
        # Same condition as the check above; cannot trigger.
        if idx <= 0:
            continue
        # Heuristic matching JGarminImgParser: 16-byte records for non-leaf levels, 14-byte for last level.
        record_size = 14 if level == self.min_level else 16
        # NOTE(review): for 14-byte records this assumes every preceding record
        # is 14 bytes as well; verify for tables that mix both sizes.
        pos = sub_offset + (idx - 1) * 16 if record_size == 16 else sub_offset + (idx - 1) * 14
        # Fallback for mixed record layout: compute small-record area start after all 16-byte records already parsed.
        if record_size == 14 and pos + 14 > end:
            pos = min(end, sub_offset + len(self.root_subdivisions) * 16)
        children: List[Subdivision] = []
        # Read consecutive records until one carries the "last" flag or the
        # table ends.
        while pos + record_size <= end:
            try:
                sub, pos = self._parse_subdiv_record(pos, level, record_size, idx)
            except Exception:
                # Any parse error ends this sibling run without reporting it.
                break
            children.append(sub)
            self.subdivisions_by_index[idx] = sub
            idx += 1
            if sub.last:
                break
        parent.children = children
        child_level = self._next_present_level(level - 1)
        if child_level > 0 and children:
            self._parse_children(children, child_level, sub_offset, end, idx)
value & sign_mask: - comp = value ^ sign_mask - if extra_bit == 0: - if comp != 0: - return comp - sign_mask - other = self.read_coord_offset(nbits, sign, extra_bit) - if other < 0: - return 1 - value + other - return value - 1 + other - else: - if comp & 0xFFFFFE: - return (comp & 0xFFFFFE) - sign_mask - other = self.read_coord_offset(nbits - 1, sign, 0) - if other < 0: - return 1 - sign_mask + 1 + (other << 1) - return sign_mask - 1 - 1 + (other << 1) - else: - if extra_bit > 0: - return value & 0xFFFFFE - return value - else: - value = self.read_next_bits(nbits) - if extra_bit > 0: - return (((value >> 1) * sign) << 1) - return value * sign - - -class RGN: - def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]): - self.data = data - self.tre = tre - self.lbl = lbl or LBL(None) - self.header_length = read_u16le(data, 0) - self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0 - self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0 - self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0 - self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0 - self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0 - self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0 - self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0 - self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0 - - def data_end(self) -> int: - return self.data_length - - def ext_polygon_end(self) -> int: - return self.ext_poly_length - - def ext_polyline_end(self) -> int: - return self.ext_line_length - - def ext_poi_end(self) -> int: - return self.ext_poi_length - - @staticmethod - def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int: - add = 0 - if sign == 0: - add += 1 - add += extra_bit - if i <= 9: - return i + 2 + add - return 2 * i - 9 + 2 + add - - def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int: - return 
sub.lon_center + self.tre.convert_map_units(sub.level, delta, add_acc) - - def _subdiv_lat(self, sub: Subdivision, delta: int, add_acc: int) -> int: - return sub.lat_center + self.tre.convert_map_units(sub.level, delta, add_acc) - - def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]: - result: List[Optional[Tuple[int, int]]] = [None] * 7 - offset = sub.data_offset + self.data_offset - end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset - if sub.object_types == 0: - return result - if sub.data_end and end > len(self.data): - end = len(self.data) - if sub.data_end and end > offset and sub.nb_object_types() > 0: - if sub.object_types & OBJ_POINT: - result[SEG_POINT] = (0, 0) - if sub.object_types & OBJ_INDEXED_POINT: - result[SEG_IPOINT] = (0, 0) - if sub.object_types & OBJ_POLYLINE: - result[SEG_POLYLINE] = (0, 0) - if sub.object_types & OBJ_POLYGON: - result[SEG_POLYGON] = (0, 0) - order = [SEG_POINT, SEG_IPOINT, SEG_POLYLINE, SEG_POLYGON] - nb_pointers = sub.nb_object_types() - 1 - if offset + nb_pointers * 2 <= len(self.data): - segment_start = offset + nb_pointers * 2 - cur_idx = 0 - p = offset - for _ in range(nb_pointers): - while cur_idx < 4 and result[order[cur_idx]] is None: - cur_idx += 1 - if cur_idx >= 4: - break - segment_end = read_u16le(self.data, p) + offset - p += 2 - if segment_end > end or segment_end <= segment_start: - result[order[cur_idx]] = None - else: - result[order[cur_idx]] = (segment_start, segment_end) - segment_start = segment_end - cur_idx += 1 - while cur_idx < 4 and result[order[cur_idx]] is None: - cur_idx += 1 - if cur_idx < 4 and result[order[cur_idx]] is not None and segment_start < end: - result[order[cur_idx]] = (segment_start, end) - if sub.data_ext_polygon_offset: - s = self.ext_poly_offset + sub.data_ext_polygon_offset - e = self.ext_poly_offset + (sub.data_ext_polygon_end or self.ext_poly_length) - if e > s: - result[SEG_EXTPOLYGON] = (s, e) - if sub.data_ext_polyline_offset: 
- s = self.ext_line_offset + sub.data_ext_polyline_offset - e = self.ext_line_offset + (sub.data_ext_polyline_end or self.ext_line_length) - if e > s: - result[SEG_EXTPOLYLINE] = (s, e) - if sub.data_ext_poi_offset: - s = self.ext_poi_offset + sub.data_ext_poi_offset - e = self.ext_poi_offset + (sub.data_ext_poi_end or self.ext_poi_length) - if e > s: - result[SEG_EXTPOINT] = (s, e) - return result - - def parse_features(self) -> List[Feature]: - # Finalize subdivision end markers using RGN section lengths. - ordered = sorted(self.tre.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index)) - for i, sub in enumerate(ordered): - if sub.data_end == 0: - sub.data_end = self.data_length if i + 1 == len(ordered) else ordered[i + 1].data_offset - for attr_start, final_end in [ - ("data_ext_polygon_offset", self.ext_poly_length), - ("data_ext_polyline_offset", self.ext_line_length), - ("data_ext_poi_offset", self.ext_poi_length), - ]: - items = sorted((s for s in self.tre.subdivisions_by_index.values() if getattr(s, attr_start, 0)), - key=lambda s: getattr(s, attr_start)) - for i, sub in enumerate(items): - if attr_start == "data_ext_polygon_offset": - setattr(sub, "data_ext_polygon_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) - elif attr_start == "data_ext_polyline_offset": - setattr(sub, "data_ext_polyline_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) - else: - setattr(sub, "data_ext_poi_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)) - - feats: List[Feature] = [] - for sub in sorted(self.tre.subdivisions_by_index.values(), key=lambda s: s.index): - segs = self._segments(sub) - if segs[SEG_POINT]: - feats.extend(self._parse_points(sub, segs[SEG_POINT], indexed=False)) - if segs[SEG_IPOINT]: - feats.extend(self._parse_points(sub, segs[SEG_IPOINT], indexed=True)) - if segs[SEG_EXTPOINT]: - feats.extend(self._parse_ext_points(sub, segs[SEG_EXTPOINT])) - if 
def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]:
    """Decode the point records of one RGN segment into Point features.

    Each record is 8 bytes (type, 24-bit info word, signed 16-bit longitude
    and latitude deltas), optionally followed by one subtype byte when bit
    0x800000 of the info word is set.

    Args:
        sub: Subdivision that owns the segment; supplies centre and level.
        seg: ``(start, end)`` byte offsets of the segment inside ``self.data``.
        indexed: True for the indexed-point segment; only changes the
            ``garmin_kind`` property.

    Returns:
        One feature per complete record.  A record that would cross the end
        of the segment (or of the buffer) ends the decoding instead of being
        read from the bytes of the following segment.
    """
    feats: List[Feature] = []
    pos, end = seg
    limit = min(end, len(self.data))
    kind = "indexed_point" if indexed else "point"
    while pos + 8 <= limit:
        typ = self.data[pos]
        info24 = read_u24le(self.data, pos + 1)
        has_subtype = bool(info24 & 0x800000)
        is_poi = bool(info24 & 0x400000)
        lbl_off = info24 & 0x3FFFFF
        lon_delta = read_s16le(self.data, pos + 4)
        lat_delta = read_s16le(self.data, pos + 6)
        pos += 8
        subtype = 0
        if has_subtype and pos < limit:
            subtype = self.data[pos]
            pos += 1
        # NOTE(review): the offset is looked up as a plain label even when
        # is_poi is set -- confirm that is the intended interpretation.
        name = self.lbl.get_label(lbl_off) if lbl_off else ""
        lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
        lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
        feats.append(Feature(
            geom_type="Point",
            coords=[lon, lat],
            props={
                "garmin_kind": kind,
                "garmin_type": f"0x{typ:02x}",
                "garmin_subtype": f"0x{subtype:02x}",
                "garmin_is_poi": is_poi,
                "name": name,
            },
        ))
    return feats
name = self.lbl.get_label(lbl_off) if lbl_off else "" - lon = to_deg(self._subdiv_lon(sub, lon_delta, 0)) - lat = to_deg(self._subdiv_lat(sub, lat_delta, 0)) - feats.append(Feature( - geom_type="Point", - coords=[lon, lat], - props={ - "garmin_kind": "extended_point", - "garmin_type": f"0x{full_type:04x}", - "name": name, - }, - )) - return feats - - def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]: - feats: List[Feature] = [] - pos, end = seg - while pos < end: - try: - if not extended: - if pos + 10 > end: - break - info1 = self.data[pos] - pos += 1 - if line: - typ = info1 & 0x3F - direction = bool(info1 & 0x40) - else: - typ = info1 & 0x7F - direction = False - two_byte_len = bool(info1 & 0x80) - info24 = read_u24le(self.data, pos) - pos += 3 - lbl_off = info24 & 0x3FFFFF - extra_bit = 1 if (info24 & 0x400000) else 0 - data_in_net = bool(info24 & 0x800000) - lon_delta = read_s16le(self.data, pos) - lat_delta = read_s16le(self.data, pos + 2) - pos += 4 - bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos] - pos += 2 if two_byte_len else 1 - bitstream_info = self.data[pos] - pos += 1 - long_sign = 0 - lat_sign = 0 - long_extra_bit = extra_bit - lat_extra_bit = 0 - full_type = typ - else: - if pos + 8 > end: - break - typ = self.data[pos] - subtype_raw = self.data[pos + 1] - has_lbl = bool(subtype_raw & 0x20) - subtype = subtype_raw % 32 - full_type = ((typ + 0x100) << 8) + subtype - lon_delta = read_s16le(self.data, pos + 2) - lat_delta = read_s16le(self.data, pos + 4) - pos += 6 - bitstream_len_byte = self.data[pos] - pos += 1 - if bitstream_len_byte % 2 == 0: - if pos >= end: - break - bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1 - pos += 1 - else: - bitstream_len = bitstream_len_byte // 2 - 1 - bitstream_info = self.data[pos] - pos += 1 - direction = False - data_in_net = False - long_sign = 0 - lat_sign = 0 - long_extra_bit = 0 - lat_extra_bit = 0 - 
reader = BitStreamReader(self.data, pos, bitstream_len) - if reader.read_next_bits(1) != 0: - long_sign = +1 if reader.read_next_bits(1) == 0 else -1 - if reader.read_next_bits(1) != 0: - lat_sign = +1 if reader.read_next_bits(1) == 0 else -1 - if extended: - long_extra_bit = reader.read_next_bits(1) - long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit) - lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit) - cur_lon = lon_delta - cur_lat = lat_delta - pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]] - cur_lon <<= long_extra_bit - cur_lat <<= lat_extra_bit - while reader.has_next(long_bits + lat_bits): - dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit) - dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit) - cur_lon += dlon - cur_lat += dlat - pts.append([ - to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)), - to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)), - ]) - pos = reader.finish() - lbl_off = 0 if extended else lbl_off - if extended: - lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0 - if has_lbl: - pos += 3 - name = self.lbl.get_label(lbl_off) if lbl_off else "" - if not line: - if pts and pts[0] != pts[-1]: - pts.append(pts[0]) - feats.append(Feature( - geom_type="Polygon", - coords=[pts], - props={ - "garmin_kind": "extended_polygon" if extended else "polygon", - "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", - "garmin_direction": direction, - "garmin_data_in_net": data_in_net, - "name": name, - }, - )) - else: - feats.append(Feature( - geom_type="LineString", - coords=pts, - props={ - "garmin_kind": "extended_polyline" if extended else "polyline", - "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}", - "garmin_direction": direction, - "garmin_data_in_net": data_in_net, - "name": name, - }, - )) - except Exception: - # Stop current 
segment on malformed data instead of crashing the whole file. - break - return feats - - -# ------------------------- -# Output writers and semantic mapping -# ------------------------- - -def feature_to_geojson(f: Feature) -> Dict[str, object]: - props = {k: v for k, v in f.props.items() if v not in (None, "", [], {})} - return { - "type": "Feature", - "geometry": {"type": f.geom_type, "coordinates": f.coords}, - "properties": props, - } - - -def _osm_escape(v: object) -> str: - return xml_escape(str(v), {'"': '"'}) - - -def _maybe_open_text(path: Path): - if str(path).lower().endswith('.gz'): - return gzip.open(path, 'wt', encoding='utf-8', newline='\n') - return open(path, 'w', encoding='utf-8', newline='\n') - - -def _parse_bbox(text: Optional[str]) -> Optional[Tuple[float, float, float, float]]: - if not text: - return None - parts = [p.strip() for p in text.split(',')] - if len(parts) != 4: - raise ValueError('bbox must be west,south,east,north') - west, south, east, north = map(float, parts) - if west > east or south > north: - raise ValueError('invalid bbox ordering') - return west, south, east, north - - -def _feature_bounds(f: Feature) -> Tuple[float, float, float, float]: - if f.geom_type == 'Point': - lon, lat = f.coords - return lon, lat, lon, lat - if f.geom_type == 'LineString': - pts = f.coords - else: - pts = f.coords[0] - xs = [p[0] for p in pts] - ys = [p[1] for p in pts] - return min(xs), min(ys), max(xs), max(ys) - - -def _intersects_bbox(f: Feature, bbox: Optional[Tuple[float, float, float, float]]) -> bool: - if bbox is None: - return True - west, south, east, north = bbox - a_w, a_s, a_e, a_n = _feature_bounds(f) - return not (a_w > east or a_e < west or a_s > north or a_n < south) - - -def _all_mapsets(files: Dict[str, bytes]) -> Dict[str, Dict[str, bytes]]: - groups: Dict[str, Dict[str, bytes]] = defaultdict(dict) - for key, data in files.items(): - if '.' 
not in key: - continue - base, ext = key.rsplit('.', 1) - groups[base.upper()][ext.upper()] = data - out: Dict[str, Dict[str, bytes]] = {} - for base, subs in groups.items(): - if 'TRE' in subs and 'RGN' in subs: - out[base] = subs - return dict(sorted(out.items())) - - -# Default semantic mapping. These are based on common Garmin/mkgmap conventions, -# plus a few heuristics for map labels commonly found in topographic IMG files. -LINE_TAGS: Dict[str, Dict[str, str]] = { - '0x01': {'highway': 'motorway'}, - '0x02': {'highway': 'primary'}, - '0x03': {'highway': 'secondary'}, - '0x04': {'highway': 'tertiary'}, - '0x05': {'highway': 'unclassified'}, - '0x06': {'highway': 'residential'}, - '0x07': {'highway': 'service'}, - '0x08': {'highway': 'construction'}, - '0x09': {'highway': 'road'}, - '0x0a': {'highway': 'track', 'surface': 'unpaved'}, - '0x0c': {'highway': 'road', 'junction': 'roundabout'}, - '0x0d': {'highway': 'path'}, - '0x0e': {'highway': 'track', 'tracktype': 'grade1'}, - '0x0f': {'highway': 'track', 'tracktype': 'grade2'}, - '0x10': {'highway': 'track', 'tracktype': 'grade3'}, - '0x11': {'highway': 'track', 'tracktype': 'grade4'}, - '0x12': {'highway': 'track', 'tracktype': 'grade5'}, - '0x13': {'highway': 'steps'}, - '0x14': {'railway': 'rail'}, - '0x15': {'natural': 'coastline'}, - '0x16': {'highway': 'cycleway'}, - '0x17': {'highway': 'bridleway'}, - '0x18': {'waterway': 'stream'}, - '0x1a': {'route': 'ferry'}, - '0x1f': {'waterway': 'river'}, - '0x27': {'aeroway': 'runway'}, - '0x28': {'man_made': 'pipeline'}, - '0x29': {'power': 'line'}, - '0x31': {'natural': 'cliff'}, - '0x32': {'barrier': 'wall'}, - '0x33': {'barrier': 'fence'}, - '0x34': {'barrier': 'hedge'}, - '0x38': {'aerialway': 'cable_car'}, - '0x39': {'railway': 'tram'}, -} - -POLYGON_TAGS: Dict[str, Dict[str, str]] = { - '0x03': {'landuse': 'residential'}, - '0x05': {'amenity': 'parking'}, - '0x09': {'leisure': 'marina'}, - '0x0b': {'amenity': 'hospital'}, - '0x0c': {'landuse': 
'industrial'}, - '0x14': {'natural': 'heath'}, - '0x15': {'natural': 'wood'}, - '0x16': {'leisure': 'nature_reserve'}, - '0x17': {'leisure': 'park'}, - '0x18': {'leisure': 'golf_course'}, - '0x19': {'leisure': 'sports_centre'}, - '0x1a': {'landuse': 'cemetery'}, - '0x2a': {'landuse': 'farmland'}, - '0x2b': {'landuse': 'farmyard'}, - '0x2c': {'landuse': 'vineyard'}, - '0x2d': {'landuse': 'quarry'}, - '0x2e': {'tourism': 'camp_site'}, - '0x32': {'natural': 'water', 'water': 'sea'}, - '0x35': {'landuse': 'meadow'}, - '0x3c': {'natural': 'water'}, - '0x3d': {'natural': 'beach'}, - '0x3e': {'natural': 'water'}, - '0x3f': {'landuse': 'reservoir'}, - '0x40': {'natural': 'water'}, - '0x41': {'natural': 'water'}, - '0x46': {'waterway': 'riverbank'}, - '0x4c': {'natural': 'water', 'intermittent': 'yes'}, - '0x4d': {'natural': 'glacier'}, - '0x4e': {'landuse': 'orchard'}, - '0x4f': {'natural': 'scrub'}, - '0x50': {'natural': 'wood'}, - '0x51': {'natural': 'wetland'}, - '0x52': {'natural': 'heath'}, # heuristic: Garmin default "Tundra" - '0x53': {'natural': 'bare_rock'}, # heuristic: Garmin default "Flat" -} - -POINT_TAGS: Dict[Tuple[str, Optional[str]], Dict[str, str]] = { - ('0x04', '0x00'): {'place': 'city'}, - ('0x08', '0x00'): {'place': 'town'}, - ('0x0a', '0x00'): {'place': 'suburb'}, - ('0x0b', '0x00'): {'place': 'village'}, - ('0x0d', '0x00'): {'place': 'village'}, # heuristic for this sample topo IMG - ('0x11', '0x00'): {'place': 'hamlet'}, - ('0x28', '0x00'): {'place': 'locality'}, # heuristic: local named spot labels in sample - ('0x64', '0x03'): {'amenity': 'grave_yard'}, - ('0x64', '0x06'): {'highway': 'crossing'}, - ('0x64', '0x11'): {'man_made': 'tower'}, - ('0x64', '0x14'): {'amenity': 'drinking_water'}, - ('0x64', '0x17'): {'amenity': 'hunting_stand'}, - ('0x64', '0x18'): {'amenity': 'grit_bin'}, - ('0x65', '0x0a'): {'natural': 'glacier'}, - ('0x65', '0x0c'): {'place': 'island'}, - ('0x65', '0x11'): {'natural': 'spring'}, - ('0x66', '0x04'): {'natural': 
'beach'}, - ('0x66', '0x07'): {'natural': 'cliff'}, - ('0x66', '0x0e'): {'natural': 'volcano'}, - ('0x66', '0x16'): {'natural': 'peak'}, - ('0x66', '0x19'): {'natural': 'cave_entrance'}, -} - - -def _parse_ele_from_name(name: str) -> Optional[str]: - if not name: - return None - t = name.strip().replace(',', '.') - if not t: - return None - try: - v = float(t) - except ValueError: - return None - if abs(v) < 20000: - if v.is_integer(): - return str(int(v)) - return str(v) - return None - - -def semantic_tags_for_feature(f: Feature) -> Dict[str, str]: - kind = f.props.get('garmin_kind', '') - gtype = f.props.get('garmin_type') - subtype = f.props.get('garmin_subtype') - name = f.props.get('name') or '' - sem: Dict[str, str] = {} - - if kind in ('polyline', 'extended_polyline'): - if gtype in ('0x20', '0x21', '0x22'): - sem['contour'] = 'elevation' - sem['contour_ext'] = { - '0x20': 'elevation_minor', - '0x21': 'elevation_medium', - '0x22': 'elevation_major', - }[gtype] - ele = _parse_ele_from_name(name) - if ele is not None: - sem['ele'] = ele - elif gtype in LINE_TAGS: - sem.update(LINE_TAGS[gtype]) - elif kind == 'extended_polyline': - # Fallback heuristic for common topo extended trail/path style objects. 
- if gtype in ('0x10e11', '0x10e12', '0x10e13', '0x10e14', '0x10e1c', '0x10e1d', '0x10e1f', - '0x10f12', '0x10f14', '0x10f16'): - sem['highway'] = 'path' - elif kind in ('polygon', 'extended_polygon'): - if gtype in POLYGON_TAGS: - sem.update(POLYGON_TAGS[gtype]) - elif kind in ('point', 'indexed_point', 'extended_point'): - key = (gtype, subtype) - if key in POINT_TAGS: - sem.update(POINT_TAGS[key]) - elif gtype == '0x66' and subtype == '0x18': - sem['natural'] = 'hill' # heuristic fallback - elif gtype == '0x65' and subtype == '0x00' and name: - sem['place'] = 'locality' - elif gtype == '0x66' and name: - sem['place'] = 'locality' - - if name: - sem['name'] = name - return sem - - -def tags_for_feature(f: Feature, semantic: bool = True) -> Dict[str, str]: - tags: Dict[str, str] = {} - if semantic: - tags.update(semantic_tags_for_feature(f)) - kind = f.props.get('garmin_kind') - gtype = f.props.get('garmin_type') - if kind: - tags['garmin:kind'] = str(kind) - if gtype: - tags['garmin:type'] = str(gtype) - if f.props.get('garmin_subtype'): - tags['garmin:subtype'] = str(f.props['garmin_subtype']) - if f.props.get('garmin_is_poi'): - tags['garmin:is_poi'] = 'yes' - return tags - - -def _is_useful_feature(tags: Dict[str, str]) -> bool: - # Keep only features with at least one semantic tag or a name. - for k in tags: - if not k.startswith('garmin:'): - return True - return 'name' in tags - - -def _node_key(lon: float, lat: float) -> Tuple[int, int]: - # Quantized key for shared way node reuse. 
- return (int(round(lon * 1e7)), int(round(lat * 1e7))) - - -def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple[List[Feature], Dict[str, object]]: - tre = TRE(subfiles['TRE']) - lbl = LBL(subfiles.get('LBL')) - rgn = RGN(subfiles['RGN'], tre=tre, lbl=lbl) - features = rgn.parse_features() - meta = { - 'mapset': mapset_name, - 'bounds_wgs84': { - 'north': to_deg(tre.north), - 'east': to_deg(tre.east), - 'south': to_deg(tre.south), - 'west': to_deg(tre.west), - }, - 'feature_count': len(features), - 'levels': {lvl: {'bits_per_coord': li.bits_per_coord, 'inherited': li.inherited} for lvl, li in tre.levels.items()}, - } - return features, meta - - -def collect_type_stats(features: Iterable[Feature]) -> Dict[str, object]: - by_kind = Counter() - by_type = Counter() - by_type_sub = Counter() - for f in features: - kind = f.props.get('garmin_kind') or 'unknown' - typ = f.props.get('garmin_type') or 'unknown' - sub = f.props.get('garmin_subtype') or '' - by_kind[kind] += 1 - by_type[f'{kind}:{typ}'] += 1 - if sub: - by_type_sub[f'{kind}:{typ}:{sub}'] += 1 - return { - 'by_kind': dict(by_kind.most_common()), - 'by_type': dict(by_type.most_common()), - 'by_type_subtype': dict(by_type_sub.most_common()), - } - - -def write_geojson(features: List[Feature], path: Path) -> None: - if str(path).lower().endswith('.gz'): - with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as fh: - json.dump({ - 'type': 'FeatureCollection', - 'features': [feature_to_geojson(f) for f in features], - }, fh, ensure_ascii=False) - else: - path.write_text(json.dumps({ - 'type': 'FeatureCollection', - 'features': [feature_to_geojson(f) for f in features], - }, ensure_ascii=False, indent=2), encoding='utf-8') - - -def _serialize_osm_chunk(fh, features: List[Feature], node_id: int, way_id: int, semantic: bool = True) -> Tuple[int, int]: - line_nodes: Dict[Tuple[int, int], int] = {} - plain_nodes: Dict[int, Tuple[float, float]] = {} - point_nodes: List[str] = [] - ways: 
List[Tuple[int, List[int], Dict[str, str]]] = [] - - def alloc_node(lon: float, lat: float) -> int: - nonlocal node_id - key = _node_key(lon, lat) - if key in line_nodes: - return line_nodes[key] - nid = node_id - node_id -= 1 - line_nodes[key] = nid - plain_nodes[nid] = (lon, lat) - return nid - - for f in features: - tags = tags_for_feature(f, semantic=semantic) - if not _is_useful_feature(tags): - continue - if f.geom_type == 'Point': - lon, lat = f.coords - nid = node_id - node_id -= 1 - node_lines = [f' '] - for k, v in tags.items(): - node_lines.append(f' ') - node_lines.append(' ') - point_nodes.append('\n'.join(node_lines)) - else: - coords = f.coords if f.geom_type == 'LineString' else f.coords[0] - node_ids = [alloc_node(lon, lat) for lon, lat in coords] - if len(node_ids) < 2: - continue - wid = way_id - way_id -= 1 - if f.geom_type == 'Polygon': - tags['area'] = 'yes' - ways.append((wid, node_ids, tags)) - - for nid in sorted(plain_nodes.keys(), reverse=True): - lon, lat = plain_nodes[nid] - fh.write(f' \n') - for chunk in point_nodes: - fh.write(chunk) - fh.write('\n') - for wid, node_ids, tags in ways: - fh.write(f' \n') - for nid in node_ids: - fh.write(f' \n') - for k, v in tags.items(): - fh.write(f' \n') - fh.write(' \n') - return node_id, way_id - - -def write_osm(features: List[Feature], path: Path, semantic: bool = True) -> None: - with _maybe_open_text(path) as fh: - fh.write('\n') - fh.write('\n') - _serialize_osm_chunk(fh, features, node_id=-1, way_id=-1, semantic=semantic) - fh.write('\n') - - -def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] = None, - bbox: Optional[Tuple[float, float, float, float]] = None, - semantic: bool = True) -> Dict[str, object]: - raw = img_path.read_bytes() - container = ImgContainer(raw) - all_sets = _all_mapsets(container.files) - selected = set(s.upper() for s in mapsets) if mapsets else None - total_kind_counter = Counter() - total_features = 0 - mapset_meta: List[Dict[str, 
object]] = [] - node_id = -1 - way_id = -1 - with _maybe_open_text(path) as fh: - fh.write('\n') - fh.write('\n') - for name, subs in all_sets.items(): - if selected and name.upper() not in selected: - continue - feats, meta = parse_mapset_features(name, subs) - if bbox is not None: - feats = [f for f in feats if _intersects_bbox(f, bbox)] - meta['feature_count_after_bbox'] = len(feats) - total_features += len(feats) - for f in feats: - total_kind_counter[f.props.get('garmin_kind') or 'unknown'] += 1 - node_id, way_id = _serialize_osm_chunk(fh, feats, node_id=node_id, way_id=way_id, semantic=semantic) - mapset_meta.append(meta) - fh.write('\n') - return { - 'img_file': str(img_path), - 'block_size': container.block_size, - 'mapset_count': len(all_sets), - 'selected_mapsets': mapsets or sorted(all_sets.keys()), - 'mapsets': mapset_meta, - 'feature_count': total_features, - 'kind_counts': dict(total_kind_counter), - } - - -def load_features_from_img( - img_path: Path, - mapsets: Optional[List[str]] = None, - bbox: Optional[Tuple[float, float, float, float]] = None, -) -> Tuple[List[Feature], Dict[str, object]]: - raw = img_path.read_bytes() - container = ImgContainer(raw) - all_sets = _all_mapsets(container.files) - selected = set(s.upper() for s in mapsets) if mapsets else None - features: List[Feature] = [] - mapset_meta: List[Dict[str, object]] = [] - for name, subs in all_sets.items(): - if selected and name.upper() not in selected: - continue - feats, meta = parse_mapset_features(name, subs) - if bbox is not None: - feats = [f for f in feats if _intersects_bbox(f, bbox)] - meta['feature_count_after_bbox'] = len(feats) - features.extend(feats) - mapset_meta.append(meta) - meta = { - 'img_file': str(img_path), - 'block_size': container.block_size, - 'mapset_count': len(all_sets), - 'selected_mapsets': mapsets or sorted(all_sets.keys()), - 'mapsets': mapset_meta, - 'feature_count': len(features), - 'type_stats': collect_type_stats(features), - } - return features, 
meta - - -def main() -> int: - ap = argparse.ArgumentParser(description='Extract vector features from a Garmin IMG and export GeoJSON / OSM XML suitable for further conversion to OsmAnd .obf.') - ap.add_argument('img', type=Path, help='Input Garmin .img file') - ap.add_argument('--geojson', type=Path, help='Write GeoJSON or .geojson.gz output') - ap.add_argument('--osm', type=Path, help='Write OSM XML or .osm.gz output') - ap.add_argument('--meta-json', type=Path, help='Write parse metadata JSON') - ap.add_argument('--mapset', action='append', help='Process only this TRE/RGN family id (repeatable), e.g. 02234008') - ap.add_argument('--bbox', help='Clip by WGS84 bbox: west,south,east,north') - ap.add_argument('--list-mapsets', action='store_true', help='List available mapsets and exit') - ap.add_argument('--raw-only', action='store_true', help='Do not add semantic OSM tags; only preserve raw garmin:* tags') - args = ap.parse_args() - - if args.list_mapsets: - container = ImgContainer(args.img.read_bytes()) - for name, subs in _all_mapsets(container.files).items(): - tre = TRE(subs['TRE']) - print(f'{name}\t{to_deg(tre.west):.6f},{to_deg(tre.south):.6f},{to_deg(tre.east):.6f},{to_deg(tre.north):.6f}') - return 0 - - if not args.geojson and not args.osm and not args.meta_json: - ap.error('provide at least one of --geojson, --osm, --meta-json or use --list-mapsets') - - bbox = _parse_bbox(args.bbox) - if args.osm and not args.geojson: - meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=not args.raw_only) - info(f'parsed {meta.get("feature_count", 0)} features from {len(meta.get("mapsets", []))} mapsets') - info(f'wrote OSM XML: {args.osm}') - if args.meta_json: - args.meta_json.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8') - info(f'wrote metadata: {args.meta_json}') - return 0 - - features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox) - info(f'parsed {len(features)} features 
from {len(meta.get("mapsets", []))} mapsets') - - if args.geojson: - write_geojson(features, args.geojson) - info(f'wrote GeoJSON: {args.geojson}') - if args.osm: - write_osm(features, args.osm, semantic=not args.raw_only) - info(f'wrote OSM XML: {args.osm}') - if args.meta_json: - args.meta_json.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8') - info(f'wrote metadata: {args.meta_json}') - return 0 - - -if __name__ == '__main__': - raise SystemExit(main())