Replace hardcoded config and directory scan with YAML config and explicit clip list
- config.py constants -> config/config.yaml (user-editable, git-ignored)
- Questions and defaults are now defined in the YAML, including per-question defaults
- ClipSelector no longer scans the data dir; it reads a user-provided clips.txt instead
- Removed the --daily / --time / --skip-existing-day args
- video_loader now samples frames evenly across the full clip
- pyyaml added as a dependency

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -9,56 +9,44 @@ use("QtAgg")
|
||||
from PySide6.QtWidgets import QApplication
|
||||
|
||||
from .annotator import Annotator
|
||||
from .config import load_config
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--data",
|
||||
default=r"C:\Users\sieverin\HydroScan\Code\river-annotation-tool\data\filtered_data",
|
||||
"--config",
|
||||
default="config/config.yaml",
|
||||
help="Path to config YAML file (default: config/config.yaml)",
|
||||
)
|
||||
parser.add_argument("--out", default="data/annotation_results/")
|
||||
parser.add_argument("--data", default=None, help="Override data_dir from config")
|
||||
parser.add_argument("--out", default=None, help="Override out_dir from config")
|
||||
parser.add_argument("--clips", default=None, help="Override clips_file from config")
|
||||
parser.add_argument(
|
||||
"--clip",
|
||||
default=None,
|
||||
help="Stem name of a specific clip to load (e.g. 'left_20230501')",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--time",
|
||||
default=None,
|
||||
help="Target time to filter clips by day (format: HH:MM, e.g. '14:30'). "
|
||||
"Selects the closest clip to this time for each day.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--daily",
|
||||
action="store_true",
|
||||
help="Load only 1 clip per day at the specified time (requires --time).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--extras",
|
||||
action="store_true",
|
||||
help="Also save GIFs, frame PNG, overlay PNG, and mask_vis PNG alongside the mask.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--skip-existing-day",
|
||||
action="store_true",
|
||||
help="In --daily mode, skip days that already have any annotated clip.",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = parse_args()
|
||||
|
||||
cfg = load_config(Path(args.config))
|
||||
if args.data:
|
||||
cfg.data_dir = args.data
|
||||
if args.out:
|
||||
cfg.out_dir = args.out
|
||||
if args.clips:
|
||||
cfg.clips_file = args.clips
|
||||
|
||||
app = QApplication([])
|
||||
win = Annotator(
|
||||
Path(args.data),
|
||||
Path(args.out),
|
||||
clip=args.clip,
|
||||
target_time=args.time,
|
||||
daily=args.daily,
|
||||
extras=args.extras,
|
||||
skip_existing_day=args.skip_existing_day,
|
||||
)
|
||||
win = Annotator(cfg, clip=args.clip, extras=args.extras)
|
||||
win.show()
|
||||
app.exec()
|
||||
|
||||
@@ -18,7 +18,7 @@ from PySide6.QtWidgets import (
|
||||
)
|
||||
|
||||
from .clip_selector import ClipSelector
|
||||
from .config import DEFAULTS, QUESTIONS, Config
|
||||
from .config import AppConfig
|
||||
from .mask_canvas import MaskCanvas
|
||||
from .video_loader import load_frames
|
||||
|
||||
@@ -26,25 +26,20 @@ from .video_loader import load_frames
|
||||
class Annotator(QMainWindow):
|
||||
def __init__(
|
||||
self,
|
||||
data_dir: Path,
|
||||
out_dir: Path,
|
||||
config: AppConfig,
|
||||
clip: str = None,
|
||||
target_time: str = None,
|
||||
daily: bool = False,
|
||||
extras: bool = False,
|
||||
skip_existing_day: bool = False,
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
self.out_dir = Path(out_dir)
|
||||
self.cfg = config
|
||||
self.out_dir = Path(config.out_dir)
|
||||
self.extras = extras
|
||||
|
||||
self.selector = ClipSelector(
|
||||
data_dir=Path(data_dir),
|
||||
data_dir=Path(config.data_dir),
|
||||
out_dir=self.out_dir,
|
||||
target_time=target_time,
|
||||
daily=daily,
|
||||
skip_existing_day=skip_existing_day,
|
||||
clips_file=Path(config.clips_file),
|
||||
)
|
||||
|
||||
self.setWindowTitle("River Annotator")
|
||||
@@ -53,10 +48,13 @@ class Annotator(QMainWindow):
|
||||
self._init_timer()
|
||||
|
||||
# ── clip loading ───────────────────────────────────────────────
|
||||
def _load_clip(self, specific: str = None, next_day: bool = False):
|
||||
self.filename = self.selector.next(specific=specific, next_day=next_day)
|
||||
def _load_clip(self, specific: str = None):
|
||||
self.filename = self.selector.next(specific=specific)
|
||||
self.frames, self.fps, self.dh, self.dw, self.h, self.w = load_frames(
|
||||
self.filename, Config.MAX_FRAMES
|
||||
self.filename,
|
||||
self.cfg.max_frames,
|
||||
self.cfg.display_max,
|
||||
self.cfg.fps_fallback,
|
||||
)
|
||||
self._pending_answers = self._read_saved_answers()
|
||||
|
||||
@@ -135,23 +133,22 @@ class Annotator(QMainWindow):
|
||||
|
||||
def _build_question_panel(self) -> QVBoxLayout:
|
||||
vbox = QVBoxLayout()
|
||||
for section, qs in QUESTIONS:
|
||||
for section, qs in self.cfg.get_questions():
|
||||
group = QGroupBox(section)
|
||||
gvbox = QVBoxLayout()
|
||||
for key, label, options in qs:
|
||||
for key, label, options, default in qs:
|
||||
gvbox.addWidget(QLabel(label))
|
||||
btn_group = QButtonGroup(self)
|
||||
row = QHBoxLayout()
|
||||
buttons = []
|
||||
default_value = DEFAULTS.get(key)
|
||||
for opt in options:
|
||||
btn = QRadioButton(opt)
|
||||
btn_group.addButton(btn)
|
||||
row.addWidget(btn)
|
||||
buttons.append(btn)
|
||||
if default_value == opt:
|
||||
if default == opt:
|
||||
btn.setChecked(True)
|
||||
if default_value is None and buttons:
|
||||
if default is None and buttons:
|
||||
buttons[-1].setChecked(True)
|
||||
self.q_widgets[key] = (btn_group, buttons, options)
|
||||
gvbox.addLayout(row)
|
||||
@@ -246,8 +243,8 @@ class Annotator(QMainWindow):
|
||||
if answers:
|
||||
self._set_answers(answers)
|
||||
|
||||
def _advance_clip(self, next_day: bool):
|
||||
self._load_clip(next_day=next_day)
|
||||
def _advance_clip(self):
|
||||
self._load_clip()
|
||||
self.frame_i = 0
|
||||
self.mc.load_clip(
|
||||
self.frames,
|
||||
@@ -262,7 +259,7 @@ class Annotator(QMainWindow):
|
||||
|
||||
def next_clip(self):
|
||||
self.save()
|
||||
self._advance_clip(next_day=self.selector.daily)
|
||||
self._advance_clip()
|
||||
|
||||
def skip_clip(self):
|
||||
self._advance_clip(next_day=self.selector.daily)
|
||||
self._advance_clip()
|
||||
|
||||
@@ -1,47 +1,28 @@
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class ClipSelector:
|
||||
"""Picks which clip to annotate next from a user-provided clip list, skipping clips that are already annotated."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data_dir: Path,
|
||||
out_dir: Path,
|
||||
target_time: str = None,
|
||||
daily: bool = False,
|
||||
skip_existing_day: bool = False,
|
||||
):
|
||||
def __init__(self, data_dir: Path, out_dir: Path, clips_file: Path):
|
||||
self.data_dir = data_dir
|
||||
self.out_dir = out_dir
|
||||
self.target_time = target_time
|
||||
self.daily = daily
|
||||
self.skip_existing_day = skip_existing_day
|
||||
self.current_date = None
|
||||
self.clips = self._load_clips(clips_file)
|
||||
self.index = 0
|
||||
|
||||
self.df = self._load_dataset()
|
||||
|
||||
def _load_dataset(self) -> pd.DataFrame:
|
||||
files = list(self.data_dir.glob("*.zip"))
|
||||
if not files:
|
||||
raise FileNotFoundError(f"No zip files in {self.data_dir}")
|
||||
|
||||
df = pd.DataFrame({"filename": files})
|
||||
df["datetime"] = df["filename"].apply(
|
||||
lambda x: pd.to_datetime(x.stem.split("_")[1], errors="coerce")
|
||||
)
|
||||
return df.sort_values("datetime").reset_index(drop=True)
|
||||
def _load_clips(self, clips_file: Path) -> list[Path]:
|
||||
lines = clips_file.read_text().splitlines()
|
||||
return [
|
||||
self.data_dir / name.strip()
|
||||
for name in lines
|
||||
if name.strip() and not name.strip().startswith("#")
|
||||
]
|
||||
|
||||
def is_annotated(self, path: Path) -> bool:
|
||||
return (self.out_dir / path.stem / "mask.png").exists()
|
||||
|
||||
def next(self, specific: str = None, next_day: bool = False) -> Path:
|
||||
if specific is not None:
|
||||
def next(self, specific: str = None) -> Path:
|
||||
if specific:
|
||||
return self._resolve_specific(specific)
|
||||
return self._pick_next(next_day=next_day)
|
||||
return self._pick_next()
|
||||
|
||||
def _resolve_specific(self, specific: str) -> Path:
|
||||
matches = list(self.data_dir.glob(f"{specific}.zip"))
|
||||
@@ -52,57 +33,10 @@ class ClipSelector:
|
||||
raise FileNotFoundError(f"Clip '{specific}' not found in {self.data_dir}")
|
||||
return matches[0]
|
||||
|
||||
def _pick_next(self, next_day: bool = False) -> Path:
|
||||
remaining = [f for f in self.df["filename"] if not self.is_annotated(f)]
|
||||
if not remaining:
|
||||
raise RuntimeError("No remaining clips to annotate")
|
||||
|
||||
if not (self.target_time or self.daily):
|
||||
filename = remaining[0]
|
||||
dt = self.df[self.df["filename"] == filename]["datetime"].values[0]
|
||||
self.current_date = pd.Timestamp(dt).date()
|
||||
return filename
|
||||
|
||||
return self._pick_by_time(remaining, next_day)
|
||||
|
||||
def _pick_by_time(self, remaining: list, next_day: bool) -> Path:
|
||||
if self.target_time:
|
||||
target_hour, target_minute = map(int, self.target_time.split(":"))
|
||||
else:
|
||||
target_hour, target_minute = 12, 0
|
||||
target_seconds = target_hour * 3600 + target_minute * 60
|
||||
|
||||
remaining_datetimes = [
|
||||
self.df[self.df["filename"] == f]["datetime"].values[0] for f in remaining
|
||||
]
|
||||
df_remaining = pd.DataFrame({"filename": remaining, "datetime": remaining_datetimes})
|
||||
df_remaining["date"] = df_remaining["datetime"].dt.date
|
||||
|
||||
if self.daily and next_day and self.current_date is not None:
|
||||
next_date = self.current_date + datetime.timedelta(days=1)
|
||||
df_remaining = df_remaining[df_remaining["date"] >= next_date]
|
||||
|
||||
if self.daily and self.skip_existing_day:
|
||||
annotated_dates = set()
|
||||
for f in self.df["filename"]:
|
||||
if self.is_annotated(f):
|
||||
dt = self.df[self.df["filename"] == f]["datetime"].values[0]
|
||||
annotated_dates.add(pd.Timestamp(dt).date())
|
||||
df_remaining = df_remaining[~df_remaining["date"].isin(annotated_dates)]
|
||||
|
||||
if df_remaining.empty:
|
||||
raise RuntimeError("No remaining clips to annotate")
|
||||
|
||||
closest_clips, dates_list = [], []
|
||||
for date, group in df_remaining.groupby("date"):
|
||||
group = group.copy()
|
||||
group["time_seconds"] = (
|
||||
group["datetime"].dt.hour * 3600 + group["datetime"].dt.minute * 60
|
||||
)
|
||||
group["time_diff"] = (group["time_seconds"] - target_seconds).abs()
|
||||
closest = group.loc[group["time_diff"].idxmin()]
|
||||
closest_clips.append(closest["filename"])
|
||||
dates_list.append(date)
|
||||
|
||||
self.current_date = dates_list[0]
|
||||
return closest_clips[0]
|
||||
def _pick_next(self) -> Path:
|
||||
while self.index < len(self.clips):
|
||||
clip = self.clips[self.index]
|
||||
self.index += 1
|
||||
if not self.is_annotated(clip):
|
||||
return clip
|
||||
raise RuntimeError("No remaining clips to annotate")
|
||||
|
||||
@@ -1,44 +1,38 @@
|
||||
class Config:
|
||||
DISPLAY_MAX = 480
|
||||
FPS_FALLBACK = 25
|
||||
MAX_FRAMES = 100
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
QUESTIONS = [
|
||||
(
|
||||
"River",
|
||||
[
|
||||
("flow", "Flow Regime", ["Turbulent", "Laminar", "Uncertain"]),
|
||||
("shadows", "Strong Shadows", ["Yes", "No", "Uncertain"]),
|
||||
("artifacts", "Artifacts on River", ["Yes", "No", "Uncertain"]),
|
||||
],
|
||||
),
|
||||
(
|
||||
"Scene",
|
||||
[
|
||||
("lighting", "Lighting", ["Day", "Night", "Uncertain"]),
|
||||
@dataclass
|
||||
class AppConfig:
|
||||
display_max: int = 480
|
||||
fps_fallback: int = 25
|
||||
max_frames: int = 100
|
||||
data_dir: str = "data/clips"
|
||||
out_dir: str = "data/annotation_results"
|
||||
clips_file: str = "config/clips.txt"
|
||||
questions: list = field(default_factory=list)
|
||||
|
||||
def get_questions(self):
|
||||
return [
|
||||
(
|
||||
"exposure",
|
||||
"Exposure",
|
||||
["Overexposed", "Underexposed", "Both", "Normal", "Uncertain"],
|
||||
),
|
||||
],
|
||||
),
|
||||
(
|
||||
"Weather",
|
||||
[
|
||||
("snowing", "Snowing", ["Yes", "No", "Uncertain"]),
|
||||
("snow_on_ground", "Snow on Ground", ["Yes", "No", "Uncertain"]),
|
||||
],
|
||||
),
|
||||
]
|
||||
s["section"],
|
||||
[
|
||||
(
|
||||
item["key"],
|
||||
item["label"],
|
||||
[str(o) for o in item["options"]],
|
||||
str(item["default"]) if item.get("default") is not None else None,
|
||||
)
|
||||
for item in s["items"]
|
||||
],
|
||||
)
|
||||
for s in self.questions
|
||||
]
|
||||
|
||||
DEFAULTS = {
|
||||
"flow": "Laminar",
|
||||
"shadows": "No",
|
||||
"artifacts": "No",
|
||||
"lighting": "Day",
|
||||
"exposure": "Normal",
|
||||
"snowing": "No",
|
||||
"snow_on_ground": "No",
|
||||
}
|
||||
|
||||
def load_config(path: Path) -> AppConfig:
    """Read the YAML file at *path* and build an ``AppConfig`` from it.

    The top-level YAML keys are passed to ``AppConfig`` as keyword
    arguments, so any field missing from the file keeps its dataclass
    default.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The populated ``AppConfig``.

    Raises:
        OSError: If the file cannot be opened.
        TypeError: If the YAML top level is not a mapping, or it contains a
            key that ``AppConfig`` does not accept.
    """
    # Decode as UTF-8 explicitly instead of relying on the platform locale.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # An empty or comment-only document parses to None: use all defaults.
    if data is None:
        data = {}
    if not isinstance(data, dict):
        raise TypeError(
            f"Config file {path} must contain a mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return AppConfig(**data)
|
||||
|
||||
@@ -5,10 +5,8 @@ from pathlib import Path
|
||||
|
||||
import cv2
|
||||
|
||||
from .config import Config
|
||||
|
||||
|
||||
def load_frames(zip_path: Path, max_frames: int):
|
||||
def load_frames(zip_path: Path, max_frames: int, display_max: int, fps_fallback: int):
|
||||
video_bytes = zipfile.ZipFile(zip_path).read("left.mp4")
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
|
||||
@@ -16,14 +14,20 @@ def load_frames(zip_path: Path, max_frames: int):
|
||||
tmp_path = f.name
|
||||
|
||||
cap = cv2.VideoCapture(tmp_path)
|
||||
fps = cap.get(cv2.CAP_PROP_FPS) or Config.FPS_FALLBACK
|
||||
fps = cap.get(cv2.CAP_PROP_FPS) or fps_fallback
|
||||
|
||||
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
step = max(1, total // max_frames)
|
||||
|
||||
frames = []
|
||||
i = 0
|
||||
while len(frames) < max_frames:
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
|
||||
ok, frame = cap.read()
|
||||
if not ok:
|
||||
break
|
||||
frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
||||
i += step
|
||||
|
||||
cap.release()
|
||||
os.unlink(tmp_path)
|
||||
@@ -32,7 +36,7 @@ def load_frames(zip_path: Path, max_frames: int):
|
||||
raise RuntimeError(f"No frames found in {zip_path}")
|
||||
|
||||
h, w = frames[0].shape[:2]
|
||||
scale = Config.DISPLAY_MAX / max(h, w)
|
||||
scale = display_max / max(h, w)
|
||||
dh, dw = int(h * scale), int(w * scale)
|
||||
|
||||
frames = [cv2.resize(f, (dw, dh)) for f in frames]
|
||||
|
||||
Reference in New Issue
Block a user