from __future__ import annotations import logging from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, Iterable, List, Tuple import cv2 import numpy as np import pandas as pd from PIL import Image, ImageDraw from rasterio.windows import Window from tqdm import tqdm from .georef import iter_windows, open_georaster, pixel_to_lonlat, read_window_rgb from .utils import ensure_dir, safe_stem LOG = logging.getLogger(__name__) @dataclass class Candidate: sheet_id: str source_path: str x: int y: int w: int h: int cx: float cy: float area: float aspect: float blue_fill_ratio: float rectangularity: float solidity: float approx_vertices: int fill_style: str score: float lon: float | None = None lat: float | None = None def to_dict(self): return asdict(self) def build_blue_mask(rgb: np.ndarray, cfg: Dict) -> np.ndarray: hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV) mask = np.zeros(hsv.shape[:2], dtype=np.uint8) for r in cfg["detector"].get("hsv_ranges", []): lower = np.array(r["lower"], dtype=np.uint8) upper = np.array(r["upper"], dtype=np.uint8) mask |= cv2.inRange(hsv, lower, upper) morph = cfg["detector"].get("morphology", {}) open_k = int(morph.get("open_kernel", 0) or 0) close_k = int(morph.get("close_kernel", 0) or 0) if open_k > 1: k = np.ones((open_k, open_k), np.uint8) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, k) if close_k > 1: k = np.ones((close_k, close_k), np.uint8) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k) dilate_iter = int(morph.get("dilate_iterations", 0) or 0) if dilate_iter > 0: mask = cv2.dilate(mask, np.ones((2, 2), np.uint8), iterations=dilate_iter) return mask def _classify_fill_style(blue_fill_ratio: float, rectangularity: float, solidity: float) -> str: if blue_fill_ratio >= 0.48 and solidity >= 0.60: return "filled" if 0.10 <= blue_fill_ratio < 0.42 and rectangularity >= 0.28: return "hollow" if blue_fill_ratio < 0.18 and rectangularity >= 0.18: return "border" return "unknown" def _score_candidate(blue_fill_ratio: float, rectangularity: float, solidity: float, aspect: float, approx_vertices: int) -> float: aspect_bonus = 1.0 - min(abs(np.log(max(aspect, 1e-3))), 1.8) / 1.8 vertex_bonus = 1.0 if 4 <= approx_vertices <= 8 else 0.55 raw = 0.30 * blue_fill_ratio + 0.28 * rectangularity + 0.22 * solidity + 0.12 * aspect_bonus + 0.08 * vertex_bonus return float(max(0.0, min(1.0, raw))) def find_candidates_in_rgb(rgb: np.ndarray, cfg: Dict, sheet_id: str, source_path: str, xoff: int = 0, yoff: int = 0) -> List[Candidate]: det = cfg["detector"] mask = build_blue_mask(rgb, cfg) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) out: List[Candidate] = [] for contour in contours: area = float(cv2.contourArea(contour)) if area < det["min_area_px"] or area > det["max_area_px"]: continue x, y, w, h = cv2.boundingRect(contour) if w < det["min_width_px"] or h < det["min_height_px"]: continue if w > det["max_width_px"] or h > det["max_height_px"]: continue aspect = float(w / max(h, 1)) if not (det["min_aspect"] <= aspect <= det["max_aspect"]): continue bbox_mask = mask[y:y + h, x:x + w] blue_fill_ratio = float(np.count_nonzero(bbox_mask) / max(w * h, 1)) if not (det["min_blue_fill_ratio"] <= blue_fill_ratio <= det["max_blue_fill_ratio"]): continue rectangularity = float(area / max(w * h, 1)) if rectangularity < det["min_rectangularity"]: continue hull = cv2.convexHull(contour) hull_area = float(cv2.contourArea(hull)) solidity = float(area / hull_area) if hull_area > 0 else 0.0 if solidity < det["min_solidity"]: continue peri = float(cv2.arcLength(contour, True)) approx = cv2.approxPolyDP(contour, 0.04 * peri, True) fill_style = _classify_fill_style(blue_fill_ratio, rectangularity, solidity) score = _score_candidate(blue_fill_ratio, rectangularity, solidity, aspect, len(approx)) out.append( Candidate( sheet_id=sheet_id, source_path=source_path, x=int(x + xoff), y=int(y + yoff), w=int(w), h=int(h), cx=float(x + xoff + w / 2), cy=float(y + yoff + h / 2), area=area, aspect=aspect, blue_fill_ratio=blue_fill_ratio, rectangularity=rectangularity, solidity=solidity, approx_vertices=int(len(approx)), fill_style=fill_style, score=score, ) ) return out def bbox_iou(a: Candidate, b: Candidate) -> float: ax1, ay1, ax2, ay2 = a.x, a.y, a.x + a.w, a.y + a.h bx1, by1, bx2, by2 = b.x, b.y, b.x + b.w, b.y + b.h ix1, iy1 = max(ax1, bx1), max(ay1, by1) ix2, iy2 = min(ax2, bx2), min(ay2, by2) iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1) inter = iw * ih union = a.w * a.h + b.w * b.h - inter return float(inter / union) if union else 0.0 def nms(cands: Iterable[Candidate], threshold: float) -> List[Candidate]: items = sorted(cands, key=lambda c: c.score, reverse=True) kept: List[Candidate] = [] for c in items: if all(bbox_iou(c, k) < threshold for k in kept): kept.append(c) return kept def detect_sheet(map_path: str | None, tif_path: str | None, sheet_id: str, cfg: Dict, out_dir: str | Path) -> Path: out_dir = ensure_dir(out_dir) rh = open_georaster(map_path=map_path, tif_path=tif_path) try: tile_size = int(cfg["detector"]["tile_size"]) overlap = int(cfg["detector"]["tile_overlap"]) all_candidates: List[Candidate] = [] windows = list(iter_windows(rh.width, rh.height, tile_size, overlap)) LOG.info("Scanning %s as %d windows (%dx%d px)", sheet_id, len(windows), rh.width, rh.height) for win in tqdm(windows, desc=f"scan {sheet_id}"): rgb = read_window_rgb(rh.dataset, win) cands = find_candidates_in_rgb( rgb=rgb, cfg=cfg, sheet_id=sheet_id, source_path=str(rh.path), xoff=int(win.col_off), yoff=int(win.row_off), ) all_candidates.extend(cands) kept = nms(all_candidates, float(cfg["detector"].get("nms_iou_threshold", 0.25))) # Attach georeferenced centers when possible. for c in kept: if rh.crs is not None: xgeo, ygeo = pixel_to_lonlat(rh.dataset, int(c.cy), int(c.cx)) c.lon = xgeo c.lat = ygeo out_csv = out_dir / f"{sheet_id}_candidates.csv" pd.DataFrame([c.to_dict() for c in kept]).to_csv(out_csv, index=False) LOG.info("%s: wrote %d candidates to %s", sheet_id, len(kept), out_csv) return out_csv finally: rh.close() def draw_overlay(tif_path: str | Path, candidates_csv: str | Path, out_png: str | Path, max_side: int = 2400) -> Path: """Draw candidates on a downscaled image. Uses TIFF path for simple visual QA.""" img = Image.open(tif_path).convert("RGB") scale = min(1.0, max_side / max(img.size)) disp = img.resize((int(img.width * scale), int(img.height * scale))) if scale < 1 else img.copy() draw = ImageDraw.Draw(disp) df = pd.read_csv(candidates_csv) color_by_style = { "filled": (0, 255, 255), "hollow": (0, 120, 255), "border": (20, 20, 255), "unknown": (255, 255, 0), } for _, r in df.iterrows(): color = color_by_style.get(str(r.get("fill_style", "unknown")), (255, 255, 0)) x1, y1 = float(r.x) * scale, float(r.y) * scale x2, y2 = float(r.x + r.w) * scale, float(r.y + r.h) * scale draw.rectangle([x1, y1, x2, y2], outline=color, width=max(1, int(2 * scale + 1))) out_png = Path(out_png) ensure_dir(out_png.parent) disp.save(out_png) LOG.info("Wrote overlay: %s", out_png) return out_png