1119 lines
43 KiB
Python
1119 lines
43 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Prototype Garmin IMG vector extractor -> GeoJSON / OSM XML.
|
|
|
|
What it does well:
|
|
- Reads classic Garmin IMG container FAT and extracts subfiles.
|
|
- Supports classic top-level TRE/RGN/LBL maps and many GMP/NT-style maps where
|
|
TRE/RGN/LBL offsets are stored inside the .GMP container.
|
|
- Parses TRE levels/subdivisions.
|
|
- Parses LBL labels (coding 6, 9, 10) with common codepage handling.
|
|
- Parses standard points, extended points, standard polylines/polygons, and
|
|
extended polylines/polygons from RGN.
|
|
- Exports GeoJSON and/or OSM XML.
|
|
|
|
What it does NOT promise:
|
|
- Full Garmin NT routing/address semantics.
|
|
- Locked/compressed/vendor-obfuscated maps.
|
|
- Perfect type-to-OSM semantic translation. The exporter preserves Garmin type
|
|
codes as tags instead of inventing OSM semantics.
|
|
|
|
This is a practical reverse-engineering tool, not a complete implementation of
|
|
all Garmin IMG variants.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import io
|
|
import json
|
|
import math
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
|
|
from xml.sax.saxutils import escape as xml_escape
|
|
|
|
# -------------------------
|
|
# Low-level helpers
|
|
# -------------------------
|
|
|
|
COORD_FACTOR = 360.0 / (1 << 24)
|
|
FAT_BLOCK_SIZE = 0x200
|
|
FAT_ENTRY_SIZE = 0x200
|
|
MAX_FAT_BLOCKLIST = 240
|
|
SEG_POINT = 0
|
|
SEG_IPOINT = 1
|
|
SEG_POLYLINE = 2
|
|
SEG_POLYGON = 3
|
|
SEG_EXTPOLYGON = 4
|
|
SEG_EXTPOLYLINE = 5
|
|
SEG_EXTPOINT = 6
|
|
|
|
OBJ_POINT = 0x10
|
|
OBJ_INDEXED_POINT = 0x20
|
|
OBJ_POLYLINE = 0x40
|
|
OBJ_POLYGON = 0x80
|
|
OBJ_EXT_POLYGON = 0x100
|
|
OBJ_EXT_POLYLINE = 0x200
|
|
OBJ_EXT_POINT = 0x400
|
|
|
|
|
|
def warn(msg: str) -> None:
|
|
print(f"[warn] {msg}", file=sys.stderr)
|
|
|
|
|
|
def info(msg: str) -> None:
|
|
print(f"[info] {msg}", file=sys.stderr)
|
|
|
|
|
|
def read_u16le(buf: bytes, off: int) -> int:
|
|
return int.from_bytes(buf[off:off + 2], "little", signed=False)
|
|
|
|
|
|
def read_s16le(buf: bytes, off: int) -> int:
|
|
return int.from_bytes(buf[off:off + 2], "little", signed=True)
|
|
|
|
|
|
def read_u24le(buf: bytes, off: int) -> int:
|
|
return int.from_bytes(buf[off:off + 3], "little", signed=False)
|
|
|
|
|
|
def read_s24le(buf: bytes, off: int) -> int:
|
|
raw = read_u24le(buf, off)
|
|
if raw & 0x800000:
|
|
raw -= 1 << 24
|
|
return raw
|
|
|
|
|
|
def read_u32le(buf: bytes, off: int) -> int:
|
|
return int.from_bytes(buf[off:off + 4], "little", signed=False)
|
|
|
|
|
|
def to_deg(coord: int) -> float:
|
|
return coord * COORD_FACTOR
|
|
|
|
|
|
def decode_ascii_z(data: bytes) -> str:
|
|
return data.split(b"\x00", 1)[0].decode("ascii", errors="replace").strip()
|
|
|
|
|
|
# -------------------------
|
|
# Container extraction
|
|
# -------------------------
|
|
|
|
@dataclass
|
|
class FatRecord:
|
|
filename: str
|
|
ext: str
|
|
size: int
|
|
blocks: List[int]
|
|
offset_in_fat: int
|
|
|
|
|
|
class ImgContainer:
|
|
def __init__(self, raw: bytes):
|
|
self.raw = raw
|
|
# Some IMG files are XOR'd by a single byte stored at byte 0.
|
|
xor_byte = raw[0]
|
|
if xor_byte not in (0x00,):
|
|
maybe = bytes(b ^ xor_byte for b in raw)
|
|
sig = maybe[0x10:0x17]
|
|
ident = maybe[0x41:0x48]
|
|
if sig.startswith(b"DSKIMG") or ident.startswith(b"GARMIN"):
|
|
info(f"applied XOR decode with byte 0x{xor_byte:02x}")
|
|
self.raw = maybe
|
|
self.block_size = self._read_block_size()
|
|
self.fat_start = self._read_fat_start()
|
|
self.files = self._extract_subfiles()
|
|
|
|
def _read_block_size(self) -> int:
|
|
e1 = self.raw[0x61]
|
|
e2 = self.raw[0x62]
|
|
return 1 << (e1 + e2)
|
|
|
|
def _read_fat_start(self) -> int:
|
|
fat_phys_block = self.raw[0x40]
|
|
return fat_phys_block * FAT_BLOCK_SIZE + FAT_BLOCK_SIZE
|
|
|
|
def _parse_fat_chain(self) -> List[FatRecord]:
|
|
records: List[FatRecord] = []
|
|
off = self.fat_start
|
|
seen_offsets = set()
|
|
while off + FAT_ENTRY_SIZE <= len(self.raw):
|
|
if off in seen_offsets:
|
|
break
|
|
seen_offsets.add(off)
|
|
first = self.raw[off]
|
|
if first != 0x01:
|
|
break
|
|
name = self.raw[off + 1:off + 9].decode("ascii", errors="replace").rstrip(" \x00")
|
|
ext = self.raw[off + 9:off + 12].decode("ascii", errors="replace").rstrip(" \x00")
|
|
size = read_u32le(self.raw, off + 12)
|
|
next_fat = read_u16le(self.raw, off + 16)
|
|
blocks = []
|
|
boff = off + 0x20
|
|
for i in range(MAX_FAT_BLOCKLIST):
|
|
blk = read_u16le(self.raw, boff + i * 2)
|
|
if blk == 0xFFFF:
|
|
break
|
|
blocks.append(blk)
|
|
if next_fat == 0:
|
|
records.append(FatRecord(name, ext, size, blocks, off))
|
|
off += FAT_ENTRY_SIZE
|
|
return records
|
|
|
|
def _collect_blocks(self, start_record: FatRecord) -> bytes:
|
|
data = bytearray()
|
|
blocks = list(start_record.blocks)
|
|
current_offset = start_record.offset_in_fat
|
|
# Follow FAT continuation blocks when next_fat is used.
|
|
while True:
|
|
next_fat = read_u16le(self.raw, current_offset + 16)
|
|
if next_fat == 0:
|
|
break
|
|
current_offset += FAT_ENTRY_SIZE
|
|
if current_offset + FAT_ENTRY_SIZE > len(self.raw):
|
|
break
|
|
boff = current_offset + 0x20
|
|
for i in range(MAX_FAT_BLOCKLIST):
|
|
blk = read_u16le(self.raw, boff + i * 2)
|
|
if blk == 0xFFFF:
|
|
break
|
|
blocks.append(blk)
|
|
for blk in blocks:
|
|
start = blk * self.block_size
|
|
end = start + self.block_size
|
|
if end > len(self.raw):
|
|
break
|
|
data.extend(self.raw[start:end])
|
|
return bytes(data[:start_record.size])
|
|
|
|
def _extract_subfiles(self) -> Dict[str, bytes]:
|
|
out: Dict[str, bytes] = {}
|
|
for rec in self._parse_fat_chain():
|
|
key = f"{rec.filename}.{rec.ext}".upper()
|
|
out[key] = self._collect_blocks(rec)
|
|
return out
|
|
|
|
|
|
# -------------------------
|
|
# Core format structures
|
|
# -------------------------
|
|
|
|
@dataclass
|
|
class LevelInfo:
|
|
level: int
|
|
bits_per_coord: int
|
|
inherited: bool
|
|
present: bool = True
|
|
|
|
|
|
@dataclass
|
|
class Subdivision:
|
|
index: int
|
|
level: int
|
|
data_offset: int
|
|
object_types: int
|
|
lon_center: int
|
|
lat_center: int
|
|
width: int
|
|
height: int
|
|
index_next_level: int = 0
|
|
last: bool = False
|
|
data_end: int = 0
|
|
data_ext_polygon_offset: int = 0
|
|
data_ext_polygon_end: int = 0
|
|
data_ext_polyline_offset: int = 0
|
|
data_ext_polyline_end: int = 0
|
|
data_ext_poi_offset: int = 0
|
|
data_ext_poi_end: int = 0
|
|
children: List["Subdivision"] = field(default_factory=list)
|
|
|
|
def nb_object_types(self) -> int:
|
|
count = 0
|
|
cur = 0x10
|
|
for _ in range(4):
|
|
if self.object_types & cur:
|
|
count += 1
|
|
cur <<= 1
|
|
return count
|
|
|
|
|
|
@dataclass
|
|
class Feature:
|
|
geom_type: str # Point | LineString | Polygon
|
|
coords: object
|
|
props: Dict[str, object]
|
|
|
|
|
|
# -------------------------
|
|
# LBL parser
|
|
# -------------------------
|
|
|
|
class LBL:
|
|
NORMAL_CHARS = [' ', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R',
|
|
'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5',
|
|
'6', '7', '8', '9', '~', '~', '~', '~', '~', '~']
|
|
SYMBOL_CHARS = ['@', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '~', '~', '~',
|
|
'~', '~', '~', '~', '~', '~', '~', ':', ';', '<', '=', '>', '?', '~', '~', '~', '~', '~', '~',
|
|
'~', '~', '~', '~', '~', '[', '\\', ']', '^', '_']
|
|
SPECIAL_CHARS = ['`', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
|
|
's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '~', '~', '~', '~', '~', '0', '1', '2', '3', '4', '5',
|
|
'6', '7', '8', '9', '~', '~', '~', '~', '~', '~']
|
|
|
|
def __init__(self, data: Optional[bytes]):
|
|
self.data = data or b""
|
|
self.ok = bool(data)
|
|
self.data_offset = 0
|
|
self.data_length = 0
|
|
self.data_offset_multiplier = 1
|
|
self.label_coding = 6
|
|
self.codepage = 1252
|
|
if self.ok:
|
|
self._parse_header()
|
|
|
|
def _parse_header(self) -> None:
|
|
header_length = read_u16le(self.data, 0)
|
|
self.data_offset = read_u32le(self.data, 0x15)
|
|
self.data_length = read_u32le(self.data, 0x19)
|
|
self.data_offset_multiplier = 1 << self.data[0x1D]
|
|
self.label_coding = self.data[0x1E]
|
|
if len(self.data) >= 0xAC:
|
|
self.codepage = read_u16le(self.data, 0xAA)
|
|
|
|
def get_label(self, offset: int) -> str:
|
|
if not self.ok or offset == 0:
|
|
return ""
|
|
actual = self.data_offset + offset * self.data_offset_multiplier
|
|
if actual < 0 or actual >= len(self.data):
|
|
return ""
|
|
if self.label_coding == 6:
|
|
return self._get_label6(actual)
|
|
return self._get_label8_10(actual)
|
|
|
|
def _get_label8_10(self, off: int) -> str:
|
|
end = off
|
|
while end < len(self.data) and self.data[end] != 0:
|
|
end += 1
|
|
raw = self.data[off:end]
|
|
enc = None
|
|
cp = self.codepage
|
|
if cp in (0, 850):
|
|
enc = "cp1252"
|
|
elif cp == 65001:
|
|
enc = "utf-8"
|
|
elif cp == 932:
|
|
enc = "cp932"
|
|
elif cp == 950:
|
|
enc = "big5"
|
|
else:
|
|
enc = f"cp{cp}"
|
|
try:
|
|
return raw.decode(enc, errors="replace")
|
|
except Exception:
|
|
return raw.decode("latin1", errors="replace")
|
|
|
|
def _get_label6(self, off: int) -> str:
|
|
out: List[str] = []
|
|
charset = "NORMAL"
|
|
pos = off
|
|
while pos + 3 <= len(self.data):
|
|
b1, b2, b3 = self.data[pos], self.data[pos + 1], self.data[pos + 2]
|
|
pos += 3
|
|
codes = [
|
|
b1 >> 2,
|
|
((b1 & 0x3) << 4) | (b2 >> 4),
|
|
((b2 & 0xF) << 2) | (b3 >> 6),
|
|
b3 & 0x3F,
|
|
]
|
|
for c in codes:
|
|
if c > 0x2F:
|
|
return "".join(out).strip()
|
|
if charset == "NORMAL":
|
|
if c == 0x1C:
|
|
charset = "SYMBOL"
|
|
elif c == 0x1B:
|
|
charset = "SPECIAL"
|
|
elif c == 0x1D:
|
|
out.append("|")
|
|
elif c in (0x1E, 0x1F):
|
|
out.append(" ")
|
|
else:
|
|
out.append(self.NORMAL_CHARS[c])
|
|
elif charset == "SYMBOL":
|
|
out.append(self.SYMBOL_CHARS[c])
|
|
charset = "NORMAL"
|
|
else:
|
|
out.append(self.SPECIAL_CHARS[c])
|
|
charset = "NORMAL"
|
|
return "".join(out).strip()
|
|
|
|
|
|
# -------------------------
|
|
# TRE parser
|
|
# -------------------------
|
|
|
|
class TRE:
|
|
def __init__(self, data: bytes):
|
|
self.data = data
|
|
self.header_length = read_u16le(data, 0)
|
|
self.north = read_s24le(data, 0x15)
|
|
self.east = read_s24le(data, 0x18)
|
|
self.south = read_s24le(data, 0x1B)
|
|
self.west = read_s24le(data, 0x1E)
|
|
self.levels: Dict[int, LevelInfo] = {}
|
|
self.max_level = 0
|
|
self.min_level = 15
|
|
self.extended_types = False
|
|
self.extended_types_offset = 0
|
|
self.extended_types_length = 0
|
|
self.extended_types_size = 0
|
|
self.extended_types_number = 0
|
|
self.decalaje_extended_types = 0
|
|
self.subdivisions_count = 1
|
|
self.root_subdivisions: List[Subdivision] = []
|
|
self.subdivisions_by_index: Dict[int, Subdivision] = {}
|
|
self._parse()
|
|
|
|
def _parse(self) -> None:
|
|
self._parse_levels()
|
|
self._parse_tre7()
|
|
self._parse_subdivisions()
|
|
|
|
def _parse_tre7(self) -> None:
|
|
if self.header_length >= 0x7C + 10:
|
|
self.extended_types_offset = read_u32le(self.data, 0x7C)
|
|
self.extended_types_length = read_u32le(self.data, 0x80)
|
|
self.extended_types_size = read_u16le(self.data, 0x84)
|
|
if self.extended_types_size > 0:
|
|
self.extended_types_number = self.extended_types_length // self.extended_types_size
|
|
self.extended_types = self.extended_types_length > 0
|
|
self.decalaje_extended_types = self.subdivisions_count - self.extended_types_number
|
|
|
|
def _parse_levels(self) -> None:
|
|
levels_offset = read_u32le(self.data, 0x21)
|
|
levels_length = read_u32le(self.data, 0x25)
|
|
pos = levels_offset
|
|
end = levels_offset + levels_length
|
|
while pos + 4 <= end and pos + 4 <= len(self.data):
|
|
zoom = self.data[pos]
|
|
bits = self.data[pos + 1]
|
|
count = read_u16le(self.data, pos + 2)
|
|
_ = count
|
|
level = zoom & 0xF
|
|
inherited = bool(zoom & 0x80)
|
|
self.levels[level] = LevelInfo(level=level, bits_per_coord=bits, inherited=inherited)
|
|
self.max_level = max(self.max_level, level)
|
|
self.min_level = min(self.min_level, level)
|
|
self.subdivisions_count += count
|
|
pos += 4
|
|
|
|
def get_resolution(self, level: int) -> int:
|
|
return self.levels[level].bits_per_coord
|
|
|
|
def convert_map_units(self, level: int, value: int, additional_accuracy: int) -> int:
|
|
shift = 24 - self.get_resolution(level) - additional_accuracy
|
|
if shift >= 0:
|
|
return value << shift
|
|
return value >> (-shift)
|
|
|
|
def _parse_subdiv_record(self, pos: int, level: int, record_size: int, index: int) -> Tuple[Subdivision, int]:
|
|
data_offset = read_u24le(self.data, pos)
|
|
object_types = self.data[pos + 3]
|
|
if object_types & 0x0F:
|
|
data_offset += (object_types & 0x0F) * (1 << 24)
|
|
lon_center = read_s24le(self.data, pos + 4)
|
|
lat_center = read_s24le(self.data, pos + 7)
|
|
width = read_u16le(self.data, pos + 10)
|
|
last = False
|
|
if width & 0x8000:
|
|
width &= 0x7FFF
|
|
last = True
|
|
height = read_u16le(self.data, pos + 12)
|
|
index_next = read_u16le(self.data, pos + 14) if record_size >= 16 else 0
|
|
sub = Subdivision(index=index, level=level, data_offset=data_offset, object_types=object_types,
|
|
lon_center=lon_center, lat_center=lat_center, width=width, height=height,
|
|
index_next_level=index_next, last=last)
|
|
# Extended offsets per subdivision, if present.
|
|
if self.extended_types:
|
|
indice = index - self.decalaje_extended_types
|
|
if indice > 0 and self.extended_types_size >= 8:
|
|
p = self.extended_types_offset + (indice - 1) * self.extended_types_size
|
|
if p + self.extended_types_size <= len(self.data):
|
|
sub.data_ext_polygon_offset = read_u32le(self.data, p)
|
|
if self.extended_types_size >= 8:
|
|
sub.data_ext_polyline_offset = read_u32le(self.data, p + 4)
|
|
if self.extended_types_size >= 12:
|
|
sub.data_ext_poi_offset = read_u32le(self.data, p + 8)
|
|
return sub, pos + record_size
|
|
|
|
def _parse_subdivisions(self) -> None:
|
|
sub_offset = read_u32le(self.data, 0x29)
|
|
sub_length = read_u32le(self.data, 0x2D)
|
|
end = sub_offset + sub_length
|
|
if end > len(self.data):
|
|
end = len(self.data)
|
|
|
|
present_levels = sorted(self.levels.keys(), reverse=True)
|
|
if not present_levels:
|
|
return
|
|
current_root_level = present_levels[0]
|
|
index = 1
|
|
pos = sub_offset
|
|
roots: List[Subdivision] = []
|
|
# Parse all 16-byte records first until last root.
|
|
while pos + 16 <= end:
|
|
sub, pos = self._parse_subdiv_record(pos, current_root_level, 16, index)
|
|
roots.append(sub)
|
|
self.subdivisions_by_index[index] = sub
|
|
index += 1
|
|
if sub.last:
|
|
break
|
|
self.root_subdivisions = roots
|
|
# Recursively parse children using the index_next_level scheme.
|
|
self._parse_children(self.root_subdivisions, current_root_level - 1, sub_offset, end, index)
|
|
# Compute data ends by sorted data offsets.
|
|
ordered = sorted(self.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index))
|
|
for i, sub in enumerate(ordered):
|
|
if i + 1 < len(ordered):
|
|
sub.data_end = ordered[i + 1].data_offset
|
|
else:
|
|
sub.data_end = 0
|
|
# Extended segment ends.
|
|
for attr_start, attr_end in [
|
|
("data_ext_polygon_offset", "data_ext_polygon_end"),
|
|
("data_ext_polyline_offset", "data_ext_polyline_end"),
|
|
("data_ext_poi_offset", "data_ext_poi_end"),
|
|
]:
|
|
items = sorted((s for s in self.subdivisions_by_index.values() if getattr(s, attr_start, 0)),
|
|
key=lambda s: getattr(s, attr_start))
|
|
for i, sub in enumerate(items):
|
|
if i + 1 < len(items):
|
|
setattr(sub, attr_end, getattr(items[i + 1], attr_start))
|
|
else:
|
|
setattr(sub, attr_end, 0)
|
|
|
|
def _next_present_level(self, level: int) -> int:
|
|
while level > 0 and level not in self.levels:
|
|
level -= 1
|
|
return level
|
|
|
|
def _parse_children(self, parents: List[Subdivision], level: int, sub_offset: int, end: int, next_index_hint: int) -> None:
|
|
level = self._next_present_level(level)
|
|
if level <= 0:
|
|
return
|
|
for parent in parents:
|
|
if parent.index_next_level <= 0:
|
|
continue
|
|
idx = parent.index_next_level
|
|
if idx <= 0:
|
|
continue
|
|
# Heuristic matching JGarminImgParser: 16-byte records for non-leaf levels, 14-byte for last level.
|
|
record_size = 14 if level == self.min_level else 16
|
|
pos = sub_offset + (idx - 1) * 16 if record_size == 16 else sub_offset + (idx - 1) * 14
|
|
# Fallback for mixed record layout: compute small-record area start after all 16-byte records already parsed.
|
|
if record_size == 14 and pos + 14 > end:
|
|
pos = min(end, sub_offset + len(self.root_subdivisions) * 16)
|
|
children: List[Subdivision] = []
|
|
while pos + record_size <= end:
|
|
try:
|
|
sub, pos = self._parse_subdiv_record(pos, level, record_size, idx)
|
|
except Exception:
|
|
break
|
|
children.append(sub)
|
|
self.subdivisions_by_index[idx] = sub
|
|
idx += 1
|
|
if sub.last:
|
|
break
|
|
parent.children = children
|
|
child_level = self._next_present_level(level - 1)
|
|
if child_level > 0 and children:
|
|
self._parse_children(children, child_level, sub_offset, end, idx)
|
|
|
|
|
|
# -------------------------
|
|
# RGN parser
|
|
# -------------------------
|
|
|
|
class BitStreamReader:
|
|
def __init__(self, data: bytes, start: int, length_bytes: int):
|
|
self.data = data
|
|
self.pos = start
|
|
self.remaining_bytes = length_bytes
|
|
self.remaining_bits = 0
|
|
self.cur_byte = 0
|
|
|
|
def has_next(self, nbits: int) -> bool:
|
|
return self.remaining_bytes * 8 + self.remaining_bits >= nbits
|
|
|
|
def finish(self) -> int:
|
|
self.pos += self.remaining_bytes
|
|
self.remaining_bytes = 0
|
|
self.remaining_bits = 0
|
|
return self.pos
|
|
|
|
def _get_if_needed(self) -> None:
|
|
if self.remaining_bits == 0:
|
|
if self.remaining_bytes <= 0:
|
|
raise EOFError
|
|
self.cur_byte = self.data[self.pos]
|
|
self.pos += 1
|
|
self.remaining_bytes -= 1
|
|
self.remaining_bits = 8
|
|
|
|
def read_next_bits(self, to_get: int) -> int:
|
|
cur_pos = 0
|
|
result = 0
|
|
while cur_pos < to_get:
|
|
self._get_if_needed()
|
|
remaining_to_get = to_get - cur_pos
|
|
if remaining_to_get >= self.remaining_bits:
|
|
result |= self.cur_byte << cur_pos
|
|
cur_pos += self.remaining_bits
|
|
self.remaining_bits = 0
|
|
else:
|
|
mask = (1 << remaining_to_get) - 1
|
|
result |= (self.cur_byte & mask) << cur_pos
|
|
self.cur_byte >>= remaining_to_get
|
|
self.remaining_bits -= remaining_to_get
|
|
return result
|
|
return result
|
|
|
|
def read_coord_offset(self, nbits: int, sign: int, extra_bit: int) -> int:
|
|
if sign == 0:
|
|
value = self.read_next_bits(nbits)
|
|
sign_mask = 1 << (nbits - 1)
|
|
if value & sign_mask:
|
|
comp = value ^ sign_mask
|
|
if extra_bit == 0:
|
|
if comp != 0:
|
|
return comp - sign_mask
|
|
other = self.read_coord_offset(nbits, sign, extra_bit)
|
|
if other < 0:
|
|
return 1 - value + other
|
|
return value - 1 + other
|
|
else:
|
|
if comp & 0xFFFFFE:
|
|
return (comp & 0xFFFFFE) - sign_mask
|
|
other = self.read_coord_offset(nbits - 1, sign, 0)
|
|
if other < 0:
|
|
return 1 - sign_mask + 1 + (other << 1)
|
|
return sign_mask - 1 - 1 + (other << 1)
|
|
else:
|
|
if extra_bit > 0:
|
|
return value & 0xFFFFFE
|
|
return value
|
|
else:
|
|
value = self.read_next_bits(nbits)
|
|
if extra_bit > 0:
|
|
return (((value >> 1) * sign) << 1)
|
|
return value * sign
|
|
|
|
|
|
class RGN:
|
|
def __init__(self, data: bytes, tre: TRE, lbl: Optional[LBL]):
|
|
self.data = data
|
|
self.tre = tre
|
|
self.lbl = lbl or LBL(None)
|
|
self.header_length = read_u16le(data, 0)
|
|
self.data_offset = read_u32le(data, 0x15) if len(data) >= 0x1D else 0
|
|
self.data_length = read_u32le(data, 0x19) if len(data) >= 0x1D else 0
|
|
self.ext_poly_offset = read_u32le(data, 0x1D) if len(data) >= 0x25 else 0
|
|
self.ext_poly_length = read_u32le(data, 0x21) if len(data) >= 0x25 else 0
|
|
self.ext_line_offset = read_u32le(data, 0x39) if len(data) >= 0x41 else 0
|
|
self.ext_line_length = read_u32le(data, 0x3D) if len(data) >= 0x41 else 0
|
|
self.ext_poi_offset = read_u32le(data, 0x55) if len(data) >= 0x5D else 0
|
|
self.ext_poi_length = read_u32le(data, 0x59) if len(data) >= 0x5D else 0
|
|
|
|
def data_end(self) -> int:
|
|
return self.data_length
|
|
|
|
def ext_polygon_end(self) -> int:
|
|
return self.ext_poly_length
|
|
|
|
def ext_polyline_end(self) -> int:
|
|
return self.ext_line_length
|
|
|
|
def ext_poi_end(self) -> int:
|
|
return self.ext_poi_length
|
|
|
|
@staticmethod
|
|
def _convert_coord_length(i: int, sign: int, extra_bit: int) -> int:
|
|
add = 0
|
|
if sign == 0:
|
|
add += 1
|
|
add += extra_bit
|
|
if i <= 9:
|
|
return i + 2 + add
|
|
return 2 * i - 9 + 2 + add
|
|
|
|
def _subdiv_lon(self, sub: Subdivision, delta: int, add_acc: int) -> int:
|
|
return sub.lon_center + self.tre.convert_map_units(sub.level, delta, add_acc)
|
|
|
|
def _subdiv_lat(self, sub: Subdivision, delta: int, add_acc: int) -> int:
|
|
return sub.lat_center + self.tre.convert_map_units(sub.level, delta, add_acc)
|
|
|
|
def _segments(self, sub: Subdivision) -> List[Optional[Tuple[int, int]]]:
|
|
result: List[Optional[Tuple[int, int]]] = [None] * 7
|
|
offset = sub.data_offset + self.data_offset
|
|
end = (sub.data_end if sub.data_end else self.data_length) + self.data_offset
|
|
if sub.object_types == 0:
|
|
return result
|
|
if sub.data_end and end > len(self.data):
|
|
end = len(self.data)
|
|
if sub.data_end and end > offset and sub.nb_object_types() > 0:
|
|
if sub.object_types & OBJ_POINT:
|
|
result[SEG_POINT] = (0, 0)
|
|
if sub.object_types & OBJ_INDEXED_POINT:
|
|
result[SEG_IPOINT] = (0, 0)
|
|
if sub.object_types & OBJ_POLYLINE:
|
|
result[SEG_POLYLINE] = (0, 0)
|
|
if sub.object_types & OBJ_POLYGON:
|
|
result[SEG_POLYGON] = (0, 0)
|
|
order = [SEG_POINT, SEG_IPOINT, SEG_POLYLINE, SEG_POLYGON]
|
|
nb_pointers = sub.nb_object_types() - 1
|
|
if offset + nb_pointers * 2 <= len(self.data):
|
|
segment_start = offset + nb_pointers * 2
|
|
cur_idx = 0
|
|
p = offset
|
|
for _ in range(nb_pointers):
|
|
while cur_idx < 4 and result[order[cur_idx]] is None:
|
|
cur_idx += 1
|
|
if cur_idx >= 4:
|
|
break
|
|
segment_end = read_u16le(self.data, p) + offset
|
|
p += 2
|
|
if segment_end > end or segment_end <= segment_start:
|
|
result[order[cur_idx]] = None
|
|
else:
|
|
result[order[cur_idx]] = (segment_start, segment_end)
|
|
segment_start = segment_end
|
|
cur_idx += 1
|
|
while cur_idx < 4 and result[order[cur_idx]] is None:
|
|
cur_idx += 1
|
|
if cur_idx < 4 and result[order[cur_idx]] is not None and segment_start < end:
|
|
result[order[cur_idx]] = (segment_start, end)
|
|
if sub.data_ext_polygon_offset:
|
|
s = self.ext_poly_offset + sub.data_ext_polygon_offset
|
|
e = self.ext_poly_offset + (sub.data_ext_polygon_end or self.ext_poly_length)
|
|
if e > s:
|
|
result[SEG_EXTPOLYGON] = (s, e)
|
|
if sub.data_ext_polyline_offset:
|
|
s = self.ext_line_offset + sub.data_ext_polyline_offset
|
|
e = self.ext_line_offset + (sub.data_ext_polyline_end or self.ext_line_length)
|
|
if e > s:
|
|
result[SEG_EXTPOLYLINE] = (s, e)
|
|
if sub.data_ext_poi_offset:
|
|
s = self.ext_poi_offset + sub.data_ext_poi_offset
|
|
e = self.ext_poi_offset + (sub.data_ext_poi_end or self.ext_poi_length)
|
|
if e > s:
|
|
result[SEG_EXTPOINT] = (s, e)
|
|
return result
|
|
|
|
def parse_features(self) -> List[Feature]:
|
|
# Finalize subdivision end markers using RGN section lengths.
|
|
ordered = sorted(self.tre.subdivisions_by_index.values(), key=lambda s: (s.data_offset, s.index))
|
|
for i, sub in enumerate(ordered):
|
|
if sub.data_end == 0:
|
|
sub.data_end = self.data_length if i + 1 == len(ordered) else ordered[i + 1].data_offset
|
|
for attr_start, final_end in [
|
|
("data_ext_polygon_offset", self.ext_poly_length),
|
|
("data_ext_polyline_offset", self.ext_line_length),
|
|
("data_ext_poi_offset", self.ext_poi_length),
|
|
]:
|
|
items = sorted((s for s in self.tre.subdivisions_by_index.values() if getattr(s, attr_start, 0)),
|
|
key=lambda s: getattr(s, attr_start))
|
|
for i, sub in enumerate(items):
|
|
if attr_start == "data_ext_polygon_offset":
|
|
setattr(sub, "data_ext_polygon_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start))
|
|
elif attr_start == "data_ext_polyline_offset":
|
|
setattr(sub, "data_ext_polyline_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start))
|
|
else:
|
|
setattr(sub, "data_ext_poi_end", final_end if i + 1 == len(items) else getattr(items[i + 1], attr_start))
|
|
|
|
feats: List[Feature] = []
|
|
for sub in sorted(self.tre.subdivisions_by_index.values(), key=lambda s: s.index):
|
|
segs = self._segments(sub)
|
|
if segs[SEG_POINT]:
|
|
feats.extend(self._parse_points(sub, segs[SEG_POINT], indexed=False))
|
|
if segs[SEG_IPOINT]:
|
|
feats.extend(self._parse_points(sub, segs[SEG_IPOINT], indexed=True))
|
|
if segs[SEG_EXTPOINT]:
|
|
feats.extend(self._parse_ext_points(sub, segs[SEG_EXTPOINT]))
|
|
if segs[SEG_POLYLINE]:
|
|
feats.extend(self._parse_poly(sub, segs[SEG_POLYLINE], line=True, extended=False))
|
|
if segs[SEG_POLYGON]:
|
|
feats.extend(self._parse_poly(sub, segs[SEG_POLYGON], line=False, extended=False))
|
|
if segs[SEG_EXTPOLYLINE]:
|
|
feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYLINE], line=True, extended=True))
|
|
if segs[SEG_EXTPOLYGON]:
|
|
feats.extend(self._parse_poly(sub, segs[SEG_EXTPOLYGON], line=False, extended=True))
|
|
return feats
|
|
|
|
def _parse_points(self, sub: Subdivision, seg: Tuple[int, int], indexed: bool) -> List[Feature]:
|
|
feats: List[Feature] = []
|
|
pos, end = seg
|
|
while pos < end and pos + 8 <= len(self.data):
|
|
typ = self.data[pos]
|
|
info24 = read_u24le(self.data, pos + 1)
|
|
has_subtype = bool(info24 & 0x800000)
|
|
is_poi = bool(info24 & 0x400000)
|
|
lbl_off = info24 & 0x3FFFFF
|
|
lon_delta = read_s16le(self.data, pos + 4)
|
|
lat_delta = read_s16le(self.data, pos + 6)
|
|
pos += 8
|
|
subtype = 0
|
|
if has_subtype and pos < end:
|
|
subtype = self.data[pos]
|
|
pos += 1
|
|
name = self.lbl.get_label(lbl_off) if lbl_off else ""
|
|
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
|
|
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
|
|
feats.append(Feature(
|
|
geom_type="Point",
|
|
coords=[lon, lat],
|
|
props={
|
|
"garmin_kind": "indexed_point" if indexed else "point",
|
|
"garmin_type": f"0x{typ:02x}",
|
|
"garmin_subtype": f"0x{subtype:02x}",
|
|
"garmin_is_poi": is_poi,
|
|
"name": name,
|
|
},
|
|
))
|
|
return feats
|
|
|
|
def _parse_ext_points(self, sub: Subdivision, seg: Tuple[int, int]) -> List[Feature]:
|
|
feats: List[Feature] = []
|
|
pos, end = seg
|
|
while pos < end and pos + 6 <= len(self.data):
|
|
typ = self.data[pos]
|
|
subtype_raw = self.data[pos + 1]
|
|
has_lbl = bool(subtype_raw & 0x20)
|
|
subtype = subtype_raw % 32
|
|
full_type = ((typ + 0x100) << 8) + subtype
|
|
lon_delta = read_s16le(self.data, pos + 2)
|
|
lat_delta = read_s16le(self.data, pos + 4)
|
|
pos += 6
|
|
lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
|
|
if has_lbl:
|
|
pos += 3
|
|
name = self.lbl.get_label(lbl_off) if lbl_off else ""
|
|
lon = to_deg(self._subdiv_lon(sub, lon_delta, 0))
|
|
lat = to_deg(self._subdiv_lat(sub, lat_delta, 0))
|
|
feats.append(Feature(
|
|
geom_type="Point",
|
|
coords=[lon, lat],
|
|
props={
|
|
"garmin_kind": "extended_point",
|
|
"garmin_type": f"0x{full_type:04x}",
|
|
"name": name,
|
|
},
|
|
))
|
|
return feats
|
|
|
|
def _parse_poly(self, sub: Subdivision, seg: Tuple[int, int], line: bool, extended: bool) -> List[Feature]:
|
|
feats: List[Feature] = []
|
|
pos, end = seg
|
|
while pos < end:
|
|
try:
|
|
if not extended:
|
|
if pos + 10 > end:
|
|
break
|
|
info1 = self.data[pos]
|
|
pos += 1
|
|
if line:
|
|
typ = info1 & 0x3F
|
|
direction = bool(info1 & 0x40)
|
|
else:
|
|
typ = info1 & 0x7F
|
|
direction = False
|
|
two_byte_len = bool(info1 & 0x80)
|
|
info24 = read_u24le(self.data, pos)
|
|
pos += 3
|
|
lbl_off = info24 & 0x3FFFFF
|
|
extra_bit = 1 if (info24 & 0x400000) else 0
|
|
data_in_net = bool(info24 & 0x800000)
|
|
lon_delta = read_s16le(self.data, pos)
|
|
lat_delta = read_s16le(self.data, pos + 2)
|
|
pos += 4
|
|
bitstream_len = read_u16le(self.data, pos) if two_byte_len else self.data[pos]
|
|
pos += 2 if two_byte_len else 1
|
|
bitstream_info = self.data[pos]
|
|
pos += 1
|
|
long_sign = 0
|
|
lat_sign = 0
|
|
long_extra_bit = extra_bit
|
|
lat_extra_bit = 0
|
|
full_type = typ
|
|
else:
|
|
if pos + 8 > end:
|
|
break
|
|
typ = self.data[pos]
|
|
subtype_raw = self.data[pos + 1]
|
|
has_lbl = bool(subtype_raw & 0x20)
|
|
subtype = subtype_raw % 32
|
|
full_type = ((typ + 0x100) << 8) + subtype
|
|
lon_delta = read_s16le(self.data, pos + 2)
|
|
lat_delta = read_s16le(self.data, pos + 4)
|
|
pos += 6
|
|
bitstream_len_byte = self.data[pos]
|
|
pos += 1
|
|
if bitstream_len_byte % 2 == 0:
|
|
if pos >= end:
|
|
break
|
|
bitstream_len = (bitstream_len_byte + self.data[pos] * 256) // 4 - 1
|
|
pos += 1
|
|
else:
|
|
bitstream_len = bitstream_len_byte // 2 - 1
|
|
bitstream_info = self.data[pos]
|
|
pos += 1
|
|
direction = False
|
|
data_in_net = False
|
|
long_sign = 0
|
|
lat_sign = 0
|
|
long_extra_bit = 0
|
|
lat_extra_bit = 0
|
|
reader = BitStreamReader(self.data, pos, bitstream_len)
|
|
if reader.read_next_bits(1) != 0:
|
|
long_sign = +1 if reader.read_next_bits(1) == 0 else -1
|
|
if reader.read_next_bits(1) != 0:
|
|
lat_sign = +1 if reader.read_next_bits(1) == 0 else -1
|
|
if extended:
|
|
long_extra_bit = reader.read_next_bits(1)
|
|
long_bits = self._convert_coord_length(bitstream_info & 0xF, long_sign, long_extra_bit)
|
|
lat_bits = self._convert_coord_length(bitstream_info >> 4, lat_sign, lat_extra_bit)
|
|
cur_lon = lon_delta
|
|
cur_lat = lat_delta
|
|
pts = [[to_deg(self._subdiv_lon(sub, cur_lon, 0)), to_deg(self._subdiv_lat(sub, cur_lat, 0))]]
|
|
cur_lon <<= long_extra_bit
|
|
cur_lat <<= lat_extra_bit
|
|
while reader.has_next(long_bits + lat_bits):
|
|
dlon = reader.read_coord_offset(long_bits, long_sign, long_extra_bit)
|
|
dlat = reader.read_coord_offset(lat_bits, lat_sign, lat_extra_bit)
|
|
cur_lon += dlon
|
|
cur_lat += dlat
|
|
pts.append([
|
|
to_deg(self._subdiv_lon(sub, cur_lon, long_extra_bit)),
|
|
to_deg(self._subdiv_lat(sub, cur_lat, lat_extra_bit)),
|
|
])
|
|
pos = reader.finish()
|
|
lbl_off = 0 if extended else lbl_off
|
|
if extended:
|
|
lbl_off = read_u24le(self.data, pos) if has_lbl and pos + 3 <= end else 0
|
|
if has_lbl:
|
|
pos += 3
|
|
name = self.lbl.get_label(lbl_off) if lbl_off else ""
|
|
if not line:
|
|
if pts and pts[0] != pts[-1]:
|
|
pts.append(pts[0])
|
|
feats.append(Feature(
|
|
geom_type="Polygon",
|
|
coords=[pts],
|
|
props={
|
|
"garmin_kind": "extended_polygon" if extended else "polygon",
|
|
"garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}",
|
|
"garmin_direction": direction,
|
|
"garmin_data_in_net": data_in_net,
|
|
"name": name,
|
|
},
|
|
))
|
|
else:
|
|
feats.append(Feature(
|
|
geom_type="LineString",
|
|
coords=pts,
|
|
props={
|
|
"garmin_kind": "extended_polyline" if extended else "polyline",
|
|
"garmin_type": f"0x{full_type:04x}" if extended else f"0x{typ:02x}",
|
|
"garmin_direction": direction,
|
|
"garmin_data_in_net": data_in_net,
|
|
"name": name,
|
|
},
|
|
))
|
|
except Exception:
|
|
# Stop current segment on malformed data instead of crashing the whole file.
|
|
break
|
|
return feats
|
|
|
|
|
|
# -------------------------
|
|
# Output writers
|
|
# -------------------------
|
|
|
|
def feature_to_geojson(f: Feature) -> Dict[str, object]:
|
|
props = {k: v for k, v in f.props.items() if v not in (None, "", [], {})}
|
|
return {
|
|
"type": "Feature",
|
|
"geometry": {"type": f.geom_type, "coordinates": f.coords},
|
|
"properties": props,
|
|
}
|
|
|
|
|
|
def write_geojson(features: List[Feature], path: Path) -> None:
|
|
fc = {
|
|
"type": "FeatureCollection",
|
|
"features": [feature_to_geojson(f) for f in features],
|
|
}
|
|
path.write_text(json.dumps(fc, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _osm_escape(v: object) -> str:
|
|
return xml_escape(str(v), {'"': '"'})
|
|
|
|
|
|
def write_osm(features: List[Feature], path: Path) -> None:
|
|
node_id = -1
|
|
way_id = -1
|
|
lines: List[str] = ['<?xml version="1.0" encoding="UTF-8"?>', '<osm version="0.6" generator="garmin_img_to_osmand">']
|
|
|
|
def add_node(lon: float, lat: float, tags: Optional[Dict[str, object]] = None) -> int:
|
|
nonlocal node_id
|
|
nid = node_id
|
|
node_id -= 1
|
|
if tags:
|
|
lines.append(f' <node id="{nid}" lat="{lat:.8f}" lon="{lon:.8f}">')
|
|
for k, v in tags.items():
|
|
if v in (None, ""):
|
|
continue
|
|
lines.append(f' <tag k="{_osm_escape(k)}" v="{_osm_escape(v)}"/>')
|
|
lines.append(' </node>')
|
|
else:
|
|
lines.append(f' <node id="{nid}" lat="{lat:.8f}" lon="{lon:.8f}"/>')
|
|
return nid
|
|
|
|
def feature_tags(f: Feature) -> Dict[str, object]:
|
|
tags = {}
|
|
kind = f.props.get("garmin_kind")
|
|
gtype = f.props.get("garmin_type")
|
|
if kind:
|
|
tags["garmin:kind"] = kind
|
|
if gtype:
|
|
tags["garmin:type"] = gtype
|
|
if f.props.get("garmin_subtype"):
|
|
tags["garmin:subtype"] = f.props["garmin_subtype"]
|
|
if f.props.get("name"):
|
|
tags["name"] = f.props["name"]
|
|
return tags
|
|
|
|
for f in features:
|
|
tags = feature_tags(f)
|
|
if f.geom_type == "Point":
|
|
lon, lat = f.coords
|
|
add_node(lon, lat, tags)
|
|
elif f.geom_type in ("LineString", "Polygon"):
|
|
coords = f.coords if f.geom_type == "LineString" else f.coords[0]
|
|
node_ids = [add_node(lon, lat) for lon, lat in coords]
|
|
wid = way_id
|
|
way_id -= 1
|
|
lines.append(f' <way id="{wid}">')
|
|
for nid in node_ids:
|
|
lines.append(f' <nd ref="{nid}"/>')
|
|
if f.geom_type == "Polygon":
|
|
tags["area"] = "yes"
|
|
for k, v in tags.items():
|
|
lines.append(f' <tag k="{_osm_escape(k)}" v="{_osm_escape(v)}"/>')
|
|
lines.append(' </way>')
|
|
lines.append('</osm>')
|
|
path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
|
|
# -------------------------
|
|
# High-level orchestration
|
|
# -------------------------
|
|
|
|
def resolve_map_subfiles(files: Dict[str, bytes]) -> Dict[str, bytes]:
|
|
names = {k.upper(): v for k, v in files.items()}
|
|
# First, classic top-level TRE/RGN/LBL.
|
|
classic = {}
|
|
for ext in ("TRE", "RGN", "LBL", "NET"):
|
|
for k, v in names.items():
|
|
if k.endswith(f".{ext}"):
|
|
classic[ext] = v
|
|
break
|
|
if "TRE" in classic and "RGN" in classic:
|
|
return classic
|
|
# Then GMP bundle. Many NT maps expose offsets from the GMP header.
|
|
gmp = None
|
|
for k, v in names.items():
|
|
if k.endswith(".GMP"):
|
|
gmp = v
|
|
break
|
|
if gmp and len(gmp) > 0x29:
|
|
try:
|
|
tre_of = read_u32le(gmp, 0x19)
|
|
rgn_of = read_u32le(gmp, 0x1D)
|
|
lbl_of = read_u32le(gmp, 0x21)
|
|
net_of = read_u32le(gmp, 0x25)
|
|
offsets = [("TRE", tre_of), ("RGN", rgn_of), ("LBL", lbl_of), ("NET", net_of)]
|
|
positive = [(n, o) for n, o in offsets if o > 0]
|
|
positive_sorted = sorted(positive, key=lambda t: t[1])
|
|
out = {}
|
|
for i, (name, start) in enumerate(positive_sorted):
|
|
end = positive_sorted[i + 1][1] if i + 1 < len(positive_sorted) else len(gmp)
|
|
out[name] = gmp[start:end]
|
|
if "TRE" in out and "RGN" in out:
|
|
return out
|
|
except Exception:
|
|
pass
|
|
raise RuntimeError("Could not find a usable TRE/RGN pair in IMG container")
|
|
|
|
|
|
def load_features_from_img(img_path: Path) -> Tuple[List[Feature], Dict[str, object]]:
|
|
raw = img_path.read_bytes()
|
|
container = ImgContainer(raw)
|
|
subfiles = resolve_map_subfiles(container.files)
|
|
tre = TRE(subfiles["TRE"])
|
|
lbl = LBL(subfiles.get("LBL"))
|
|
rgn = RGN(subfiles["RGN"], tre=tre, lbl=lbl)
|
|
features = rgn.parse_features()
|
|
meta = {
|
|
"img_file": str(img_path),
|
|
"block_size": container.block_size,
|
|
"subfiles": sorted(container.files.keys()),
|
|
"bounds_garmin": {
|
|
"north": tre.north,
|
|
"east": tre.east,
|
|
"south": tre.south,
|
|
"west": tre.west,
|
|
},
|
|
"bounds_wgs84": {
|
|
"north": to_deg(tre.north),
|
|
"east": to_deg(tre.east),
|
|
"south": to_deg(tre.south),
|
|
"west": to_deg(tre.west),
|
|
},
|
|
"levels": {lvl: {"bits_per_coord": li.bits_per_coord, "inherited": li.inherited} for lvl, li in tre.levels.items()},
|
|
"feature_count": len(features),
|
|
}
|
|
return features, meta
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser(description="Extract vector features from a Garmin IMG and export GeoJSON / OSM XML.")
|
|
ap.add_argument("img", type=Path, help="Input Garmin .img file")
|
|
ap.add_argument("--geojson", type=Path, help="Write GeoJSON output")
|
|
ap.add_argument("--osm", type=Path, help="Write OSM XML output")
|
|
ap.add_argument("--meta-json", type=Path, help="Write parse metadata JSON")
|
|
args = ap.parse_args()
|
|
|
|
if not args.geojson and not args.osm and not args.meta_json:
|
|
ap.error("provide at least one of --geojson, --osm, --meta-json")
|
|
|
|
features, meta = load_features_from_img(args.img)
|
|
info(f"parsed {len(features)} features")
|
|
|
|
if args.geojson:
|
|
write_geojson(features, args.geojson)
|
|
info(f"wrote GeoJSON: {args.geojson}")
|
|
if args.osm:
|
|
write_osm(features, args.osm)
|
|
info(f"wrote OSM XML: {args.osm}")
|
|
if args.meta_json:
|
|
args.meta_json.write_text(json.dumps(meta, indent=2), encoding="utf-8")
|
|
info(f"wrote metadata: {args.meta_json}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|