Files
garmin-img-format-parsing/bgtopo_poc/detector_cv.py
2026-05-03 21:58:47 +03:00

224 lines
8.2 KiB
Python

from __future__ import annotations
import logging
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import cv2
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw
from rasterio.windows import Window
from tqdm import tqdm
from .georef import iter_windows, open_georaster, pixel_to_lonlat, read_window_rgb
from .utils import ensure_dir, safe_stem
LOG = logging.getLogger(__name__)
@dataclass
class Candidate:
sheet_id: str
source_path: str
x: int
y: int
w: int
h: int
cx: float
cy: float
area: float
aspect: float
blue_fill_ratio: float
rectangularity: float
solidity: float
approx_vertices: int
fill_style: str
score: float
lon: float | None = None
lat: float | None = None
def to_dict(self):
return asdict(self)
def build_blue_mask(rgb: np.ndarray, cfg: Dict) -> np.ndarray:
hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV)
mask = np.zeros(hsv.shape[:2], dtype=np.uint8)
for r in cfg["detector"].get("hsv_ranges", []):
lower = np.array(r["lower"], dtype=np.uint8)
upper = np.array(r["upper"], dtype=np.uint8)
mask |= cv2.inRange(hsv, lower, upper)
morph = cfg["detector"].get("morphology", {})
open_k = int(morph.get("open_kernel", 0) or 0)
close_k = int(morph.get("close_kernel", 0) or 0)
if open_k > 1:
k = np.ones((open_k, open_k), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, k)
if close_k > 1:
k = np.ones((close_k, close_k), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k)
dilate_iter = int(morph.get("dilate_iterations", 0) or 0)
if dilate_iter > 0:
mask = cv2.dilate(mask, np.ones((2, 2), np.uint8), iterations=dilate_iter)
return mask
def _classify_fill_style(blue_fill_ratio: float, rectangularity: float, solidity: float) -> str:
if blue_fill_ratio >= 0.48 and solidity >= 0.60:
return "filled"
if 0.10 <= blue_fill_ratio < 0.42 and rectangularity >= 0.28:
return "hollow"
if blue_fill_ratio < 0.18 and rectangularity >= 0.18:
return "border"
return "unknown"
def _score_candidate(blue_fill_ratio: float, rectangularity: float, solidity: float, aspect: float, approx_vertices: int) -> float:
aspect_bonus = 1.0 - min(abs(np.log(max(aspect, 1e-3))), 1.8) / 1.8
vertex_bonus = 1.0 if 4 <= approx_vertices <= 8 else 0.55
raw = 0.30 * blue_fill_ratio + 0.28 * rectangularity + 0.22 * solidity + 0.12 * aspect_bonus + 0.08 * vertex_bonus
return float(max(0.0, min(1.0, raw)))
def find_candidates_in_rgb(rgb: np.ndarray, cfg: Dict, sheet_id: str, source_path: str, xoff: int = 0, yoff: int = 0) -> List[Candidate]:
det = cfg["detector"]
mask = build_blue_mask(rgb, cfg)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
out: List[Candidate] = []
for contour in contours:
area = float(cv2.contourArea(contour))
if area < det["min_area_px"] or area > det["max_area_px"]:
continue
x, y, w, h = cv2.boundingRect(contour)
if w < det["min_width_px"] or h < det["min_height_px"]:
continue
if w > det["max_width_px"] or h > det["max_height_px"]:
continue
aspect = float(w / max(h, 1))
if not (det["min_aspect"] <= aspect <= det["max_aspect"]):
continue
bbox_mask = mask[y:y + h, x:x + w]
blue_fill_ratio = float(np.count_nonzero(bbox_mask) / max(w * h, 1))
if not (det["min_blue_fill_ratio"] <= blue_fill_ratio <= det["max_blue_fill_ratio"]):
continue
rectangularity = float(area / max(w * h, 1))
if rectangularity < det["min_rectangularity"]:
continue
hull = cv2.convexHull(contour)
hull_area = float(cv2.contourArea(hull))
solidity = float(area / hull_area) if hull_area > 0 else 0.0
if solidity < det["min_solidity"]:
continue
peri = float(cv2.arcLength(contour, True))
approx = cv2.approxPolyDP(contour, 0.04 * peri, True)
fill_style = _classify_fill_style(blue_fill_ratio, rectangularity, solidity)
score = _score_candidate(blue_fill_ratio, rectangularity, solidity, aspect, len(approx))
out.append(
Candidate(
sheet_id=sheet_id,
source_path=source_path,
x=int(x + xoff),
y=int(y + yoff),
w=int(w),
h=int(h),
cx=float(x + xoff + w / 2),
cy=float(y + yoff + h / 2),
area=area,
aspect=aspect,
blue_fill_ratio=blue_fill_ratio,
rectangularity=rectangularity,
solidity=solidity,
approx_vertices=int(len(approx)),
fill_style=fill_style,
score=score,
)
)
return out
def bbox_iou(a: Candidate, b: Candidate) -> float:
ax1, ay1, ax2, ay2 = a.x, a.y, a.x + a.w, a.y + a.h
bx1, by1, bx2, by2 = b.x, b.y, b.x + b.w, b.y + b.h
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
inter = iw * ih
union = a.w * a.h + b.w * b.h - inter
return float(inter / union) if union else 0.0
def nms(cands: Iterable[Candidate], threshold: float) -> List[Candidate]:
items = sorted(cands, key=lambda c: c.score, reverse=True)
kept: List[Candidate] = []
for c in items:
if all(bbox_iou(c, k) < threshold for k in kept):
kept.append(c)
return kept
def detect_sheet(map_path: str | None, tif_path: str | None, sheet_id: str, cfg: Dict, out_dir: str | Path) -> Path:
out_dir = ensure_dir(out_dir)
rh = open_georaster(map_path=map_path, tif_path=tif_path)
try:
tile_size = int(cfg["detector"]["tile_size"])
overlap = int(cfg["detector"]["tile_overlap"])
all_candidates: List[Candidate] = []
windows = list(iter_windows(rh.width, rh.height, tile_size, overlap))
LOG.info("Scanning %s as %d windows (%dx%d px)", sheet_id, len(windows), rh.width, rh.height)
for win in tqdm(windows, desc=f"scan {sheet_id}"):
rgb = read_window_rgb(rh.dataset, win)
cands = find_candidates_in_rgb(
rgb=rgb,
cfg=cfg,
sheet_id=sheet_id,
source_path=str(rh.path),
xoff=int(win.col_off),
yoff=int(win.row_off),
)
all_candidates.extend(cands)
kept = nms(all_candidates, float(cfg["detector"].get("nms_iou_threshold", 0.25)))
# Attach georeferenced centers when possible.
for c in kept:
if rh.crs is not None:
xgeo, ygeo = pixel_to_lonlat(rh.dataset, int(c.cy), int(c.cx))
c.lon = xgeo
c.lat = ygeo
out_csv = out_dir / f"{sheet_id}_candidates.csv"
pd.DataFrame([c.to_dict() for c in kept]).to_csv(out_csv, index=False)
LOG.info("%s: wrote %d candidates to %s", sheet_id, len(kept), out_csv)
return out_csv
finally:
rh.close()
def draw_overlay(tif_path: str | Path, candidates_csv: str | Path, out_png: str | Path, max_side: int = 2400) -> Path:
"""Draw candidates on a downscaled image. Uses TIFF path for simple visual QA."""
img = Image.open(tif_path).convert("RGB")
scale = min(1.0, max_side / max(img.size))
disp = img.resize((int(img.width * scale), int(img.height * scale))) if scale < 1 else img.copy()
draw = ImageDraw.Draw(disp)
df = pd.read_csv(candidates_csv)
color_by_style = {
"filled": (0, 255, 255),
"hollow": (0, 120, 255),
"border": (20, 20, 255),
"unknown": (255, 255, 0),
}
for _, r in df.iterrows():
color = color_by_style.get(str(r.get("fill_style", "unknown")), (255, 255, 0))
x1, y1 = float(r.x) * scale, float(r.y) * scale
x2, y2 = float(r.x + r.w) * scale, float(r.y + r.h) * scale
draw.rectangle([x1, y1, x2, y2], outline=color, width=max(1, int(2 * scale + 1)))
out_png = Path(out_png)
ensure_dir(out_png.parent)
disp.save(out_png)
LOG.info("Wrote overlay: %s", out_png)
return out_png