224 lines
8.2 KiB
Python
224 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, List, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import pandas as pd
|
|
from PIL import Image, ImageDraw
|
|
from rasterio.windows import Window
|
|
from tqdm import tqdm
|
|
|
|
from .georef import iter_windows, open_georaster, pixel_to_lonlat, read_window_rgb
|
|
from .utils import ensure_dir, safe_stem
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class Candidate:
    """One detected blue-rectangle candidate on a map sheet.

    Instances are created by ``find_candidates_in_rgb`` and later enriched
    with ``lon``/``lat`` by ``detect_sheet``. Field order is significant:
    it defines both the positional constructor and the key order produced
    by ``to_dict``.
    """

    # --- provenance ---
    sheet_id: str
    source_path: str

    # --- bounding box; x/y have the tile offset already added ---
    x: int
    y: int
    w: int
    h: int

    # --- bounding-box centre, same coordinate frame as x/y ---
    cx: float
    cy: float

    # --- shape metrics ---
    area: float  # contour area in pixels
    aspect: float  # w / h
    blue_fill_ratio: float  # non-zero mask pixels / bounding-box area
    rectangularity: float  # contour area / bounding-box area
    solidity: float  # contour area / convex-hull area
    approx_vertices: int  # vertex count of the simplified polygon

    # --- classification ---
    fill_style: str  # "filled", "hollow", "border" or "unknown"
    score: float  # heuristic quality score clamped to [0, 1]

    # --- optional georeferenced centre, filled in after detection ---
    lon: float | None = None
    lat: float | None = None

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of every field, in declaration order."""
        return asdict(self)
|
|
|
|
|
|
def build_blue_mask(rgb: np.ndarray, cfg: Dict) -> np.ndarray:
    """Build a binary mask of the pixels that fall inside the configured HSV ranges.

    The image is converted from RGB to HSV, every range listed under
    ``cfg["detector"]["hsv_ranges"]`` is thresholded with ``cv2.inRange`` and
    the results are OR-ed together. Optional clean-up from
    ``cfg["detector"]["morphology"]`` is then applied in a fixed order:
    opening, closing, dilation.

    Args:
        rgb: Image array in RGB channel order (it is fed to
            ``cv2.COLOR_RGB2HSV``).
        cfg: Configuration mapping with a ``"detector"`` section. Recognised
            keys: ``hsv_ranges`` (list of ``{"lower": [...], "upper": [...]}``)
            and ``morphology`` (``open_kernel``, ``close_kernel``,
            ``dilate_iterations``). Missing or null entries disable the
            corresponding step.

    Returns:
        A ``uint8`` mask with the same height and width as ``rgb``.

    Raises:
        KeyError: If ``cfg`` has no ``"detector"`` section, or a range entry
            lacks ``"lower"``/``"upper"``.
    """
    det = cfg["detector"]
    hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV)
    mask = np.zeros(hsv.shape[:2], dtype=np.uint8)

    # `or []` / `or {}`: a key that is present but null (e.g. `morphology:`
    # left empty in YAML) is treated like a missing key, matching the
    # `or 0` handling already applied to the individual kernel values below.
    for hsv_range in det.get("hsv_ranges") or []:
        lower = np.array(hsv_range["lower"], dtype=np.uint8)
        upper = np.array(hsv_range["upper"], dtype=np.uint8)
        mask |= cv2.inRange(hsv, lower, upper)

    morph = det.get("morphology") or {}
    open_k = int(morph.get("open_kernel", 0) or 0)
    close_k = int(morph.get("close_kernel", 0) or 0)
    dilate_iter = int(morph.get("dilate_iterations", 0) or 0)

    # A 1x1 kernel is a no-op, so only sizes above 1 trigger the operation.
    if open_k > 1:
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((open_k, open_k), np.uint8))
    if close_k > 1:
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((close_k, close_k), np.uint8))
    if dilate_iter > 0:
        mask = cv2.dilate(mask, np.ones((2, 2), np.uint8), iterations=dilate_iter)
    return mask
|
|
|
|
|
|
def _classify_fill_style(blue_fill_ratio: float, rectangularity: float, solidity: float) -> str:
|
|
if blue_fill_ratio >= 0.48 and solidity >= 0.60:
|
|
return "filled"
|
|
if 0.10 <= blue_fill_ratio < 0.42 and rectangularity >= 0.28:
|
|
return "hollow"
|
|
if blue_fill_ratio < 0.18 and rectangularity >= 0.18:
|
|
return "border"
|
|
return "unknown"
|
|
|
|
|
|
def _score_candidate(blue_fill_ratio: float, rectangularity: float, solidity: float, aspect: float, approx_vertices: int) -> float:
|
|
aspect_bonus = 1.0 - min(abs(np.log(max(aspect, 1e-3))), 1.8) / 1.8
|
|
vertex_bonus = 1.0 if 4 <= approx_vertices <= 8 else 0.55
|
|
raw = 0.30 * blue_fill_ratio + 0.28 * rectangularity + 0.22 * solidity + 0.12 * aspect_bonus + 0.08 * vertex_bonus
|
|
return float(max(0.0, min(1.0, raw)))
|
|
|
|
|
|
def find_candidates_in_rgb(rgb: np.ndarray, cfg: Dict, sheet_id: str, source_path: str, xoff: int = 0, yoff: int = 0) -> List[Candidate]:
    """Detect candidate rectangles in one RGB image (typically a single tile).

    A blue mask is built with ``build_blue_mask``, its external contours are
    extracted, and each contour is passed through a chain of filters whose
    limits come from ``cfg["detector"]``: contour area, bounding-box size,
    aspect ratio, blue fill ratio, rectangularity and solidity. Survivors
    are classified, scored and returned as ``Candidate`` records.

    Args:
        rgb: RGB image array handed to ``build_blue_mask``.
        cfg: Configuration mapping; the ``"detector"`` section supplies every
            threshold read below.
        sheet_id: Identifier copied onto each candidate.
        source_path: Source path copied onto each candidate.
        xoff: Column offset added to every reported x coordinate.
        yoff: Row offset added to every reported y coordinate.

    Returns:
        Candidates in contour order, with ``lon``/``lat`` left unset.
    """
    limits = cfg["detector"]
    blue = build_blue_mask(rgb, cfg)
    outlines, _ = cv2.findContours(blue, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    found: List[Candidate] = []

    for outline in outlines:
        # --- size filters -------------------------------------------------
        area = float(cv2.contourArea(outline))
        if area < limits["min_area_px"] or area > limits["max_area_px"]:
            continue

        x, y, w, h = cv2.boundingRect(outline)
        if (
            w < limits["min_width_px"]
            or h < limits["min_height_px"]
            or w > limits["max_width_px"]
            or h > limits["max_height_px"]
        ):
            continue

        aspect = float(w / max(h, 1))
        if not (limits["min_aspect"] <= aspect <= limits["max_aspect"]):
            continue

        # --- shape filters ------------------------------------------------
        # Guard the divisor so a degenerate box can never divide by zero.
        box_area = max(w * h, 1)

        blue_pixels = np.count_nonzero(blue[y:y + h, x:x + w])
        blue_fill_ratio = float(blue_pixels / box_area)
        if not (limits["min_blue_fill_ratio"] <= blue_fill_ratio <= limits["max_blue_fill_ratio"]):
            continue

        rectangularity = float(area / box_area)
        if rectangularity < limits["min_rectangularity"]:
            continue

        hull_area = float(cv2.contourArea(cv2.convexHull(outline)))
        solidity = float(area / hull_area) if hull_area > 0 else 0.0
        if solidity < limits["min_solidity"]:
            continue

        # --- describe the survivor ----------------------------------------
        perimeter = float(cv2.arcLength(outline, True))
        polygon = cv2.approxPolyDP(outline, 0.04 * perimeter, True)
        vertex_count = len(polygon)

        fill_style = _classify_fill_style(blue_fill_ratio, rectangularity, solidity)
        score = _score_candidate(blue_fill_ratio, rectangularity, solidity, aspect, vertex_count)

        # Shift from tile-local coordinates by the caller-supplied offsets.
        left = x + xoff
        top = y + yoff
        candidate = Candidate(
            sheet_id=sheet_id,
            source_path=source_path,
            x=int(left),
            y=int(top),
            w=int(w),
            h=int(h),
            cx=float(left + w / 2),
            cy=float(top + h / 2),
            area=area,
            aspect=aspect,
            blue_fill_ratio=blue_fill_ratio,
            rectangularity=rectangularity,
            solidity=solidity,
            approx_vertices=int(vertex_count),
            fill_style=fill_style,
            score=score,
        )
        found.append(candidate)

    return found
|
|
|
|
|
|
def bbox_iou(a: Candidate, b: Candidate) -> float:
    """Return the intersection-over-union of two candidates' bounding boxes.

    Boxes are read from the ``x``, ``y``, ``w`` and ``h`` attributes. The
    result is 0.0 when the boxes do not overlap or when the union area is
    zero.
    """
    a_right, a_bottom = a.x + a.w, a.y + a.h
    b_right, b_bottom = b.x + b.w, b.y + b.h

    # Width/height of the overlap rectangle, floored at zero when disjoint.
    overlap_w = max(0, min(a_right, b_right) - max(a.x, b.x))
    overlap_h = max(0, min(a_bottom, b_bottom) - max(a.y, b.y))

    inter = overlap_w * overlap_h
    union = a.w * a.h + b.w * b.h - inter
    if not union:
        return 0.0
    return float(inter / union)
|
|
|
|
|
|
def nms(cands: Iterable[Candidate], threshold: float) -> List[Candidate]:
    """Greedy non-maximum suppression over candidate bounding boxes.

    Candidates are visited from highest to lowest ``score``. One is kept
    only if its IoU with every already-kept candidate is strictly below
    ``threshold``.

    Returns:
        The kept candidates, ordered by descending score.
    """
    survivors: List[Candidate] = []
    ranked = sorted(cands, key=lambda cand: cand.score, reverse=True)
    for cand in ranked:
        for winner in survivors:
            if not (bbox_iou(cand, winner) < threshold):
                # Overlaps a better-scoring box too much: suppress it.
                break
        else:
            survivors.append(cand)
    return survivors
|
|
|
|
|
|
def detect_sheet(map_path: str | None, tif_path: str | None, sheet_id: str, cfg: Dict, out_dir: str | Path) -> Path:
    """Scan one raster sheet tile-by-tile and write the detected candidates to CSV.

    The raster is opened with ``open_georaster``, split into windows by
    ``iter_windows`` and each window is searched with
    ``find_candidates_in_rgb``. Detections from all windows are merged with
    ``nms`` (duplicates arise where windows overlap), optionally tagged with
    a georeferenced centre, and saved as ``<sheet_id>_candidates.csv``.

    Args:
        map_path: Forwarded to ``open_georaster`` as ``map_path``.
        tif_path: Forwarded to ``open_georaster`` as ``tif_path``.
        sheet_id: Identifier used in log messages, on each candidate and in
            the output file name.
        cfg: Configuration mapping. ``cfg["detector"]`` must define
            ``tile_size`` and ``tile_overlap``; ``nms_iou_threshold``
            defaults to 0.25.
        out_dir: Directory for the CSV (passed through ``ensure_dir``).

    Returns:
        Path of the CSV that was written. The file always carries a header
        row, even when no candidate was found.

    Raises:
        KeyError: If a required ``cfg["detector"]`` key is missing.
    """
    # Function-scope import so this block does not depend on a change to the
    # module's import section.
    from dataclasses import fields

    out_dir = ensure_dir(out_dir)
    rh = open_georaster(map_path=map_path, tif_path=tif_path)
    try:
        det = cfg["detector"]
        tile_size = int(det["tile_size"])
        overlap = int(det["tile_overlap"])

        windows = list(iter_windows(rh.width, rh.height, tile_size, overlap))
        LOG.info("Scanning %s as %d windows (%dx%d px)", sheet_id, len(windows), rh.width, rh.height)

        all_candidates: List[Candidate] = []
        for win in tqdm(windows, desc=f"scan {sheet_id}"):
            rgb = read_window_rgb(rh.dataset, win)
            all_candidates.extend(
                find_candidates_in_rgb(
                    rgb=rgb,
                    cfg=cfg,
                    sheet_id=sheet_id,
                    source_path=str(rh.path),
                    xoff=int(win.col_off),
                    yoff=int(win.row_off),
                )
            )

        kept = nms(all_candidates, float(det.get("nms_iou_threshold", 0.25)))

        # Attach georeferenced centers when possible. The CRS check does not
        # depend on the candidate, so it is evaluated once.
        if rh.crs is not None:
            for c in kept:
                # NOTE(review): arguments are passed as (cy, cx), presumably
                # (row, col) -- confirm against georef.pixel_to_lonlat.
                c.lon, c.lat = pixel_to_lonlat(rh.dataset, int(c.cy), int(c.cx))

        out_csv = out_dir / f"{sheet_id}_candidates.csv"
        # Explicit columns keep the header row when `kept` is empty. Without
        # them an empty DataFrame is written as a header-less file that
        # pd.read_csv rejects with EmptyDataError.
        columns = [f.name for f in fields(Candidate)]
        pd.DataFrame([c.to_dict() for c in kept], columns=columns).to_csv(out_csv, index=False)
        LOG.info("%s: wrote %d candidates to %s", sheet_id, len(kept), out_csv)
        return out_csv
    finally:
        # Always release the raster handle, including on errors mid-scan.
        rh.close()
|
|
|
|
|
|
def draw_overlay(tif_path: str | Path, candidates_csv: str | Path, out_png: str | Path, max_side: int = 2400) -> Path:
    """Draw candidates on a downscaled image. Uses TIFF path for simple visual QA.

    Args:
        tif_path: Image to draw on; opened with Pillow and converted to RGB.
        candidates_csv: CSV with ``x``, ``y``, ``w``, ``h`` columns and an
            optional ``fill_style`` column. An empty file is treated as
            "no candidates".
        out_png: Destination image path; its parent directory is created via
            ``ensure_dir``.
        max_side: Longest side, in pixels, of the saved image. Images that
            are already smaller are not upscaled.

    Returns:
        ``out_png`` as a ``Path``.
    """
    # The context manager closes the source file handle as soon as an
    # in-memory RGB copy exists.
    with Image.open(tif_path) as src:
        img = src.convert("RGB")

    scale = min(1.0, max_side / max(img.size))
    if scale < 1:
        # max(1, ...) stops a very thin image from truncating to a 0-pixel
        # side, which Pillow cannot resize to.
        new_size = (max(1, int(img.width * scale)), max(1, int(img.height * scale)))
        disp = img.resize(new_size)
    else:
        # convert() already returned a fresh image, so no extra copy is needed.
        disp = img
    draw = ImageDraw.Draw(disp)

    try:
        df = pd.read_csv(candidates_csv)
    except pd.errors.EmptyDataError:
        # A header-less CSV means there is nothing to draw; still emit the
        # (unannotated) overlay instead of failing the QA step.
        df = pd.DataFrame()

    color_by_style = {
        "filled": (0, 255, 255),
        "hollow": (0, 120, 255),
        "border": (20, 20, 255),
        "unknown": (255, 255, 0),
    }
    outline_width = max(1, int(2 * scale + 1))

    for row in df.itertuples(index=False):
        style = str(getattr(row, "fill_style", "unknown"))
        color = color_by_style.get(style, (255, 255, 0))
        x1, y1 = float(row.x) * scale, float(row.y) * scale
        x2, y2 = float(row.x + row.w) * scale, float(row.y + row.h) * scale
        draw.rectangle([x1, y1, x2, y2], outline=color, width=outline_width)

    out_png = Path(out_png)
    ensure_dir(out_png.parent)
    disp.save(out_png)
    LOG.info("Wrote overlay: %s", out_png)
    return out_png
|