Files
garmin-img-format-parsing/garmin_img_to_osmand_v4.py
2026-04-14 14:48:54 -07:00

1812 lines
71 KiB
Python

#!/usr/bin/env python3
"""
Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML.
What it does well:
- Reads classic Garmin IMG container FAT and extracts subfiles.
- Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where
TRE/RGN/LBL offsets are stored inside the .GMP container.
- Parses TRE levels/subdivisions.
- Parses LBL labels (coding 6, 9, 10) with common codepage handling.
- Parses standard points, extended points, standard polylines/polygons, and
extended polylines/polygons from RGN.
- Exports GeoJSON and/or OSM XML.
What it does NOT promise:
- Full Garmin NT routing/address semantics.
- Locked/compressed/vendor-obfuscated maps.
- Perfect type-to-OSM semantic translation. The exporter preserves Garmin type
codes as tags instead of inventing OSM semantics.
This is a practical reverse-engineering tool, not a complete implementation of
all Garmin IMG variants.
"""
from __future__ import annotations
import argparse
import csv
import io
import json
import math
import sys
import gzip
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
from xml.sax.saxutils import escape as xml_escape
# -------------------------
# Low-level helpers
# -------------------------
# Degrees per map unit (360 degrees spread over 2**24 units); applied by to_deg().
COORD_FACTOR = 360.0 / (1 << 24)
# Multiplier applied to the header's FAT block number in ImgContainer._read_fat_start().
FAT_BLOCK_SIZE = 0x200
# Distance in bytes between consecutive FAT entries when the FAT is walked.
FAT_ENTRY_SIZE = 0x200
# Number of 16-bit block-number slots read from each FAT entry (from entry offset 0x20).
MAX_FAT_BLOCKLIST = 240
# Slot indices into the 7-element list returned by RGN._segments().
SEG_POINT = 0
SEG_IPOINT = 1
SEG_POLYLINE = 2
SEG_POLYGON = 3
SEG_EXTPOLYGON = 4
SEG_EXTPOLYLINE = 5
SEG_EXTPOINT = 6
# Bit flags tested against Subdivision.object_types.
# NOTE(review): the OBJ_EXT_* values are not referenced in the reviewed part of the file — confirm usage.
OBJ_POINT = 0x10
OBJ_INDEXED_POINT = 0x20
OBJ_POLYLINE = 0x40
OBJ_POLYGON = 0x80
OBJ_EXT_POLYGON = 0x100
OBJ_EXT_POLYLINE = 0x200
OBJ_EXT_POINT = 0x400
def warn(msg: str) -> None:
    """Emit *msg* on standard error, prefixed with ``[warn]``."""
    line = f"[warn] {msg}"
    print(line, file=sys.stderr)
def info(msg: str) -> None:
    """Emit *msg* on standard error, prefixed with ``[info]``."""
    line = f"[info] {msg}"
    print(line, file=sys.stderr)
def read_u16le(buf: bytes, off: int) -> int:
    """Return the unsigned little-endian 16-bit value stored at ``buf[off:off + 2]``.

    A slice is used, so a buffer that ends early yields the value of the
    bytes that are available (0 for none) instead of raising.
    """
    chunk = buf[off:off + 2]
    return int.from_bytes(chunk, byteorder="little", signed=False)
def read_s16le(buf: bytes, off: int) -> int:
    """Return the signed little-endian 16-bit value stored at ``buf[off:off + 2]``.

    A slice is used, so a short buffer is decoded from the bytes that are
    available instead of raising.
    """
    chunk = buf[off:off + 2]
    return int.from_bytes(chunk, byteorder="little", signed=True)
def read_u24le(buf: bytes, off: int) -> int:
    """Return the unsigned little-endian 24-bit value stored at ``buf[off:off + 3]``.

    A slice is used, so a short buffer is decoded from the bytes that are
    available instead of raising.
    """
    chunk = buf[off:off + 3]
    return int.from_bytes(chunk, byteorder="little", signed=False)
def read_s24le(buf: bytes, off: int) -> int:
    """Return the signed little-endian 24-bit value stored at ``buf[off:off + 3]``.

    The bytes are read as an unsigned number and then sign-extended from
    bit 23, so a short buffer (fewer than three bytes available) is never
    interpreted as negative.
    """
    value = int.from_bytes(buf[off:off + 3], "little", signed=False)
    return value - (1 << 24) if value & 0x800000 else value
def read_u32le(buf: bytes, off: int) -> int:
    """Return the unsigned little-endian 32-bit value stored at ``buf[off:off + 4]``.

    A slice is used, so a short buffer is decoded from the bytes that are
    available instead of raising.
    """
    chunk = buf[off:off + 4]
    return int.from_bytes(chunk, byteorder="little", signed=False)
def to_deg(coord: int) -> float:
    """Convert a coordinate in map units to degrees by scaling with COORD_FACTOR."""
    degrees = coord * COORD_FACTOR
    return degrees
def decode_ascii_z(data: bytes) -> str:
    """Decode a NUL-terminated ASCII field.

    Everything from the first NUL byte onwards is discarded, non-ASCII bytes
    become U+FFFD, and surrounding whitespace is stripped.
    """
    head, _sep, _tail = data.partition(b"\x00")
    text = head.decode("ascii", errors="replace")
    return text.strip()
# -------------------------
# Container extraction
# -------------------------
@dataclass
class FatRecord:
    """One subfile as described by the first FAT entry of its chain."""
    # Base name decoded from entry bytes 1..8 with trailing spaces/NULs removed.
    filename: str
    # Extension decoded from entry bytes 9..11 with trailing spaces/NULs removed.
    ext: str
    # Subfile size read as a 32-bit value at entry offset 12; used to trim the collected blocks.
    size: int
    # Block numbers listed in this entry (the list stops at the first 0xFFFF slot).
    blocks: List[int]
    # Byte offset of this FAT entry inside the container image.
    offset_in_fat: int
class ImgContainer:
    """Garmin IMG container: optional XOR decode plus subfile extraction.

    After construction:

    * ``raw`` holds the (possibly XOR-decoded) image bytes,
    * ``block_size`` is the data block size derived from the header,
    * ``fat_start`` is the byte offset at which FAT entries are walked,
    * ``files`` maps ``"NAME.EXT"`` (upper-cased) to the subfile's bytes.

    Raises:
        IndexError: if ``raw`` is too short to contain the header bytes
            that are read (offsets 0x00, 0x40, 0x61 and 0x62).
    """

    def __init__(self, raw: bytes):
        self.raw = raw
        # Some IMG files are XOR'd by a single byte stored at byte 0.
        xor_byte = raw[0]
        if xor_byte != 0x00:
            # Probe just the two signature windows first; the full image is
            # decoded only when one of them matches.
            sig = bytes(b ^ xor_byte for b in raw[0x10:0x17])
            ident = bytes(b ^ xor_byte for b in raw[0x41:0x48])
            if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"):
                info(f"applied XOR decode with byte 0x{xor_byte:02x}")
                # A 256-entry translation table performs the XOR in C
                # instead of one Python-level operation per byte.
                table = bytes(value ^ xor_byte for value in range(256))
                self.raw = bytes(raw).translate(table)
        self.block_size = self._read_block_size()
        self.fat_start = self._read_fat_start()
        self.files = self._extract_subfiles()

    def _read_block_size(self) -> int:
        """Return the block size, 2 ** (byte 0x61 + byte 0x62)."""
        e1 = self.raw[0x61]
        e2 = self.raw[0x62]
        return 1 << (e1 + e2)

    def _read_fat_start(self) -> int:
        """Return the byte offset of the first FAT entry that is walked.

        The header byte at 0x40 is scaled by FAT_BLOCK_SIZE and one further
        FAT_BLOCK_SIZE is added, i.e. the entry located at the scaled offset
        itself is skipped.
        """
        fat_phys_block = self.raw[0x40]
        return fat_phys_block * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE

    def _read_block_list(self, entry_off: int) -> List[int]:
        """Return the block numbers stored in the FAT entry at ``entry_off``.

        Up to MAX_FAT_BLOCKLIST 16-bit slots are read starting at entry
        offset 0x20; the first 0xFFFF slot terminates the list.
        """
        blocks: List[int] = []
        base = entry_off + 0x20
        for slot in range(MAX_FAT_BLOCKLIST):
            blk = read_u16le(self.raw, base + slot * 2)
            if blk == 0xFFFF:
                break
            blocks.append(blk)
        return blocks

    def _parse_fat_chain(self) -> List[FatRecord]:
        """Walk the FAT and return one record per subfile.

        The walk stops at the first entry whose flag byte is not 0x01.  Only
        entries whose 16-bit part field (entry offset 16) is zero start a
        subfile; entries with a non-zero part field are continuations and
        are picked up later by ``_collect_blocks``.
        """
        records: List[FatRecord] = []
        off = self.fat_start
        while off + FAT_ENTRY_SIZE <= len(self.raw):
            if self.raw[off] != 0x01:
                break
            part = read_u16le(self.raw, off + 16)
            if part == 0:
                name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00")
                ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00")
                size = read_u32le(self.raw, off + 12)
                records.append(FatRecord(name, ext, size, self._read_block_list(off), off))
            off += FAT_ENTRY_SIZE
        return records

    def _collect_blocks(self, start_record: FatRecord) -> bytes:
        """Return the subfile bytes for ``start_record``, trimmed to its size.

        Continuation entries are the FAT entries that directly follow the
        first entry, are in use (flag 0x01), carry a non-zero part field and
        repeat the same 11 name/extension bytes.  Their block lists are
        appended to the first entry's list.

        The earlier implementation tested the part field of the *first*
        entry, which is zero by construction, so continuation entries were
        never followed and large subfiles were silently truncated.
        """
        blocks = list(start_record.blocks)
        first = start_record.offset_in_fat
        name_ext = self.raw[first + 1:first + 12]
        # NOTE(review): assumes continuation entries are stored contiguously
        # after the first entry, which matches how _parse_fat_chain walks.
        off = first + FAT_ENTRY_SIZE
        while off + FAT_ENTRY_SIZE <= len(self.raw):
            if self.raw[off] != 0x01:
                break
            if read_u16le(self.raw, off + 16) == 0:
                break  # first entry of the next subfile
            if self.raw[off + 1:off + 12] != name_ext:
                break
            blocks.extend(self._read_block_list(off))
            off += FAT_ENTRY_SIZE

        data = bytearray()
        for blk in blocks:
            start = blk * self.block_size
            end = start + self.block_size
            if end > len(self.raw):
                # Block lies outside the image: keep what was gathered so far.
                break
            data.extend(self.raw[start:end])
        return bytes(data[:start_record.size])

    def _extract_subfiles(self) -> Dict[str, bytes]:
        """Return a mapping of upper-cased ``NAME.EXT`` to subfile bytes."""
        out: Dict[str, bytes] = {}
        for rec in self._parse_fat_chain():
            key = f"{rec.filename}.{rec.ext}".upper()
            out[key] = self._collect_blocks(rec)
        return out
# -------------------------
# Core format structures
# -------------------------
@dataclass
class LevelInfo:
    """One entry of the TRE level table, as filled in by TRE._parse_levels()."""
    # Level number: low four bits of the entry's first byte.
    level: int
    # Second byte of the entry; used by TRE.get_resolution()/convert_map_units().
    bits_per_coord: int
    # True when bit 0x80 of the entry's first byte is set.
    inherited: bool
    # NOTE(review): never set explicitly in the reviewed part of the file — confirm intended use.
    present: bool = True
@dataclass
class Subdivision:
    """One TRE subdivision record together with its derived RGN data ranges."""

    index: int
    level: int
    data_offset: int
    object_types: int
    lon_center: int
    lat_center: int
    width: int
    height: int
    index_next_level: int = 0
    last: bool = False
    data_end: int = 0
    data_ext_polygon_offset: int = 0
    data_ext_polygon_end: int = 0
    data_ext_polyline_offset: int = 0
    data_ext_polyline_end: int = 0
    data_ext_poi_offset: int = 0
    data_ext_poi_end: int = 0
    children: List["Subdivision"] = field(default_factory=list)

    def nb_object_types(self) -> int:
        """Return how many of the flag bits 0x10, 0x20, 0x40 and 0x80 are set in ``object_types``."""
        flags = (0x10, 0x20, 0x40, 0x80)
        return sum(1 for flag in flags if self.object_types & flag)
@dataclass
class Feature:
    """A decoded map object ready for export."""
    geom_type: str # Point | LineString | Polygon
    # Point: [lon, lat]; LineString: list of [lon, lat]; Polygon: one-element list holding the ring.
    coords: object
    # Exported properties (RGN fills in garmin_kind, garmin_type, name, ...).
    props: Dict[str, object]
# -------------------------
# LBL parser
# -------------------------
class LBL:
    """Label lookup for the LBL subfile.

    ``LBL(None)`` (or empty data) yields an instance whose ``get_label``
    always returns ``""``.  The same holds when the data is too short to
    contain the header fields that are read; in that case ``ok`` is reset to
    ``False`` instead of raising ``IndexError`` from the constructor.
    """

    NORMAL_CHARS = [' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R',
                    'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5',
                    '6', '7', '8', '9', '~', '~', '~', '~', '~', '~']
    SYMBOL_CHARS = ['@', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '~', '~', '~',
                    '~', '~', '~', '~', '~', '~', '~', ':', ';', '<', '=', '>', '?', '~', '~', '~', '~', '~', '~',
                    '~', '~', '~', '~', '~', '[', '\\', ']', '^', '_']
    SPECIAL_CHARS = ['`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
                     's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5',
                     '6', '7', '8', '9', '~', '~', '~', '~', '~', '~']

    # Highest header offset that is read unconditionally (label coding byte) plus one.
    _MIN_HEADER_BYTES = 0x1F
    # Codepage values that do not map to Python's "cp<number>" codec name.
    _CODEPAGE_ENCODINGS = {0: "cp1252", 850: "cp1252", 65001: "utf-8", 932: "cp932", 950: "big5"}

    def __init__(self, data: Optional[bytes]):
        self.data = data or b""
        self.ok = bool(data)
        self.data_offset = 0
        self.data_length = 0
        self.data_offset_multiplier = 1
        self.label_coding = 6
        self.codepage = 1252
        if self.ok:
            self._parse_header()

    def _parse_header(self) -> None:
        """Read label-section location, offset multiplier, coding and codepage."""
        if len(self.data) < self._MIN_HEADER_BYTES:
            # Truncated header: run without labels rather than crash.
            self.ok = False
            return
        header_length = read_u16le(self.data, 0)
        self.data_offset = read_u32le(self.data, 0x15)
        self.data_length = read_u32le(self.data, 0x19)
        self.data_offset_multiplier = 1 << self.data[0x1D]
        self.label_coding = self.data[0x1E]
        # The codepage field sits at 0xAA; only trust it when the declared
        # header actually extends that far (same style of check TRE uses for
        # its optional header fields).  Otherwise keep the 1252 default.
        if header_length >= 0xAC and len(self.data) >= 0xAC:
            self.codepage = read_u16le(self.data, 0xAA)

    def get_label(self, offset: int) -> str:
        """Return the label stored at label offset ``offset``.

        Returns ``""`` when no label data is available, when ``offset`` is 0
        or when the resulting position lies outside the data.
        """
        if not self.ok or offset == 0:
            return ""
        actual = self.data_offset + offset * self.data_offset_multiplier
        if actual < 0 or actual >= len(self.data):
            return ""
        if self.label_coding == 6:
            return self._get_label6(actual)
        return self._get_label8_10(actual)

    def _get_label8_10(self, off: int) -> str:
        """Decode a NUL-terminated byte string using the header codepage."""
        end = self.data.find(b"\x00", off)
        if end < 0:
            end = len(self.data)
        raw = self.data[off:end]
        cp = self.codepage
        enc = self._CODEPAGE_ENCODINGS.get(cp, f"cp{cp}")
        try:
            return raw.decode(enc, errors="replace")
        except (LookupError, UnicodeError):
            # Python has no codec for this codepage number.
            return raw.decode("latin1", errors="replace")

    def _get_label6(self, off: int) -> str:
        """Decode a 6-bit packed label (four codes per three bytes).

        A code above 0x2F ends the label.  In the normal table 0x1C and 0x1B
        select the symbol / special table for the next code only, 0x1D emits
        ``|`` and 0x1E/0x1F emit a space.
        """
        out: List[str] = []
        table = self.NORMAL_CHARS
        pos = off
        while pos + 3 <= len(self.data):
            b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2]
            pos += 3
            codes = (
                b1 >> 2,
                ((b1 & 0x3) << 4) | (b2 >> 4),
                ((b2 & 0xF) << 2) | (b3 >> 6),
                b3 & 0x3F,
            )
            for c in codes:
                if c > 0x2F:
                    return "".join(out).strip()
                if table is not self.NORMAL_CHARS:
                    # One-shot shift: emit from the selected table, then revert.
                    out.append(table[c])
                    table = self.NORMAL_CHARS
                elif c == 0x1C:
                    table = self.SYMBOL_CHARS
                elif c == 0x1B:
                    table = self.SPECIAL_CHARS
                elif c == 0x1D:
                    out.append("|")
                elif c in (0x1E, 0x1F):
                    out.append(" ")
                else:
                    out.append(self.NORMAL_CHARS[c])
        return "".join(out).strip()
# -------------------------
# TRE parser
# -------------------------
class TRE:
    """Parser for the TRE subfile: bounds, level table and subdivision tree.

    After construction:

    * ``levels`` maps level number to ``LevelInfo``; ``level_counts`` maps
      level number to the subdivision count stored in the level table,
    * ``root_subdivisions`` holds the subdivisions of the highest level,
    * ``subdivisions_by_index`` maps the 1-based subdivision index to every
      parsed ``Subdivision`` (all levels, including the most detailed one).

    Layout assumption used for subdivision records: every level except the
    lowest present level uses 16-byte records (with a next-level index);
    the lowest present level uses 14-byte records stored after all 16-byte
    records.
    """

    def __init__(self, data: bytes):
        self.data = data
        self.header_length = read_u16le(data, 0)
        self.north = read_s24le(data, 0x15)
        self.east = read_s24le(data, 0x18)
        self.south = read_s24le(data, 0x1B)
        self.west = read_s24le(data, 0x1E)
        self.levels: Dict[int, LevelInfo] = {}
        # Subdivision count per level as stored in the level table.
        self.level_counts: Dict[int, int] = {}
        self.max_level = 0
        self.min_level = 15
        self.extended_types = False
        self.extended_types_offset = 0
        self.extended_types_length = 0
        self.extended_types_size = 0
        self.extended_types_number = 0
        # Index shift between subdivision numbers and extended-offset records
        # ("decalaje" = offset); set in _parse_tre7().
        self.decalaje_extended_types = 0
        # Starts at 1 and accumulates the per-level counts in _parse_levels().
        self.subdivisions_count = 1
        self.root_subdivisions: List[Subdivision] = []
        self.subdivisions_by_index: Dict[int, Subdivision] = {}
        self._parse()

    def _parse(self) -> None:
        """Run the parsing steps; the order matters (levels feed both later steps)."""
        self._parse_levels()
        self._parse_tre7()
        self._parse_subdivisions()

    def _parse_tre7(self) -> None:
        """Read the extended-offsets section descriptor when the header is long enough."""
        if self.header_length >= 0x7C + 10:
            self.extended_types_offset = read_u32le(self.data, 0x7C)
            self.extended_types_length = read_u32le(self.data, 0x80)
            self.extended_types_size = read_u16le(self.data, 0x84)
            if self.extended_types_size > 0:
                self.extended_types_number = self.extended_types_length // self.extended_types_size
            self.extended_types = self.extended_types_length > 0
            self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number

    def _parse_levels(self) -> None:
        """Read the level table (4 bytes per entry) located via header offsets 0x21/0x25."""
        levels_offset = read_u32le(self.data, 0x21)
        levels_length = read_u32le(self.data, 0x25)
        pos = levels_offset
        end = min(levels_offset + levels_length, len(self.data))
        while pos + 4 <= end:
            zoom = self.data[pos]
            bits = self.data[pos + 1]
            count = read_u16le(self.data, pos + 2)
            level = zoom & 0xF
            inherited = bool(zoom & 0x80)
            self.levels[level] = LevelInfo(level=level, bits_per_coord=bits, inherited=inherited)
            self.level_counts[level] = self.level_counts.get(level, 0) + count
            self.max_level = max(self.max_level, level)
            self.min_level = min(self.min_level, level)
            self.subdivisions_count += count
            pos += 4

    def get_resolution(self, level: int) -> int:
        """Return ``bits_per_coord`` of ``level``; raises KeyError for an unknown level."""
        return self.levels[level].bits_per_coord

    def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int:
        """Scale a level-relative value to 24-bit map units.

        The value is shifted left by ``24 - resolution - additional_accuracy``
        bits, or right when that amount is negative.
        """
        shift = 24 - self.get_resolution(level) - additional_accuracy
        if shift >= 0:
            return value << shift
        return value >> (-shift)

    def _record_size(self, level: int) -> int:
        """Return the subdivision record size for ``level`` (14 for the lowest present level)."""
        return 14 if level == self.min_level else 16

    def _wide_record_count(self) -> int:
        """Return how many 16-byte records precede the 14-byte records."""
        return sum(count for lvl, count in self.level_counts.items() if lvl != self.min_level)

    def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]:
        """Decode one subdivision record at ``pos``; return it and the next position."""
        data_offset = read_u24le(self.data, pos)
        object_types = self.data[pos + 3]
        if object_types & 0x0F:
            # Low nibble of the type byte extends the 24-bit data offset.
            data_offset += (object_types & 0x0F) * (1 << 24)
        lon_center = read_s24le(self.data, pos + 4)
        lat_center = read_s24le(self.data, pos + 7)
        width = read_u16le(self.data, pos + 10)
        # Bit 15 of the width marks the last record of a sibling group.
        last = bool(width & 0x8000)
        width &= 0x7FFF
        height = read_u16le(self.data, pos + 12)
        index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0
        sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types,
                          lon_center=lon_center, lat_center=lat_center, width=width, height=height,
                          index_next_level=index_next, last=last)
        # Extended offsets per subdivision, if present.
        if self.extended_types:
            indice = index - self.decalaje_extended_types
            if indice > 0 and self.extended_types_size >= 8:
                p = self.extended_types_offset + (indice - 1) * self.extended_types_size
                if p + self.extended_types_size <= len(self.data):
                    sub.data_ext_polygon_offset = read_u32le(self.data, p)
                    sub.data_ext_polyline_offset = read_u32le(self.data, p + 4)
                    if self.extended_types_size >= 12:
                        sub.data_ext_poi_offset = read_u32le(self.data, p + 8)
        return sub, pos + record_size

    def _parse_subdivisions(self) -> None:
        """Parse the subdivision tree and derive each subdivision's data end."""
        sub_offset = read_u32le(self.data, 0x29)
        sub_length = read_u32le(self.data, 0x2D)
        end = min(sub_offset + sub_length, len(self.data))
        if not self.levels:
            return
        root_level = max(self.levels)
        # With a single level the roots are also the leaves (14-byte records).
        root_size = self._record_size(root_level)
        index = 1
        pos = sub_offset
        roots: List[Subdivision] = []
        # Root records are stored first; the group ends at the "last" flag.
        while pos + root_size <= end:
            sub, pos = self._parse_subdiv_record(pos, root_level, root_size, index)
            roots.append(sub)
            self.subdivisions_by_index[index] = sub
            index += 1
            if sub.last:
                break
        self.root_subdivisions = roots
        # Recursively parse children using the index_next_level scheme.
        self._parse_children(self.root_subdivisions, root_level - 1, sub_offset, end, index)
        # A subdivision's data ends where the next one (by data offset) starts.
        # The final one gets 0, meaning "until the end of the RGN data section";
        # RGN.parse_features() replaces that sentinel.
        ordered = sorted(self.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index))
        for i, sub in enumerate(ordered):
            sub.data_end = ordered[i + 1].data_offset if i + 1 < len(ordered) else 0
        # Same derivation for the three extended sections.
        for attr_start, attr_end in [
            ("data_ext_polygon_offset", "data_ext_polygon_end"),
            ("data_ext_polyline_offset", "data_ext_polyline_end"),
            ("data_ext_poi_offset", "data_ext_poi_end"),
        ]:
            items = sorted((s for s in self.subdivisions_by_index.values() if getattr(s, attr_start, 0)),
                           key=lambda s: getattr(s, attr_start))
            for i, sub in enumerate(items):
                following = getattr(items[i + 1], attr_start) if i + 1 < len(items) else 0
                setattr(sub, attr_end, following)

    def _next_present_level(self, level: int) -> int:
        """Return the highest present level <= ``level``, or 0 when none is found.

        Kept unchanged for compatibility.  The result 0 is ambiguous (level 0
        versus "nothing found"), so the class itself uses
        ``_level_at_or_below`` instead.
        """
        while level > 0 and level not in self.levels:
            level -= 1
        return level

    def _level_at_or_below(self, level: int) -> Optional[int]:
        """Return the highest present level <= ``level``, or ``None`` if there is none."""
        while level >= 0:
            if level in self.levels:
                return level
            level -= 1
        return None

    def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None:
        """Parse the children of ``parents`` at the first present level <= ``level``.

        Children of a parent are the records starting at the parent's
        ``index_next_level`` up to and including the first record flagged
        ``last``.  ``next_index_hint`` is unused and kept only so existing
        call sites keep working.

        Previously level 0 could never be reached (0 doubled as the "no
        level" sentinel) and the position of 14-byte records was computed as
        if all preceding records were 14 bytes as well.
        """
        found = self._level_at_or_below(level)
        if found is None:
            return
        level = found
        record_size = self._record_size(level)
        wide_count = self._wide_record_count()
        for parent in parents:
            idx = parent.index_next_level
            if idx <= 0:
                continue
            if record_size == 16:
                pos = sub_offset + (idx - 1) * 16
            elif idx > wide_count:
                # 14-byte records follow the block of 16-byte records.
                pos = sub_offset + wide_count * 16 + (idx - 1 - wide_count) * 14
            else:
                # Index points into the 16-byte area although a 14-byte
                # record is expected: inconsistent table, skip this parent.
                continue
            children: List[Subdivision] = []
            while pos + record_size <= end:
                sub, pos = self._parse_subdiv_record(pos, level, record_size, idx)
                children.append(sub)
                self.subdivisions_by_index[idx] = sub
                idx += 1
                if sub.last:
                    break
            parent.children = children
            if children and level > self.min_level:
                self._parse_children(children, level - 1, sub_offset, end, idx)
# -------------------------
# RGN parser
# -------------------------
class BitStreamReader:
    """Least-significant-bit-first reader over ``length_bytes`` bytes of ``data``.

    ``pos`` is the index of the next unread byte, ``remaining_bytes`` the
    number of bytes not yet loaded, and ``cur_byte``/``remaining_bits`` hold
    the unread bits of the byte currently being consumed.
    """

    def __init__(self, data: bytes, start: int, length_bytes: int):
        self.data = data
        self.pos = start
        self.remaining_bytes = length_bytes
        self.remaining_bits = 0
        self.cur_byte = 0

    def has_next(self, nbits: int) -> bool:
        """Tell whether at least ``nbits`` unread bits are left."""
        unread = self.remaining_bits + 8 * self.remaining_bytes
        return unread >= nbits

    def finish(self) -> int:
        """Skip everything that is left and return the position after the stream."""
        self.pos += self.remaining_bytes
        self.remaining_bytes = 0
        self.remaining_bits = 0
        return self.pos

    def _get_if_needed(self) -> None:
        """Load the next byte when the current one is used up; EOFError when none is left."""
        if self.remaining_bits != 0:
            return
        if self.remaining_bytes <= 0:
            raise EOFError
        self.cur_byte = self.data[self.pos]
        self.pos += 1
        self.remaining_bytes -= 1
        self.remaining_bits = 8

    def read_next_bits(self, to_get: int) -> int:
        """Read ``to_get`` bits; the first bit read becomes bit 0 of the result."""
        value = 0
        filled = 0
        while filled < to_get:
            self._get_if_needed()
            wanted = to_get - filled
            available = self.remaining_bits
            if wanted < available:
                # Only part of the current byte is needed: take its low bits
                # and keep the rest for the next call.
                low_mask = (1 << wanted) - 1
                value |= (self.cur_byte & low_mask) << filled
                self.cur_byte >>= wanted
                self.remaining_bits = available - wanted
                break
            # The rest of the current byte is consumed completely.
            value |= self.cur_byte << filled
            filled += available
            self.remaining_bits = 0
        return value

    def read_coord_offset(self, nbits: int, sign: int, extra_bit: int) -> int:
        """Read one coordinate delta of ``nbits`` bits.

        ``sign`` of +1/-1 means the stream stores magnitudes with that fixed
        sign; 0 means each value carries its own sign bit (top bit).  In the
        signed form a value consisting of the sign bit alone is an escape:
        another value is read and combined with it.  With ``extra_bit`` set,
        the lowest bit is cleared from the result.
        """
        raw = self.read_next_bits(nbits)
        if sign != 0:
            if extra_bit > 0:
                return ((raw >> 1) * sign) << 1
            return raw * sign

        top = 1 << (nbits - 1)
        if not raw & top:
            # Non-negative value.
            if extra_bit > 0:
                return raw & 0xFFFFFE
            return raw

        magnitude = raw ^ top
        if extra_bit == 0:
            if magnitude != 0:
                return magnitude - top
            # Escape value: continue with another full-width value.
            rest = self.read_coord_offset(nbits, sign, extra_bit)
            if rest < 0:
                return 1 - raw + rest
            return raw - 1 + rest

        even = magnitude & 0xFFFFFE
        if even:
            return even - top
        # Escape value with extra bit: continue with a value one bit narrower.
        rest = self.read_coord_offset(nbits - 1, sign, 0)
        doubled = rest << 1
        if rest < 0:
            return 1 - top + 1 + doubled
        return top - 1 - 1 + doubled
class RGN:
    """Decoder for the RGN subfile: turns subdivision data into ``Feature`` objects.

    Uses the subdivision table of ``tre`` to locate each subdivision's data
    and ``lbl`` (optional) to resolve label offsets to names.
    """

    # (start attribute, end attribute) on Subdivision for the three extended sections,
    # in the order polygon, polyline, point.
    _EXT_SECTION_ATTRS = (
        ("data_ext_polygon_offset", "data_ext_polygon_end"),
        ("data_ext_polyline_offset", "data_ext_polyline_end"),
        ("data_ext_poi_offset", "data_ext_poi_end"),
    )

    def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]):
        self.data = data
        self.tre = tre
        self.lbl = lbl or LBL(None)
        self.header_length = read_u16le(data, 0)
        # Each section is described by a 32-bit offset followed by a 32-bit
        # length; a section is treated as absent when the buffer is too short.
        self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0
        self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0
        self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0
        self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0
        self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0
        self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0
        self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0
        self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0

    def data_end(self) -> int:
        """Return the length of the standard data section."""
        return self.data_length

    def ext_polygon_end(self) -> int:
        """Return the length of the extended polygon section."""
        return self.ext_poly_length

    def ext_polyline_end(self) -> int:
        """Return the length of the extended polyline section."""
        return self.ext_line_length

    def ext_poi_end(self) -> int:
        """Return the length of the extended point section."""
        return self.ext_poi_length

    @staticmethod
    def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int:
        """Return the bit width of one coordinate delta.

        ``i`` is the 4-bit base from the bitstream info byte.  One bit is
        added when the stream carries per-value signs (``sign == 0``) and
        ``extra_bit`` more bits for the extra bit.
        """
        add = extra_bit + (1 if sign == 0 else 0)
        if i <= 9:
            return i + 2 + add
        return 2 * i - 9 + 2 + add

    def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int:
        """Return the absolute longitude (map units) for a delta relative to ``sub``."""
        return sub.lon_center + self.tre.convert_map_units(sub.level, delta, add_acc)

    def _subdiv_lat(self, sub: Subdivision, delta: int, add_acc: int) -> int:
        """Return the absolute latitude (map units) for a delta relative to ``sub``."""
        return sub.lat_center + self.tre.convert_map_units(sub.level, delta, add_acc)

    def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]:
        """Return the byte ranges of ``sub``'s data, indexed by the SEG_* constants.

        A slot is ``None`` when the subdivision has no such data.  A slot may
        also hold the placeholder ``(0, 0)`` (type flag set but no usable
        range); parsers treat it as empty.
        """
        result: List[Optional[Tuple[int, int]]] = [None] * 7
        offset = sub.data_offset + self.data_offset
        end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset
        # NOTE(review): a subdivision without standard type flags is skipped
        # entirely, including any extended ranges — confirm this is intended.
        if sub.object_types == 0:
            return result
        if sub.data_end and end > len(self.data):
            end = len(self.data)
        if sub.data_end and end > offset and sub.nb_object_types() > 0:
            if sub.object_types & OBJ_POINT:
                result[SEG_POINT] = (0, 0)
            if sub.object_types & OBJ_INDEXED_POINT:
                result[SEG_IPOINT] = (0, 0)
            if sub.object_types & OBJ_POLYLINE:
                result[SEG_POLYLINE] = (0, 0)
            if sub.object_types & OBJ_POLYGON:
                result[SEG_POLYGON] = (0, 0)
            order = [SEG_POINT, SEG_IPOINT, SEG_POLYLINE, SEG_POLYGON]
            # The data starts with one 16-bit end pointer per present type
            # except the last, whose range runs to the end of the data.
            nb_pointers = sub.nb_object_types() - 1
            if offset + nb_pointers * 2 <= len(self.data):
                segment_start = offset + nb_pointers * 2
                cur_idx = 0
                p = offset
                for _ in range(nb_pointers):
                    while cur_idx < 4 and result[order[cur_idx]] is None:
                        cur_idx += 1
                    if cur_idx >= 4:
                        break
                    segment_end = read_u16le(self.data, p) + offset
                    p += 2
                    if segment_end > end or segment_end <= segment_start:
                        result[order[cur_idx]] = None
                    else:
                        result[order[cur_idx]] = (segment_start, segment_end)
                        segment_start = segment_end
                    cur_idx += 1
                while cur_idx < 4 and result[order[cur_idx]] is None:
                    cur_idx += 1
                if cur_idx < 4 and result[order[cur_idx]] is not None and segment_start < end:
                    result[order[cur_idx]] = (segment_start, end)
        if sub.data_ext_polygon_offset:
            s = self.ext_poly_offset + sub.data_ext_polygon_offset
            e = self.ext_poly_offset + (sub.data_ext_polygon_end or self.ext_poly_length)
            if e > s:
                result[SEG_EXTPOLYGON] = (s, e)
        if sub.data_ext_polyline_offset:
            s = self.ext_line_offset + sub.data_ext_polyline_offset
            e = self.ext_line_offset + (sub.data_ext_polyline_end or self.ext_line_length)
            if e > s:
                result[SEG_EXTPOLYLINE] = (s, e)
        if sub.data_ext_poi_offset:
            s = self.ext_poi_offset + sub.data_ext_poi_offset
            e = self.ext_poi_offset + (sub.data_ext_poi_end or self.ext_poi_length)
            if e > s:
                result[SEG_EXTPOINT] = (s, e)
        return result

    def parse_features(self) -> List[Feature]:
        """Decode every subdivision and return all features, in subdivision index order.

        Side effect: fills in the ``data_end`` / ``data_ext_*_end`` fields of
        the TRE subdivisions using the RGN section lengths.
        """
        subs = self.tre.subdivisions_by_index
        # Finalize subdivision end markers using RGN section lengths.
        ordered = sorted(subs.values(), key=lambda s: (s.data_offset, s.index))
        for i, sub in enumerate(ordered):
            if sub.data_end == 0:
                sub.data_end = self.data_length if i + 1 == len(ordered) else ordered[i + 1].data_offset
        section_lengths = (self.ext_poly_length, self.ext_line_length, self.ext_poi_length)
        for (attr_start, attr_end), final_end in zip(self._EXT_SECTION_ATTRS, section_lengths):
            items = sorted((s for s in subs.values() if getattr(s, attr_start, 0)),
                           key=lambda s: getattr(s, attr_start))
            for i, sub in enumerate(items):
                following = final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start)
                setattr(sub, attr_end, following)

        feats: List[Feature] = []
        for sub in sorted(subs.values(), key=lambda s: s.index):
            segs = self._segments(sub)
            if segs[SEG_POINT]:
                feats.extend(self._parse_points(sub, segs[SEG_POINT], indexed=False))
            if segs[SEG_IPOINT]:
                feats.extend(self._parse_points(sub, segs[SEG_IPOINT], indexed=True))
            if segs[SEG_EXTPOINT]:
                feats.extend(self._parse_ext_points(sub, segs[SEG_EXTPOINT]))
            if segs[SEG_POLYLINE]:
                feats.extend(self._parse_poly(sub, segs[SEG_POLYLINE], line=True, extended=False))
            if segs[SEG_POLYGON]:
                feats.extend(self._parse_poly(sub, segs[SEG_POLYGON], line=False, extended=False))
            if segs[SEG_EXTPOLYLINE]:
                feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYLINE], line=True, extended=True))
            if segs[SEG_EXTPOLYGON]:
                feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYGON], line=False, extended=True))
        return feats

    def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]:
        """Decode standard (8- or 9-byte) point records in ``seg``."""
        feats: List[Feature] = []
        pos, end = seg
        # A record must fit inside the segment; a trailing partial record is
        # ignored instead of being decoded from the neighbouring segment.
        limit = min(end, len(self.data))
        while pos + 8 <= limit:
            typ = self.data[pos]
            info24 = read_u24le(self.data, pos + 1)
            has_subtype = bool(info24 & 0x800000)
            is_poi = bool(info24 & 0x400000)
            lbl_off = info24 & 0x3FFFFF
            lon_delta = read_s16le(self.data, pos + 4)
            lat_delta = read_s16le(self.data, pos + 6)
            pos += 8
            subtype = 0
            if has_subtype and pos < limit:
                subtype = self.data[pos]
                pos += 1
            name = self.lbl.get_label(lbl_off) if lbl_off else ""
            lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
            lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
            feats.append(Feature(
                geom_type="Point",
                coords=[lon, lat],
                props={
                    "garmin_kind": "indexed_point" if indexed else "point",
                    "garmin_type": f"0x{typ:02x}",
                    "garmin_subtype": f"0x{subtype:02x}",
                    "garmin_is_poi": is_poi,
                    "name": name,
                },
            ))
        return feats

    def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]:
        """Decode extended point records (6 bytes plus an optional 3-byte label offset)."""
        feats: List[Feature] = []
        pos, end = seg
        # Same bound as for standard points: records must fit in the segment.
        limit = min(end, len(self.data))
        while pos + 6 <= limit:
            typ = self.data[pos]
            subtype_raw = self.data[pos + 1]
            has_lbl = bool(subtype_raw & 0x20)
            subtype = subtype_raw % 32
            full_type = ((typ + 0x100) << 8) + subtype
            lon_delta = read_s16le(self.data, pos + 2)
            lat_delta = read_s16le(self.data, pos + 4)
            pos += 6
            lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
            if has_lbl:
                pos += 3
            name = self.lbl.get_label(lbl_off) if lbl_off else ""
            lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
            lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
            feats.append(Feature(
                geom_type="Point",
                coords=[lon, lat],
                props={
                    "garmin_kind": "extended_point",
                    "garmin_type": f"0x{full_type:04x}",
                    "name": name,
                },
            ))
        return feats

    def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]:
        """Decode polyline (``line=True``) or polygon records in ``seg``.

        Each record is a fixed header, a first point relative to the
        subdivision centre and a bitstream of coordinate deltas.  Polygon
        rings are closed when the last point differs from the first.
        Decoding of the segment stops at the first malformed record.
        """
        feats: List[Feature] = []
        pos, end = seg
        kind = ("extended_" if extended else "") + ("polyline" if line else "polygon")
        while pos < end:
            try:
                if not extended:
                    if pos + 10 > end:
                        break
                    info1 = self.data[pos]
                    pos += 1
                    if line:
                        typ = info1 & 0x3F
                        direction = bool(info1 & 0x40)
                    else:
                        typ = info1 & 0x7F
                        direction = False
                    two_byte_len = bool(info1 & 0x80)
                    info24 = read_u24le(self.data, pos)
                    pos += 3
                    lbl_off = info24 & 0x3FFFFF
                    extra_bit = 1 if (info24 & 0x400000) else 0
                    data_in_net = bool(info24 & 0x800000)
                    lon_delta = read_s16le(self.data, pos)
                    lat_delta = read_s16le(self.data, pos + 2)
                    pos += 4
                    bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos]
                    pos += 2 if two_byte_len else 1
                    bitstream_info = self.data[pos]
                    pos += 1
                    long_sign = 0
                    lat_sign = 0
                    long_extra_bit = extra_bit
                    lat_extra_bit = 0
                    full_type = typ
                    has_lbl = False
                else:
                    if pos + 8 > end:
                        break
                    typ = self.data[pos]
                    subtype_raw = self.data[pos + 1]
                    has_lbl = bool(subtype_raw & 0x20)
                    subtype = subtype_raw % 32
                    full_type = ((typ + 0x100) << 8) + subtype
                    lon_delta = read_s16le(self.data, pos + 2)
                    lat_delta = read_s16le(self.data, pos + 4)
                    pos += 6
                    bitstream_len_byte = self.data[pos]
                    pos += 1
                    if bitstream_len_byte % 2 == 0:
                        # Even first byte: two-byte length field.
                        if pos >= end:
                            break
                        bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1
                        pos += 1
                    else:
                        bitstream_len = bitstream_len_byte // 2 - 1
                    bitstream_info = self.data[pos]
                    pos += 1
                    direction = False
                    data_in_net = False
                    long_sign = 0
                    lat_sign = 0
                    long_extra_bit = 0
                    lat_extra_bit = 0
                    lbl_off = 0

                reader = BitStreamReader(self.data, pos, bitstream_len)
                # Per axis: a flag bit says "all deltas share one sign",
                # followed by that sign (0 = positive, 1 = negative).
                if reader.read_next_bits(1) != 0:
                    long_sign = +1 if reader.read_next_bits(1) == 0 else -1
                if reader.read_next_bits(1) != 0:
                    lat_sign = +1 if reader.read_next_bits(1) == 0 else -1
                if extended:
                    long_extra_bit = reader.read_next_bits(1)
                long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit)
                lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit)
                cur_lon = lon_delta
                cur_lat = lat_delta
                pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]]
                cur_lon <<= long_extra_bit
                cur_lat <<= lat_extra_bit
                while reader.has_next(long_bits + lat_bits):
                    dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit)
                    dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit)
                    cur_lon += dlon
                    cur_lat += dlat
                    pts.append([
                        to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)),
                        to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)),
                    ])
                pos = reader.finish()
                if extended:
                    # Extended records carry their label offset after the bitstream.
                    lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
                    if has_lbl:
                        pos += 3
                name = self.lbl.get_label(lbl_off) if lbl_off else ""
                props = {
                    "garmin_kind": kind,
                    "garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}",
                    "garmin_direction": direction,
                    "garmin_data_in_net": data_in_net,
                    "name": name,
                }
                if line:
                    feats.append(Feature(geom_type="LineString", coords=pts, props=props))
                else:
                    if pts and pts[0] != pts[-1]:
                        pts.append(pts[0])
                    feats.append(Feature(geom_type="Polygon", coords=[pts], props=props))
            except Exception:
                # Deliberate best effort: stop the current segment on malformed
                # data instead of crashing the whole file.
                break
        return feats
# -------------------------
# Output writers and semantic mapping
# -------------------------
def feature_to_geojson(f: Feature) -> Dict[str, object]:
    """Build the GeoJSON ``Feature`` mapping for ``f``.

    Properties whose value is ``None``, an empty string, an empty list or an
    empty dict are left out.
    """
    blanks = (None, "", [], {})
    kept = {}
    for key, value in f.props.items():
        if value in blanks:
            continue
        kept[key] = value
    geometry = {"type": f.geom_type, "coordinates": f.coords}
    return {"type": "Feature", "geometry": geometry, "properties": kept}
def _osm_escape(v: object) -> str:
return xml_escape(str(v), {'"': '&quot;'})
def _maybe_open_text(path: Path):
if str(path).lower().endswith('.gz'):
return gzip.open(path, 'wt', encoding='utf-8', newline='\n')
return open(path, 'w', encoding='utf-8', newline='\n')
def _parse_bbox(text: Optional[str]) -> Optional[Tuple[float, float, float, float]]:
if not text:
return None
parts = [p.strip() for p in text.split(',')]
if len(parts) != 4:
raise ValueError('bbox must be west,south,east,north')
west, south, east, north = map(float, parts)
if west > east or south > north:
raise ValueError('invalid bbox ordering')
return west, south, east, north
def _feature_bounds(f: Feature) -> Tuple[float, float, float, float]:
if f.geom_type == 'Point':
lon, lat = f.coords
return lon, lat, lon, lat
if f.geom_type == 'LineString':
pts = f.coords
else:
pts = f.coords[0]
xs = [p[0] for p in pts]
ys = [p[1] for p in pts]
return min(xs), min(ys), max(xs), max(ys)
def _intersects_bbox(f: Feature, bbox: Optional[Tuple[float, float, float, float]]) -> bool:
if bbox is None:
return True
west, south, east, north = bbox
a_w, a_s, a_e, a_n = _feature_bounds(f)
return not (a_w > east or a_e < west or a_s > north or a_n < south)
def _all_mapsets(files: Dict[str, bytes]) -> Dict[str, Dict[str, bytes]]:
groups: Dict[str, Dict[str, bytes]] = defaultdict(dict)
for key, data in files.items():
if '.' not in key:
continue
base, ext = key.rsplit('.', 1)
groups[base.upper()][ext.upper()] = data
out: Dict[str, Dict[str, bytes]] = {}
for base, subs in groups.items():
if 'TRE' in subs and 'RGN' in subs:
out[base] = subs
return dict(sorted(out.items()))
# Default semantic mapping. These are based on common Garmin/mkgmap conventions,
# plus a few heuristics for map labels commonly found in topographic IMG files.
# Polyline type code -> OSM tags. Keys use the same two-digit lowercase hex
# text that RGN emits as 'garmin_type' for standard polylines (e.g. '0x01').
# NOTE(review): the code that consults this table is outside the reviewed
# portion of the file — confirm the lookup key format there.
LINE_TAGS: Dict[str, Dict[str, str]] = {
    '0x01': {'highway': 'motorway'},
    '0x02': {'highway': 'primary'},
    '0x03': {'highway': 'secondary'},
    '0x04': {'highway': 'tertiary'},
    '0x05': {'highway': 'unclassified'},
    '0x06': {'highway': 'residential'},
    '0x07': {'highway': 'service'},
    '0x08': {'highway': 'construction'},
    '0x09': {'highway': 'road'},
    '0x0a': {'highway': 'track', 'surface': 'unpaved'},
    '0x0c': {'highway': 'road', 'junction': 'roundabout'},
    '0x0d': {'highway': 'path'},
    '0x0e': {'highway': 'track', 'tracktype': 'grade1'},
    '0x0f': {'highway': 'track', 'tracktype': 'grade2'},
    '0x10': {'highway': 'track', 'tracktype': 'grade3'},
    '0x11': {'highway': 'track', 'tracktype': 'grade4'},
    '0x12': {'highway': 'track', 'tracktype': 'grade5'},
    '0x13': {'highway': 'steps'},
    '0x14': {'railway': 'rail'},
    '0x15': {'natural': 'coastline'},
    '0x16': {'highway': 'cycleway'},
    '0x17': {'highway': 'bridleway'},
    '0x18': {'waterway': 'stream'},
    '0x1a': {'route': 'ferry'},
    '0x1f': {'waterway': 'river'},
    '0x27': {'aeroway': 'runway'},
    '0x28': {'man_made': 'pipeline'},
    '0x29': {'power': 'line'},
    '0x31': {'natural': 'cliff'},
    '0x32': {'barrier': 'wall'},
    '0x33': {'barrier': 'fence'},
    '0x34': {'barrier': 'hedge'},
    '0x38': {'aerialway': 'cable_car'},
    '0x39': {'railway': 'tram'},
}
# Polygon type code -> OSM tags. Keys use the same two-digit lowercase hex
# text that RGN emits as 'garmin_type' for standard polygons (e.g. '0x03').
# NOTE(review): the code that consults this table is outside the reviewed
# portion of the file — confirm the lookup key format there.
POLYGON_TAGS: Dict[str, Dict[str, str]] = {
    '0x03': {'landuse': 'residential'},
    '0x05': {'amenity': 'parking'},
    '0x09': {'leisure': 'marina'},
    '0x0b': {'amenity': 'hospital'},
    '0x0c': {'landuse': 'industrial'},
    '0x14': {'natural': 'heath'},
    '0x15': {'natural': 'wood'},
    '0x16': {'leisure': 'nature_reserve'},
    '0x17': {'leisure': 'park'},
    '0x18': {'leisure': 'golf_course'},
    '0x19': {'leisure': 'sports_centre'},
    '0x1a': {'landuse': 'cemetery'},
    '0x2a': {'landuse': 'farmland'},
    '0x2b': {'landuse': 'farmyard'},
    '0x2c': {'landuse': 'vineyard'},
    '0x2d': {'landuse': 'quarry'},
    '0x2e': {'tourism': 'camp_site'},
    '0x32': {'natural': 'water', 'water': 'sea'},
    '0x35': {'landuse': 'meadow'},
    '0x3c': {'natural': 'water'},
    '0x3d': {'natural': 'beach'},
    '0x3e': {'natural': 'water'},
    '0x3f': {'landuse': 'reservoir'},
    '0x40': {'natural': 'water'},
    '0x41': {'natural': 'water'},
    '0x46': {'waterway': 'riverbank'},
    '0x4c': {'natural': 'water', 'intermittent': 'yes'},
    '0x4d': {'natural': 'glacier'},
    '0x4e': {'landuse': 'orchard'},
    '0x4f': {'natural': 'scrub'},
    '0x50': {'natural': 'wood'},
    '0x51': {'natural': 'wetland'},
    '0x52': {'natural': 'heath'}, # heuristic: Garmin default "Tundra"
    '0x53': {'natural': 'bare_rock'}, # heuristic: Garmin default "Flat"
}
# (point type, point subtype) -> OSM tags. Both key parts use the two-digit
# lowercase hex text that RGN emits as 'garmin_type' / 'garmin_subtype' for
# standard points (e.g. ('0x64', '0x03')).
# NOTE(review): the code that consults this table is outside the reviewed
# portion of the file — confirm how a None subtype key would be used.
POINT_TAGS: Dict[Tuple[str, Optional[str]], Dict[str, str]] = {
    ('0x04', '0x00'): {'place': 'city'},
    ('0x08', '0x00'): {'place': 'town'},
    ('0x0a', '0x00'): {'place': 'suburb'},
    ('0x0b', '0x00'): {'place': 'village'},
    ('0x0d', '0x00'): {'place': 'village'}, # heuristic for this sample topo IMG
    ('0x11', '0x00'): {'place': 'hamlet'},
    ('0x28', '0x00'): {'place': 'locality'}, # heuristic: local named spot labels in sample
    ('0x64', '0x03'): {'amenity': 'grave_yard'},
    ('0x64', '0x06'): {'highway': 'crossing'},
    ('0x64', '0x11'): {'man_made': 'tower'},
    ('0x64', '0x14'): {'amenity': 'drinking_water'},
    ('0x64', '0x17'): {'amenity': 'hunting_stand'},
    ('0x64', '0x18'): {'amenity': 'grit_bin'},
    ('0x65', '0x0a'): {'natural': 'glacier'},
    ('0x65', '0x0c'): {'place': 'island'},
    ('0x65', '0x11'): {'natural': 'spring'},
    ('0x66', '0x04'): {'natural': 'beach'},
    ('0x66', '0x07'): {'natural': 'cliff'},
    ('0x66', '0x0e'): {'natural': 'volcano'},
    ('0x66', '0x16'): {'natural': 'peak'},
    ('0x66', '0x19'): {'natural': 'cave_entrance'},
}
def _garmin_type_int(value: Optional[str]) -> Optional[int]:
if value is None:
return None
s = str(value).strip().lower()
if not s:
return None
try:
return int(s, 16) if s.startswith('0x') else int(s, 0)
except ValueError:
return None
def gpxsee_classes_for_feature(f: Feature) -> List[str]:
    """Label a Garmin object with GPXSee-style class names (ported from style_img.h).

    GPXSee stores classic Garmin object ids as type<<8 (and standard points
    as type<<8|subtype); extended objects already carry their expanded ids.
    Ids below 0x10000 are therefore widened here before being tested.

    Every rule is evaluated on the numeric id alone, whatever the geometry
    kind of the feature, so one feature can receive several labels.

    Returns:
        The matching class names in rule order, or an empty list when the
        feature has no parsable ``garmin_type``.
    """
    type_id = _garmin_type_int(f.props.get('garmin_type'))
    if type_id is None:
        return []

    if type_id < 0x10000:
        # Only standard (indexed) points fold their subtype into the low byte.
        is_std_point = str(f.props.get('garmin_kind') or '') in ('point', 'indexed_point')
        low_byte = (_garmin_type_int(f.props.get('garmin_subtype')) or 0) if is_std_point else 0
        type_id = (type_id << 8) | low_byte

    family = type_id & 0xFFFF00
    # (class name, predicate result) in the order the labels are reported.
    rules = (
        ('poi', not (0x0100 <= type_id <= 0x1F00 or 0x11400 <= type_id < 0x11500)),
        ('contour_line', 0x2000 <= type_id <= 0x2500 or family == 0x10900),
        ('water_area', 0x3C00 <= type_id <= 0x4400 or family == 0x10B00),
        ('water_line', type_id in (0x2600, 0x1800, 0x1F00)),
        ('military_area', type_id in (0x0400, 0x10901)),
        ('nature_reserve', type_id in (0x1600, 0x10A03)),
        ('spot', type_id in (0x6200, 0x6300)),
        ('summit', type_id == 0x6616),
        ('major_road', type_id <= 0x0400),
        ('country', 0x1400 <= type_id <= 0x153F),
        ('state', type_id == 0x1E00),
        ('marina', type_id == 0x10703),
        ('raster', type_id == 0x10613),
        ('depth_point', 0x10301 <= type_id <= 0x10302),
        ('obstruction_point', 0x10400 <= type_id <= 0x10401),
        ('buoy', 0x10200 <= type_id < 0x10300),
        ('light', 0x10100 <= type_id < 0x10200),
        ('label_point', type_id == 0x10500),
        ('dh_point', type_id == 0x10300),
        ('marine_point', 0x10100 <= type_id < 0x10A00),
        ('styled_line', 0x10400 <= type_id < 0x10700),
        ('cartographic_line', type_id == 0x10601),
        ('recommended_route', type_id == 0x10108),
    )
    return [label for label, hit in rules if hit]
def _feature_type_rows(features: List[Feature], point_only: bool = False) -> List[Dict[str, object]]:
    """Summarise features per (geom_type, kind, type, subtype) combination.

    Each summary row carries the number of features, how many of them have a
    name, the first name seen, the first non-empty set of semantic tags
    (without ``name``) and the sorted union of GPXSee class labels.

    Args:
        features: Parsed features to summarise.
        point_only: When true, features rejected by ``_point_feature`` are
            left out of the summary.

    Returns:
        Rows ordered by descending count, ties broken by the group key.
    """
    buckets: Dict[Tuple[str, str, str, str], Dict[str, object]] = {}
    for feat in features:
        if point_only and not _point_feature(feat):
            continue
        tags = semantic_tags_for_feature(feat)
        labels = gpxsee_classes_for_feature(feat)
        ident = (
            feat.geom_type,
            str(feat.props.get('garmin_kind') or ''),
            str(feat.props.get('garmin_type') or ''),
            str(feat.props.get('garmin_subtype') or ''),
        )
        bucket = buckets.get(ident)
        if bucket is None:
            bucket = buckets[ident] = {
                'geom_type': ident[0],
                'garmin_kind': ident[1],
                'garmin_type': ident[2],
                'garmin_subtype': ident[3],
                'count': 0,
                'named_count': 0,
                'sample_name': '',
                'semantic': {},
                'gpxsee_classes': set(),
            }
        bucket['count'] += 1
        label_name = tags.get('name')
        if label_name:
            bucket['named_count'] += 1
            if not bucket['sample_name']:
                bucket['sample_name'] = label_name
        if not bucket['semantic']:
            bucket['semantic'] = {k: v for k, v in tags.items() if k != 'name'}
        bucket['gpxsee_classes'].update(labels)

    ordered = sorted(buckets.items(), key=lambda item: (-item[1]['count'], item[0]))
    # The class set becomes a sorted list so rows are JSON-serialisable.
    return [
        {**bucket, 'gpxsee_classes': sorted(bucket['gpxsee_classes'])}
        for _, bucket in ordered
    ]
def write_type_summary_csv(rows: List[Dict[str, object]], path: Path) -> None:
    """Write type-summary rows to *path* as UTF-8 CSV.

    A path whose name ends in ``.gz`` (case-insensitive) is written through
    gzip. The class list and the semantic tags of each row are stored as
    JSON text in the ``gpxsee_classes_json`` / ``semantic_tags_json`` columns.
    """
    columns = [
        'geom_type', 'garmin_kind', 'garmin_type', 'garmin_subtype',
        'count', 'named_count', 'gpxsee_classes_json', 'semantic_tags_json', 'sample_name'
    ]
    copied = ('geom_type', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'count', 'named_count')
    opener = gzip.open if str(path).lower().endswith('.gz') else open
    # newline='' leaves line-ending handling to the csv module.
    with opener(path, 'wt', encoding='utf-8', newline='') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for entry in rows:
            record = {col: entry[col] for col in copied}
            record['gpxsee_classes_json'] = json.dumps(entry['gpxsee_classes'], ensure_ascii=False)
            record['semantic_tags_json'] = json.dumps(entry['semantic'], ensure_ascii=False, sort_keys=True)
            record['sample_name'] = entry['sample_name']
            writer.writerow(record)
def write_type_summary_json(rows: List[Dict[str, object]], path: Path) -> None:
    """Write type-summary rows to *path* as ``{"rows": [...]}`` JSON.

    Plain files are pretty-printed with a two-space indent; a path ending in
    ``.gz`` (case-insensitive) gets compact JSON through gzip.
    """
    document = {'rows': rows}
    if not str(path).lower().endswith('.gz'):
        path.write_text(json.dumps(document, ensure_ascii=False, indent=2), encoding='utf-8')
        return
    with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as out:
        json.dump(document, out, ensure_ascii=False)
def _parse_ele_from_name(name: str) -> Optional[str]:
if not name:
return None
t = name.strip().replace(',', '.')
if not t:
return None
try:
v = float(t)
except ValueError:
return None
if abs(v) < 20000:
if v.is_integer():
return str(int(v))
return str(v)
return None
def semantic_tags_for_feature(f: Feature) -> Dict[str, str]:
    """Derive OSM-style semantic tags for a feature from its Garmin codes.

    Lines: types 0x20/0x21/0x22 become contour tags (with ``ele`` when the
    label parses as an elevation); otherwise ``LINE_TAGS`` applies, and a
    fixed list of extended polyline types falls back to ``highway=path``.
    Polygons use ``POLYGON_TAGS``. Points use ``POINT_TAGS`` keyed by
    (type, subtype), followed by a few heuristic fallbacks.

    Returns:
        The tag dict; a non-empty label is always added last as ``name``.
    """
    props = f.props
    kind = props.get('garmin_kind', '')
    gtype = props.get('garmin_type')
    subtype = props.get('garmin_subtype')
    label = props.get('name') or ''

    contour_rank = {
        '0x20': 'elevation_minor',
        '0x21': 'elevation_medium',
        '0x22': 'elevation_major',
    }
    # Fallback heuristic for common topo extended trail/path style objects.
    trail_types = (
        '0x10e11', '0x10e12', '0x10e13', '0x10e14', '0x10e1c', '0x10e1d', '0x10e1f',
        '0x10f12', '0x10f14', '0x10f16',
    )

    tags: Dict[str, str] = {}
    if kind in ('polyline', 'extended_polyline'):
        if gtype in contour_rank:
            tags['contour'] = 'elevation'
            tags['contour_ext'] = contour_rank[gtype]
            height = _parse_ele_from_name(label)
            if height is not None:
                tags['ele'] = height
        elif gtype in LINE_TAGS:
            tags.update(LINE_TAGS[gtype])
        elif kind == 'extended_polyline' and gtype in trail_types:
            tags['highway'] = 'path'
    elif kind in ('polygon', 'extended_polygon'):
        if gtype in POLYGON_TAGS:
            tags.update(POLYGON_TAGS[gtype])
    elif kind in ('point', 'indexed_point', 'extended_point'):
        mapped = POINT_TAGS.get((gtype, subtype))
        if mapped is not None:
            tags.update(mapped)
        elif gtype == '0x66' and subtype == '0x18':
            tags['natural'] = 'hill'  # heuristic fallback
        elif gtype == '0x65' and subtype == '0x00' and label:
            tags['place'] = 'locality'
        elif gtype == '0x66' and label:
            tags['place'] = 'locality'

    if label:
        tags['name'] = label
    return tags
def tags_for_feature(f: Feature, semantic: bool = True) -> Dict[str, str]:
    """Build the full export tag set for a feature.

    Args:
        f: Feature whose props supply the Garmin codes.
        semantic: When true, start from ``semantic_tags_for_feature``;
            otherwise only the raw ``garmin:*`` tags are produced.

    Returns:
        Semantic tags (optional) followed by ``garmin:kind``,
        ``garmin:type``, ``garmin:subtype`` for each non-empty prop and
        ``garmin:is_poi=yes`` when that prop is truthy.
    """
    out: Dict[str, str] = dict(semantic_tags_for_feature(f)) if semantic else {}
    props = f.props
    raw_fields = (
        ('garmin_kind', 'garmin:kind'),
        ('garmin_type', 'garmin:type'),
        ('garmin_subtype', 'garmin:subtype'),
    )
    for prop_key, tag_key in raw_fields:
        raw = props.get(prop_key)
        if raw:
            out[tag_key] = str(raw)
    if props.get('garmin_is_poi'):
        out['garmin:is_poi'] = 'yes'
    return out
def _is_useful_feature(tags: Dict[str, str]) -> bool:
# Keep only features with at least one semantic tag or a name.
for k in tags:
if not k.startswith('garmin:'):
return True
return 'name' in tags
def _node_key(lon: float, lat: float) -> Tuple[int, int]:
# Quantized key for shared way node reuse.
return (int(round(lon * 1e7)), int(round(lat * 1e7)))
def parse_mapset_features(mapset_name: str, subfiles: Dict[str, bytes]) -> Tuple[List[Feature], Dict[str, object]]:
    """Parse one mapset's TRE/LBL/RGN subfiles into features plus metadata.

    Args:
        mapset_name: Identifier recorded in the metadata.
        subfiles: Subfile payloads; ``'TRE'`` and ``'RGN'`` are required,
            ``'LBL'`` is looked up with ``.get`` and may be absent.

    Returns:
        ``(features, meta)`` where *meta* holds the mapset name, the TRE
        bounds converted with ``to_deg``, the feature count and per-level
        ``bits_per_coord`` / ``inherited`` values.
    """
    tre = TRE(subfiles['TRE'])
    lbl = LBL(subfiles.get('LBL'))
    features = RGN(subfiles['RGN'], tre=tre, lbl=lbl).parse_features()

    bounds = {side: to_deg(getattr(tre, side)) for side in ('north', 'east', 'south', 'west')}
    levels = {
        lvl: {'bits_per_coord': li.bits_per_coord, 'inherited': li.inherited}
        for lvl, li in tre.levels.items()
    }
    meta = {
        'mapset': mapset_name,
        'bounds_wgs84': bounds,
        'feature_count': len(features),
        'levels': levels,
    }
    return features, meta
def collect_type_stats(features: Iterable[Feature]) -> Dict[str, object]:
    """Count features per kind, per ``kind:type`` and per ``kind:type:subtype``.

    A missing kind or type is counted as ``'unknown'``; the subtype tally
    only includes features that actually have a subtype.

    Returns:
        A dict with ``by_kind``, ``by_type`` and ``by_type_subtype``, each a
        plain dict ordered from most to least common.
    """
    tally = {
        'by_kind': Counter(),
        'by_type': Counter(),
        'by_type_subtype': Counter(),
    }
    for feat in features:
        props = feat.props
        kind = props.get('garmin_kind') or 'unknown'
        typ = props.get('garmin_type') or 'unknown'
        sub = props.get('garmin_subtype') or ''
        tally['by_kind'][kind] += 1
        tally['by_type'][f'{kind}:{typ}'] += 1
        if sub:
            tally['by_type_subtype'][f'{kind}:{typ}:{sub}'] += 1
    return {label: dict(counter.most_common()) for label, counter in tally.items()}
def write_geojson(features: List[Feature], path: Path) -> None:
    """Write *features* to *path* as a GeoJSON FeatureCollection.

    A path ending in ``.gz`` (case-insensitive) gets compact JSON through
    gzip; any other path is pretty-printed with a two-space indent.
    """
    def collection() -> Dict[str, object]:
        # Built on demand so conversion happens at the same point as before
        # (after the gzip stream has been opened in the compressed case).
        return {
            'type': 'FeatureCollection',
            'features': [feature_to_geojson(item) for item in features],
        }

    if str(path).lower().endswith('.gz'):
        with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as out:
            json.dump(collection(), out, ensure_ascii=False)
        return
    path.write_text(json.dumps(collection(), ensure_ascii=False, indent=2), encoding='utf-8')
def _serialize_osm_chunk(fh, features: List[Feature], node_id: int, way_id: int, semantic: bool = True) -> Tuple[int, int]:
    """Write one batch of features to *fh* as OSM XML ``<node>``/``<way>`` elements.

    Point features become tagged nodes. LineString features, and the first
    ring of Polygon features, become ways (polygons additionally get
    ``area=yes``). Way vertices are written as untagged nodes and are shared
    between ways of the same batch when they map to the same ``_node_key``.
    Output order is: untagged way nodes, tagged point nodes, ways.

    Args:
        fh: Writable text stream.
        features: Features to serialise.
        node_id: First node id to hand out; ids count downwards from here.
        way_id: First way id to hand out; ids count downwards from here.
        semantic: Passed to ``tags_for_feature``. In semantic mode, features
            without any non-``garmin:*`` tag are skipped. In raw mode every
            feature that has at least one tag is kept.

    Returns:
        ``(node_id, way_id)``: the next unused ids, so several batches can be
        chained into one document.
    """
    line_nodes: Dict[Tuple[int, int], int] = {}
    plain_nodes: Dict[int, Tuple[float, float]] = {}
    point_nodes: List[str] = []
    ways: List[Tuple[int, List[int], Dict[str, str]]] = []

    def alloc_node(lon: float, lat: float) -> int:
        """Return the id of the shared vertex node at (lon, lat), creating it if needed."""
        nonlocal node_id
        key = _node_key(lon, lat)
        if key in line_nodes:
            return line_nodes[key]
        nid = node_id
        node_id -= 1
        line_nodes[key] = nid
        plain_nodes[nid] = (lon, lat)
        return nid

    for f in features:
        tags = tags_for_feature(f, semantic=semantic)
        # Raw mode yields only garmin:* tags, which _is_useful_feature()
        # always rejects; applying it there would drop every feature and
        # leave the document empty. Raw mode therefore keeps any tagged feature.
        keep = _is_useful_feature(tags) if semantic else bool(tags)
        if not keep:
            continue
        if f.geom_type == 'Point':
            lon, lat = f.coords
            nid = node_id
            node_id -= 1
            node_lines = [f' <node id="{nid}" lat="{lat:.8f}" lon="{lon:.8f}">']
            for k, v in tags.items():
                node_lines.append(f' <tag k="{_osm_escape(k)}" v="{_osm_escape(v)}"/>')
            node_lines.append(' </node>')
            point_nodes.append('\n'.join(node_lines))
        else:
            coords = list(f.coords if f.geom_type == 'LineString' else f.coords[0])
            # Check before allocating so a degenerate geometry does not leave
            # an orphan untagged node behind.
            if len(coords) < 2:
                continue
            node_ids = [alloc_node(lon, lat) for lon, lat in coords]
            wid = way_id
            way_id -= 1
            if f.geom_type == 'Polygon':
                tags['area'] = 'yes'
            ways.append((wid, node_ids, tags))

    # Ids are negative and descending, so reverse-sorted means allocation order.
    for nid in sorted(plain_nodes.keys(), reverse=True):
        lon, lat = plain_nodes[nid]
        fh.write(f' <node id="{nid}" lat="{lat:.8f}" lon="{lon:.8f}"/>\n')
    for chunk in point_nodes:
        fh.write(chunk)
        fh.write('\n')
    for wid, node_ids, tags in ways:
        fh.write(f' <way id="{wid}">\n')
        for nid in node_ids:
            fh.write(f' <nd ref="{nid}"/>\n')
        for k, v in tags.items():
            fh.write(f' <tag k="{_osm_escape(k)}" v="{_osm_escape(v)}"/>\n')
        fh.write(' </way>\n')
    return node_id, way_id
def write_osm(features: List[Feature], path: Path, semantic: bool = True) -> None:
    """Write *features* to *path* as a single OSM XML document.

    The stream comes from ``_maybe_open_text``; the body is produced by one
    ``_serialize_osm_chunk`` call with node and way ids starting at -1.
    """
    prolog = (
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<osm version="0.6" generator="garmin_img_to_osmand_v2">\n',
    )
    with _maybe_open_text(path) as out:
        for line in prolog:
            out.write(line)
        _serialize_osm_chunk(out, features, node_id=-1, way_id=-1, semantic=semantic)
        out.write('</osm>\n')
def write_osm_from_img(img_path: Path, path: Path, mapsets: Optional[List[str]] = None,
                       bbox: Optional[Tuple[float, float, float, float]] = None,
                       semantic: bool = True) -> Dict[str, object]:
    """Stream an IMG file to OSM XML one mapset at a time.

    Each selected mapset is parsed, optionally reduced to the features that
    intersect *bbox*, and serialised immediately; node and way ids continue
    across mapsets.

    Args:
        img_path: Input IMG file.
        path: Output file, opened through ``_maybe_open_text``.
        mapsets: Mapset names to process (compared case-insensitively);
            ``None`` or empty means all.
        bbox: Optional filter box handed to ``_intersects_bbox``.
        semantic: Forwarded to ``_serialize_osm_chunk``.

    Returns:
        Summary metadata: input file, block size, mapset counts/selection,
        per-mapset metadata, total feature count and counts per kind.
    """
    container = ImgContainer(img_path.read_bytes())
    all_sets = _all_mapsets(container.files)
    wanted = {m.upper() for m in mapsets} if mapsets else None

    kind_totals = Counter()
    feature_total = 0
    per_mapset: List[Dict[str, object]] = []
    next_node, next_way = -1, -1

    with _maybe_open_text(path) as out:
        out.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        out.write('<osm version="0.6" generator="garmin_img_to_osmand_v2">\n')
        for name, subs in all_sets.items():
            if wanted and name.upper() not in wanted:
                continue
            feats, meta = parse_mapset_features(name, subs)
            if bbox is not None:
                feats = [ft for ft in feats if _intersects_bbox(ft, bbox)]
                meta['feature_count_after_bbox'] = len(feats)
            feature_total += len(feats)
            kind_totals.update(ft.props.get('garmin_kind') or 'unknown' for ft in feats)
            next_node, next_way = _serialize_osm_chunk(
                out, feats, node_id=next_node, way_id=next_way, semantic=semantic)
            per_mapset.append(meta)
        out.write('</osm>\n')

    return {
        'img_file': str(img_path),
        'block_size': container.block_size,
        'mapset_count': len(all_sets),
        'selected_mapsets': mapsets or sorted(all_sets.keys()),
        'mapsets': per_mapset,
        'feature_count': feature_total,
        'kind_counts': dict(kind_totals),
    }
def load_features_from_img(
    img_path: Path,
    mapsets: Optional[List[str]] = None,
    bbox: Optional[Tuple[float, float, float, float]] = None,
) -> Tuple[List[Feature], Dict[str, object]]:
    """Parse the selected mapsets of an IMG file into one in-memory feature list.

    Args:
        img_path: Input IMG file.
        mapsets: Mapset names to process (compared case-insensitively);
            ``None`` or empty means all.
        bbox: Optional filter box handed to ``_intersects_bbox``.

    Returns:
        ``(features, meta)`` where *meta* summarises the input file, the
        mapset selection, per-mapset metadata, the total feature count and
        the result of ``collect_type_stats``.
    """
    container = ImgContainer(img_path.read_bytes())
    all_sets = _all_mapsets(container.files)
    wanted = {m.upper() for m in mapsets} if mapsets else None

    collected: List[Feature] = []
    per_mapset: List[Dict[str, object]] = []
    for name, subs in all_sets.items():
        if wanted and name.upper() not in wanted:
            continue
        feats, set_meta = parse_mapset_features(name, subs)
        if bbox is not None:
            feats = [ft for ft in feats if _intersects_bbox(ft, bbox)]
            set_meta['feature_count_after_bbox'] = len(feats)
        collected.extend(feats)
        per_mapset.append(set_meta)

    summary = {
        'img_file': str(img_path),
        'block_size': container.block_size,
        'mapset_count': len(all_sets),
        'selected_mapsets': mapsets or sorted(all_sets.keys()),
        'mapsets': per_mapset,
        'feature_count': len(collected),
        'type_stats': collect_type_stats(collected),
    }
    return collected, summary
def _point_feature(f: Feature) -> bool:
return f.geom_type == 'Point' and (f.props.get('garmin_kind') in ('point', 'indexed_point', 'extended_point'))
def _parse_kv_filters(values: Optional[List[str]]) -> List[Tuple[str, str]]:
out: List[Tuple[str, str]] = []
for v in values or []:
if '=' not in v:
raise SystemExit(f'invalid --filter-tag value {v!r}, expected key=value')
k, val = v.split('=', 1)
out.append((k.strip(), val.strip()))
return out
def _category_match(f: Feature, sem: Dict[str, str], category: Optional[str]) -> bool:
    """Check a feature against one named ``--category``.

    Args:
        f: Feature being tested (used for its GPXSee class labels).
        sem: Semantic tags already computed for *f*.
        category: Category name, trimmed and lowercased before comparison;
            ``None`` or empty matches everything.

    Raises:
        SystemExit: If the category name is not one of the supported ones.
    """
    if not category:
        return True
    wanted = category.strip().lower()
    labels = set(gpxsee_classes_for_feature(f))
    natural = sem.get('natural')
    is_water_source = sem.get('amenity') == 'drinking_water' or natural == 'spring'

    verdicts = {
        'water_sources': is_water_source,
        'peaks': natural == 'peak' or 'summit' in labels,
        'caves': natural == 'cave_entrance',
        'settlements': 'place' in sem,
        'water_landmarks': (
            is_water_source
            or natural == 'water'
            or sem.get('waterway') in ('stream', 'river')
            or 'water_area' in labels
            or 'water_line' in labels
        ),
        'marine_points': not labels.isdisjoint(('marine_point', 'light', 'buoy')),
        'depth_points': 'depth_point' in labels,
        'lights': 'light' in labels,
        'buoys': 'buoy' in labels,
    }
    if wanted not in verdicts:
        raise SystemExit(f'unknown --category {category!r}; supported: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys')
    return verdicts[wanted]
def _feature_matches(
    f: Feature,
    *,
    point_only: bool = False,
    categories: Optional[List[str]] = None,
    filter_kind: Optional[List[str]] = None,
    filter_type: Optional[List[str]] = None,
    filter_subtype: Optional[List[str]] = None,
    filter_tags: Optional[List[Tuple[str, str]]] = None,
    gpxsee_classes: Optional[List[str]] = None,
    named_only: bool = False,
) -> bool:
    """Decide whether a feature passes every requested filter.

    All given filters must hold. Within ``categories`` and
    ``gpxsee_classes`` a single match is enough; ``filter_tags`` requires
    every key/value pair to equal the feature's semantic tag.
    """
    if point_only and not _point_feature(f):
        return False

    props = f.props
    raw_filters = (
        (filter_kind, 'garmin_kind'),
        (filter_type, 'garmin_type'),
        (filter_subtype, 'garmin_subtype'),
    )
    for allowed, prop_key in raw_filters:
        if allowed and str(props.get(prop_key) or '') not in set(allowed):
            return False

    sem = semantic_tags_for_feature(f)
    if categories and not any(_category_match(f, sem, c) for c in categories):
        return False
    if any(sem.get(k) != v for k, v in (filter_tags or [])):
        return False
    if gpxsee_classes:
        wanted = {c.strip() for c in gpxsee_classes if c.strip()}
        if wanted.isdisjoint(gpxsee_classes_for_feature(f)):
            return False
    return not (named_only and not sem.get('name'))
def _feature_point_row(f: Feature) -> Dict[str, object]:
    """Flatten a point feature into a row dict for the landmark exporters.

    The row holds ``lon``/``lat`` from ``f.coords``, the name, the raw
    Garmin kind/type/subtype props, the semantic tag dict and the GPXSee
    class list.
    """
    sem = semantic_tags_for_feature(f)
    lon, lat = f.coords
    props = f.props
    row: Dict[str, object] = {'lon': lon, 'lat': lat, 'name': sem.get('name', '')}
    for key in ('garmin_kind', 'garmin_type', 'garmin_subtype'):
        row[key] = props.get(key, '')
    row['semantic_tags'] = sem
    row['gpxsee_classes'] = gpxsee_classes_for_feature(f)
    return row
def write_landmarks_csv(features: List[Feature], path: Path) -> None:
    """Write point features to *path* as UTF-8 CSV, one row per feature.

    Coordinates are formatted with eight decimals; class labels and semantic
    tags are stored as JSON text. A path ending in ``.gz``
    (case-insensitive) is written through gzip.
    """
    columns = ['lon', 'lat', 'name', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'gpxsee_classes_json', 'semantic_tags_json']
    opener = gzip.open if str(path).lower().endswith('.gz') else open
    # newline='' leaves line-ending handling to the csv module.
    with opener(path, 'wt', encoding='utf-8', newline='') as out:
        writer = csv.DictWriter(out, fieldnames=columns)
        writer.writeheader()
        for feat in features:
            row = _feature_point_row(feat)
            record = {
                'lon': f'{row["lon"]:.8f}',
                'lat': f'{row["lat"]:.8f}',
            }
            for col in ('name', 'garmin_kind', 'garmin_type', 'garmin_subtype'):
                record[col] = row[col]
            record['gpxsee_classes_json'] = json.dumps(row['gpxsee_classes'], ensure_ascii=False)
            record['semantic_tags_json'] = json.dumps(row['semantic_tags'], ensure_ascii=False, sort_keys=True)
            writer.writerow(record)
def write_landmarks_geojson(features: List[Feature], path: Path) -> None:
    """Write point features to *path* as a GeoJSON FeatureCollection of Points.

    Properties carry the name, the raw Garmin kind/type/subtype, the GPXSee
    classes joined with commas, and then every semantic tag (which wins on a
    key clash). A path ending in ``.gz`` gets compact gzip output; other
    paths are pretty-printed.
    """
    entries = []
    for feat in features:
        row = _feature_point_row(feat)
        properties = {
            'name': row['name'],
            'garmin_kind': row['garmin_kind'],
            'garmin_type': row['garmin_type'],
            'garmin_subtype': row['garmin_subtype'],
            'gpxsee_classes': ','.join(row['gpxsee_classes']),
            **row['semantic_tags'],
        }
        entries.append({
            'type': 'Feature',
            'geometry': {'type': 'Point', 'coordinates': [row['lon'], row['lat']]},
            'properties': properties,
        })
    document = {'type': 'FeatureCollection', 'features': entries}

    if not str(path).lower().endswith('.gz'):
        path.write_text(json.dumps(document, ensure_ascii=False, indent=2), encoding='utf-8')
        return
    with gzip.open(path, 'wt', encoding='utf-8', newline='\n') as out:
        json.dump(document, out, ensure_ascii=False)
def print_feature_type_table(features: List[Feature], point_only: bool = False) -> None:
    """Print the feature-type summary to stdout as a tab-separated table.

    Args:
        features: Features to summarise via ``_feature_type_rows``.
        point_only: Forwarded to ``_feature_type_rows``.

    The header line is joined with the same tab delimiter as the data rows,
    so the whole output can be read as TSV. The class list and semantic tags
    are printed as JSON text.
    """
    columns = (
        'geom_type', 'garmin_kind', 'garmin_type', 'garmin_subtype', 'count',
        'named_count', 'gpxsee_classes', 'semantic_tags', 'sample_name',
    )
    print('\t'.join(columns))
    for row in _feature_type_rows(features, point_only=point_only):
        # Every cell is stringified so str.join never sees a non-string.
        print('\t'.join([
            str(row['geom_type']),
            str(row['garmin_kind']),
            str(row['garmin_type']),
            str(row['garmin_subtype']),
            str(row['count']),
            str(row['named_count']),
            json.dumps(row['gpxsee_classes'], ensure_ascii=False),
            json.dumps(row['semantic'], ensure_ascii=False, sort_keys=True),
            str(row['sample_name']),
        ]))
def main() -> int:
    """Command-line entry point; returns the process exit status (always 0).

    Flow:
      1. ``--list-mapsets`` prints each mapset name with its bounds and exits.
      2. If only ``--osm`` (optionally with ``--meta-json``) is requested and
         no listing, landmark or filter option is set, the OSM file is
         streamed mapset by mapset via ``write_osm_from_img``.
      3. Otherwise all features are loaded, filtered once, and every
         requested listing/export is produced from that filtered list.

    NOTE: when a landmark export or ``--list-landmark-types`` is requested,
    the shared filter keeps only point features, which also applies to
    ``--geojson``, ``--osm`` and ``--list-feature-types`` in the same run.
    """
    ap = argparse.ArgumentParser(description='Extract vector features from a Garmin IMG and export GeoJSON / OSM XML suitable for further conversion to OsmAnd .obf.')
    ap.add_argument('img', type=Path, help='Input Garmin .img file')
    ap.add_argument('--geojson', type=Path, help='Write GeoJSON or .geojson.gz output')
    ap.add_argument('--osm', type=Path, help='Write OSM XML or .osm.gz output')
    ap.add_argument('--meta-json', type=Path, help='Write parse metadata JSON')
    ap.add_argument('--mapset', action='append', help='Process only this TRE/RGN family id (repeatable), e.g. 02234008')
    ap.add_argument('--bbox', help='Clip by WGS84 bbox: west,south,east,north')
    ap.add_argument('--list-mapsets', action='store_true', help='List available mapsets and exit')
    ap.add_argument('--list-feature-types', action='store_true', help='List unique parsed Garmin feature types with counts')
    ap.add_argument('--list-landmark-types', action='store_true', help='List unique point landmark types with counts')
    ap.add_argument('--landmark-types-csv', type=Path, help='Export landmark type summary table to CSV or CSV.GZ')
    ap.add_argument('--landmark-types-json', type=Path, help='Export landmark type summary table to JSON or JSON.GZ')
    ap.add_argument('--landmarks-csv', type=Path, help='Export exact-coordinate point landmarks to CSV or CSV.GZ')
    ap.add_argument('--landmarks-geojson', type=Path, help='Export exact-coordinate point landmarks to GeoJSON or GeoJSON.GZ')
    ap.add_argument('--category', action='append', help='Filter landmarks/features by semantic category: water_sources, peaks, caves, settlements, water_landmarks, marine_points, depth_points, lights, buoys')
    ap.add_argument('--filter-kind', action='append', help='Filter by garmin kind, e.g. point, indexed_point, extended_point, polyline')
    ap.add_argument('--filter-type', action='append', help='Filter by Garmin type hex string, e.g. 0x64')
    ap.add_argument('--filter-subtype', action='append', help='Filter by Garmin subtype hex string, e.g. 0x14')
    ap.add_argument('--filter-tag', action='append', help='Filter by semantic tag key=value, e.g. natural=spring')
    ap.add_argument('--gpxsee-class', action='append', help='Filter by GPXSee-style class predicate, e.g. water_line, water_area, summit, marine_point, buoy, light')
    ap.add_argument('--named-only', action='store_true', help='Keep only features with a decoded name')
    ap.add_argument('--raw-only', action='store_true', help='Do not add semantic OSM tags; only preserve raw garmin:* tags')
    args = ap.parse_args()

    if args.list_mapsets:
        container = ImgContainer(args.img.read_bytes())
        for name, subs in _all_mapsets(container.files).items():
            tre = TRE(subs['TRE'])
            west, south, east, north = (to_deg(v) for v in (tre.west, tre.south, tre.east, tre.north))
            print(f'{name}\t{west:.6f},{south:.6f},{east:.6f},{north:.6f}')
        return 0

    # Options that need the in-memory feature list (listings and landmark exports).
    listing_or_landmark = (
        args.list_feature_types, args.list_landmark_types,
        args.landmark_types_csv, args.landmark_types_json,
        args.landmarks_csv, args.landmarks_geojson,
    )
    # Options that post-filter features after parsing.
    post_filters = (
        args.category, args.filter_kind, args.filter_type, args.filter_subtype,
        args.filter_tag, args.gpxsee_class, args.named_only,
    )
    if not any((args.geojson, args.osm, args.meta_json) + listing_or_landmark):
        ap.error('provide at least one export/list option or use --list-mapsets')

    bbox = _parse_bbox(args.bbox)
    filter_tags = _parse_kv_filters(args.filter_tag)
    semantic = not args.raw_only

    # Fast streaming OSM path when no feature post-filtering is requested.
    if args.osm and not args.geojson and not any(listing_or_landmark) and not any(post_filters):
        meta = write_osm_from_img(args.img, args.osm, mapsets=args.mapset, bbox=bbox, semantic=semantic)
        info(f'parsed {meta.get("feature_count", 0)} features from {len(meta.get("mapsets", []))} mapsets')
        info(f'wrote OSM XML: {args.osm}')
        if args.meta_json:
            args.meta_json.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding='utf-8')
            info(f'wrote metadata: {args.meta_json}')
        return 0

    features, meta = load_features_from_img(args.img, mapsets=args.mapset, bbox=bbox)
    info(f'parsed {len(features)} features from {len(meta.get("mapsets", []))} mapsets')

    points_only = bool(args.landmarks_csv or args.landmarks_geojson or args.list_landmark_types)
    filtered = [
        feat for feat in features
        if _feature_matches(
            feat,
            point_only=points_only,
            categories=args.category,
            filter_kind=args.filter_kind,
            filter_type=args.filter_type,
            filter_subtype=args.filter_subtype,
            filter_tags=filter_tags,
            gpxsee_classes=args.gpxsee_class,
            named_only=args.named_only,
        )
    ]

    if args.list_feature_types:
        print_feature_type_table(filtered, point_only=False)
    if args.list_landmark_types:
        print_feature_type_table(filtered, point_only=True)

    if args.landmark_types_csv or args.landmark_types_json:
        rows = _feature_type_rows(filtered, point_only=True)
        if args.landmark_types_csv:
            write_type_summary_csv(rows, args.landmark_types_csv)
            info(f'wrote landmark type CSV: {args.landmark_types_csv}')
        if args.landmark_types_json:
            write_type_summary_json(rows, args.landmark_types_json)
            info(f'wrote landmark type JSON: {args.landmark_types_json}')

    if args.landmarks_csv or args.landmarks_geojson:
        point_features = [feat for feat in filtered if _point_feature(feat)]
        if args.landmarks_csv:
            write_landmarks_csv(point_features, args.landmarks_csv)
            info(f'wrote landmark CSV: {args.landmarks_csv}')
        if args.landmarks_geojson:
            write_landmarks_geojson(point_features, args.landmarks_geojson)
            info(f'wrote landmark GeoJSON: {args.landmarks_geojson}')

    if args.geojson:
        write_geojson(filtered, args.geojson)
        info(f'wrote GeoJSON: {args.geojson}')
    if args.osm:
        write_osm(filtered, args.osm, semantic=semantic)
        info(f'wrote OSM XML: {args.osm}')
    if args.meta_json:
        meta2 = dict(meta)
        meta2['feature_count_after_filters'] = len(filtered)
        args.meta_json.write_text(json.dumps(meta2, ensure_ascii=False, indent=2), encoding='utf-8')
        info(f'wrote metadata: {args.meta_json}')
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())