First implementation

This commit is contained in:
2026-05-01 09:32:48 +02:00
parent 9cee33c405
commit 5c53dcad52
5 changed files with 2255 additions and 3 deletions

3
.gitignore vendored
View File

@@ -9,3 +9,6 @@
# OSX-specific
.DS_Store
# Data
data/**

View File

@@ -0,0 +1,100 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "fe0521db",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"import json\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from PIL import Image\n",
"from IPython.display import display, Image as IPImage"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6d7ebbf",
"metadata": {},
"outputs": [],
"source": [
"out_dir = Path(\"../data/annotation_results/\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "75efc15a",
"metadata": {},
"outputs": [],
"source": [
"def show_result(folder):\n",
" folder = Path(folder)\n",
"\n",
" with open(folder / \"metadata.json\") as f:\n",
" metadata = json.load(f)\n",
"\n",
" frame = np.array(Image.open(folder / \"frame.png\"))\n",
" mask = np.array(Image.open(folder / \"mask_vis.png\"))\n",
" overlay = np.array(Image.open(folder / \"overlay.png\"))\n",
"\n",
" title = \" | \".join(f\"{k}: {v}\" for k, v in metadata.items())\n",
" fig, axs = plt.subplots(1, 3, figsize=(15, 5))\n",
" axs[0].imshow(frame)\n",
" axs[0].set_title(\"Frame\")\n",
" axs[0].axis(\"off\")\n",
" axs[1].imshow(mask, cmap=\"gray\")\n",
" axs[1].set_title(\"Mask\")\n",
" axs[1].axis(\"off\")\n",
" axs[2].imshow(overlay)\n",
" axs[2].set_title(\"Overlay\")\n",
" axs[2].axis(\"off\")\n",
" plt.suptitle(f\"{folder.name}\\n{title}\", fontsize=9)\n",
" plt.tight_layout()\n",
" plt.show()\n",
"\n",
" for gif_name in [\"video_original_lowres.gif\", \"video_overlay_lowres.gif\"]:\n",
" gif_path = folder / gif_name\n",
" if gif_path.exists():\n",
" display(IPImage(filename=str(gif_path)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "02ff1ae9",
"metadata": {},
"outputs": [],
"source": [
"for folder in sorted(out_dir.iterdir()):\n",
" if folder.is_dir() and (folder / \"metadata.json\").exists():\n",
" show_result(folder)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "river-annotation-tool",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -13,7 +13,12 @@ description = ""
readme = "README.md"
requires-python = "~=3.12.0"
dependencies = [
# TODO configure install dependencies
"opencv-contrib-python-headless==4.12.0.88",
"pandas>=2.3.3",
"matplotlib>=3.10.8",
"matplotlib-inline>=0.2.1",
"pillow>=12.2.0",
"pyside6>=6.11.0",
]
dynamic = ["version"]

View File

@@ -0,0 +1,535 @@
import os
import zipfile
import tempfile
import json
import argparse
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from matplotlib import use
use("QtAgg")
from PySide6.QtWidgets import (
QApplication,
QMainWindow,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QLabel,
QRadioButton,
QButtonGroup,
QGroupBox,
QSlider,
)
from PySide6.QtCore import Qt, QTimer
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
# ─────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────
class Config:
    """Tunable constants shared by the frame loader and the annotator window."""

    # Target length, in pixels, of the longer side of every frame after
    # downscaling for display (see load_frames).
    DISPLAY_MAX = 480
    # Playback rate used when the video container does not report a usable FPS.
    FPS_FALLBACK = 25
    # Upper bound on the number of frames decoded from each clip.
    MAX_FRAMES = 100
# ─────────────────────────────────────────────
# QUESTIONS
# ─────────────────────────────────────────────
# Answer set shared by every yes/no style question; unpacked into a fresh
# list per question so no two questions alias the same list object.
_YES_NO_UNCERTAIN = ("Yes", "No", "Uncertain")

# Layout: [(section title, [(answer key, label shown in the UI, options), ...]), ...]
QUESTIONS = [
    (
        "River",
        [
            ("flow", "Flow Regime", ["Turbulent", "Laminar", "Uncertain"]),
            ("shadows", "Strong Shadows", [*_YES_NO_UNCERTAIN]),
            ("artifacts", "Artifacts on River", [*_YES_NO_UNCERTAIN]),
        ],
    ),
    (
        "Scene",
        [
            ("lighting", "Lighting", ["Day", "Night", "Uncertain"]),
            (
                "exposure",
                "Exposure",
                ["Overexposed", "Underexposed", "Both", "Normal", "Uncertain"],
            ),
        ],
    ),
    (
        "Weather",
        [
            ("snowing", "Snowing", [*_YES_NO_UNCERTAIN]),
            ("snow_on_ground", "Snow on Ground", [*_YES_NO_UNCERTAIN]),
        ],
    ),
]
# ─────────────────────────────────────────────
# DEFAULTS
# ─────────────────────────────────────────────
# Option pre-selected for each question key when the form is first built.
DEFAULTS = dict(
    flow="Laminar",
    shadows="No",
    artifacts="No",
    lighting="Day",
    exposure="Normal",
    snowing="No",
    snow_on_ground="No",
)
# ─────────────────────────────────────────────
# VIDEO LOADING
# ─────────────────────────────────────────────
def load_frames(zip_path: Path, max_frames: int):
    """Decode up to ``max_frames`` RGB frames of ``left.mp4`` stored in a zip archive.

    The video is extracted to a temporary file because ``cv2.VideoCapture``
    is opened from a path here. Each frame is downscaled as it is read so
    that its longer side is about ``Config.DISPLAY_MAX`` pixels.

    Args:
        zip_path: Archive that contains a member named ``left.mp4``.
        max_frames: Maximum number of frames to decode.

    Returns:
        ``(frames, fps, dh, dw, h, w)``: the list of resized RGB frames, the
        playback rate, the display height/width, and the height/width of the
        frames as decoded.

    Raises:
        KeyError: The archive has no ``left.mp4`` member.
        RuntimeError: No frame could be decoded.
    """
    with zipfile.ZipFile(zip_path) as archive:
        video_bytes = archive.read("left.mp4")
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
        f.write(video_bytes)
        tmp_path = f.name

    frames = []
    h = w = dh = dw = 0
    try:
        cap = cv2.VideoCapture(tmp_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # `not (fps > 0)` also rejects NaN and negative values, which a
            # plain truthiness test would let through.
            if not (fps > 0):
                fps = Config.FPS_FALLBACK
            while len(frames) < max_frames:
                ok, frame = cap.read()
                if not ok:
                    break
                if not frames:
                    # The first frame fixes the geometry used for the clip.
                    h, w = frame.shape[:2]
                    scale = Config.DISPLAY_MAX / max(h, w)
                    # Clamp to 1 so extreme aspect ratios never yield a
                    # zero-sized target, which cv2.resize rejects.
                    dh, dw = max(1, int(h * scale)), max(1, int(w * scale))
                # Resize while reading so only display-sized frames are kept.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(cv2.resize(rgb, (dw, dh)))
        finally:
            cap.release()
    finally:
        # Always remove the temporary copy, even when decoding fails.
        os.unlink(tmp_path)

    if not frames:
        raise RuntimeError(f"No frames found in {zip_path}")
    return frames, fps, dh, dw, h, w
# ─────────────────────────────────────────────
# MAIN APP
# ─────────────────────────────────────────────
class Annotator(QMainWindow):
    """Main window: paint a binary mask over a looping clip and answer QUESTIONS.

    The left side shows the clip with the mask drawn on top plus the control
    buttons; the right side shows one radio-button row per question. Results
    are written to ``out_dir / <clip stem>`` by :meth:`save`.
    """

    # Maximum number of undo snapshots kept; older ones are discarded so a
    # long session does not accumulate mask copies without bound.
    HISTORY_LIMIT = 50

    def __init__(self, data_dir: Path, out_dir: Path, clip: str | None = None):
        """Index the clips in ``data_dir``, load one and build the UI.

        Args:
            data_dir: Directory containing the ``*.zip`` clips.
            out_dir: Directory under which one result folder per clip is written.
            clip: Optional stem (or file name) of a specific clip to open;
                when omitted a not-yet-annotated clip is picked at random.

        Raises:
            FileNotFoundError: ``data_dir`` has no zip files, or ``clip`` is
                given but cannot be found.
            RuntimeError: Every clip already has a saved mask.
        """
        super().__init__()
        self.data_dir = Path(data_dir)
        self.out_dir = Path(out_dir)
        self.history = []
        self.erase_mode = False
        self.frame_i = 0
        self.drawing = False
        self._pending_answers = None
        self.setWindowTitle("River Annotator")
        self.df = self._load_dataset()
        self._load_clip(specific=clip)
        self._init_canvas()
        self._init_ui()
        self._init_timer()

    # ─────────────────────────────
    # DATA
    # ─────────────────────────────
    def _load_dataset(self):
        """Return a DataFrame with one row per zip file in ``data_dir``.

        Columns are ``filename`` (the path) and ``datetime``, parsed from the
        second ``_``-separated token of the file stem; the value is ``NaT``
        when that token is missing or is not a date.

        Raises:
            FileNotFoundError: ``data_dir`` contains no ``*.zip`` file.
        """
        files = sorted(self.data_dir.glob("*.zip"))
        if not files:
            raise FileNotFoundError(f"No zip files in {self.data_dir}")

        def timestamp_of(path):
            parts = path.stem.split("_")
            # A stem without "_" has no date token; report NaT instead of
            # failing with an IndexError.
            if len(parts) < 2:
                return pd.NaT
            return pd.to_datetime(parts[1], errors="coerce")

        df = pd.DataFrame({"filename": files})
        df["datetime"] = df["filename"].apply(timestamp_of)
        return df

    def _read_saved(self, stem, dh, dw):
        """Read a previously saved annotation for the clip named ``stem``.

        Returns:
            ``(mask, answers)``. ``mask`` is the saved ``mask.png`` thresholded
            at 127 and resized to ``(dh, dw)``, or ``None`` when no mask file
            exists. ``answers`` is the decoded ``metadata.json``, or ``None``
            when there is no mask or no metadata file.
        """
        folder = self.out_dir / stem
        mask_path = folder / "mask.png"
        meta_path = folder / "metadata.json"
        if not mask_path.exists():
            return None, None
        with Image.open(mask_path) as im:
            mask_full = np.array(im.convert("L"))
        mask = cv2.resize(
            (mask_full > 127).astype(np.uint8),
            (dw, dh),
            interpolation=cv2.INTER_NEAREST,
        )
        answers = None
        if meta_path.exists():
            with open(meta_path, encoding="utf-8") as f:
                answers = json.load(f)
        return mask, answers

    def _load_clip(self, specific: str | None = None):
        """Select a clip, decode it and reset the per-clip annotation state.

        With ``specific`` the clip ``<specific>.zip`` (or, failing that, the
        path ``data_dir / specific``) is opened. Otherwise a clip without a
        saved ``mask.png`` is drawn at random, preferring one that differs
        from the clip currently shown so that skipping moves on.

        Instance state is only replaced once the clip has been decoded, so a
        failure leaves the current clip intact.

        Raises:
            FileNotFoundError: ``specific`` does not match any file.
            RuntimeError: No unannotated clip is left, or the clip holds no
                decodable frame.
        """
        if specific is not None:
            matches = list(self.data_dir.glob(f"{specific}.zip"))
            if not matches:
                candidate = self.data_dir / specific
                matches = [candidate] if candidate.exists() else []
            if not matches:
                raise FileNotFoundError(f"Clip '{specific}' not found in {self.data_dir}")
            chosen = matches[0]
        else:
            remaining = [
                f
                for f in self.df["filename"]
                if not (self.out_dir / f.stem / "mask.png").exists()
            ]
            if not remaining:
                raise RuntimeError("No remaining clips to annotate")
            # `filename` is unset on the very first load.
            current = getattr(self, "filename", None)
            pool = [f for f in remaining if f != current] or remaining
            # Index into the list rather than handing Path objects to numpy.
            chosen = pool[int(np.random.randint(len(pool)))]

        frames, fps, dh, dw, h, w = load_frames(chosen, Config.MAX_FRAMES)
        saved_mask, saved_answers = self._read_saved(chosen.stem, dh, dw)

        self.filename = chosen
        self.frames, self.fps = frames, fps
        self.dh, self.dw, self.h, self.w = dh, dw, h, w
        self.history = []
        if saved_mask is None:
            self.mask = np.zeros((dh, dw), dtype=np.uint8)
        else:
            self.mask = saved_mask
        # Applied to the radio buttons once they exist (see _init_ui).
        self._pending_answers = saved_answers

    def _set_answers(self, answers: dict):
        """Select, for every known question key, the radio button matching its value.

        Keys that are not questions and values that are not among a
        question's options are ignored.
        """
        for key, value in answers.items():
            entry = self.q_widgets.get(key)
            if entry is None:
                continue
            _, buttons, options = entry
            for btn, opt in zip(buttons, options):
                if opt == value:
                    # The buttons share a QButtonGroup, so checking one
                    # unchecks the previously selected one.
                    btn.setChecked(True)
                    break

    def _apply_pending_answers(self):
        """Push answers loaded with the clip into the form, then forget them."""
        if self._pending_answers:
            self._set_answers(self._pending_answers)
        self._pending_answers = None

    # ─────────────────────────────
    # UI
    # ─────────────────────────────
    def _init_canvas(self):
        """Create the matplotlib canvas with the frame image, mask layer and title."""
        self.fig = Figure()
        self.canvas = FigureCanvas(self.fig)
        self.ax = self.fig.add_subplot(111)
        self.ax.axis("off")
        self.img = self.ax.imshow(self.frames[0])
        # RGBA layer drawn above the frame; filled in by redraw_mask.
        self.mask_img = self.ax.imshow(np.zeros((self.dh, self.dw, 4)))
        self.title_text = self.ax.text(
            5,
            15,
            self.filename.name,
            color="white",
            fontsize=10,
            bbox=dict(facecolor="black", alpha=0.5),
        )

    def _init_ui(self):
        """Build the question form and control buttons and connect their events."""
        # key -> (button group, radio buttons, option labels)
        self.q_widgets = {}
        question_box = QVBoxLayout()
        for section, qs in QUESTIONS:
            group = QGroupBox(section)
            vbox = QVBoxLayout()
            for key, label, options in qs:
                vbox.addWidget(QLabel(label))
                btn_group = QButtonGroup(self)
                row = QHBoxLayout()
                buttons = []
                default_value = DEFAULTS.get(key)
                for opt in options:
                    btn = QRadioButton(opt)
                    btn_group.addButton(btn)
                    row.addWidget(btn)
                    buttons.append(btn)
                    if default_value == opt:
                        btn.setChecked(True)
                # Questions without a configured default start on their last option.
                if default_value is None and buttons:
                    buttons[-1].setChecked(True)
                self.q_widgets[key] = (btn_group, buttons, options)
                vbox.addLayout(row)
            group.setLayout(vbox)
            question_box.addWidget(group)

        # Controls
        self.btn_save = QPushButton("Save")
        self.btn_next = QPushButton("Next")
        self.btn_skip = QPushButton("Skip")
        self.btn_clear = QPushButton("Clear")
        self.btn_erase = QPushButton("Eraser")
        self.btn_undo = QPushButton("Undo")
        self.btn_reload = QPushButton("Reload Saved")
        self.brush_slider = QSlider(Qt.Horizontal)
        self.brush_slider.setRange(2, 50)
        self.brush_slider.setValue(5)

        row1 = QHBoxLayout()
        for b in [self.btn_save, self.btn_next, self.btn_skip]:
            row1.addWidget(b)
        row2 = QHBoxLayout()
        for b in [self.btn_clear, self.btn_erase, self.btn_undo, self.btn_reload]:
            row2.addWidget(b)
        row2.addWidget(QLabel("Brush"))
        row2.addWidget(self.brush_slider)

        left = QVBoxLayout()
        left.addWidget(self.canvas)
        left.addLayout(row1)
        left.addLayout(row2)

        main = QHBoxLayout()
        left_widget = QWidget()
        left_widget.setLayout(left)
        right_widget = QWidget()
        right_widget.setLayout(question_box)
        main.addWidget(left_widget, 3)
        main.addWidget(right_widget, 2)
        container = QWidget()
        container.setLayout(main)
        self.setCentralWidget(container)

        # events
        self.btn_save.clicked.connect(self.save)
        self.btn_next.clicked.connect(self.next_clip)
        self.btn_skip.clicked.connect(self.skip_clip)
        self.btn_clear.clicked.connect(self.clear_mask)
        self.btn_erase.clicked.connect(self.toggle_eraser)
        self.btn_undo.clicked.connect(self.undo)
        self.btn_reload.clicked.connect(self.reload_saved)
        self.canvas.mpl_connect("button_press_event", self.on_press)
        self.canvas.mpl_connect("motion_notify_event", self.on_move)
        self.canvas.mpl_connect("button_release_event", self.on_release)

        self._apply_pending_answers()

    def _frame_interval_ms(self):
        """Return the per-frame delay in milliseconds for the current clip (at least 1)."""
        return max(1, int(1000 / self.fps))

    def _init_timer(self):
        """Start the timer that advances the displayed frame."""
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_frame)
        self.timer.start(self._frame_interval_ms())

    # ─────────────────────────────
    # ANNOTATION
    # ─────────────────────────────
    def get_answers(self):
        """Return ``{question key: selected option}`` for the current form state."""
        out = {}
        for key, (_, buttons, options) in self.q_widgets.items():
            for btn, opt in zip(buttons, options):
                if btn.isChecked():
                    out[key] = opt
        return out

    def _push_history(self):
        """Store an undo snapshot of the mask, dropping the oldest beyond the limit."""
        self.history.append(self.mask.copy())
        if len(self.history) > self.HISTORY_LIMIT:
            del self.history[0]

    def stamp(self, x, y):
        """Paint (or erase, in eraser mode) a filled disc centred on ``(x, y)``.

        Coordinates are in image pixels; the radius is the brush slider
        value. Calls with ``None`` coordinates are ignored. No undo snapshot
        is taken here; that happens once per stroke in :meth:`on_press`.
        """
        if x is None or y is None:
            return
        r = self.brush_slider.value()
        # Pixel centres sit on integer coordinates, so round to the nearest pixel.
        ix, iy = int(round(x)), int(round(y))
        y0, y1 = max(0, iy - r), min(self.dh, iy + r + 1)
        x0, x1 = max(0, ix - r), min(self.dw, ix + r + 1)
        if y0 >= y1 or x0 >= x1:
            return  # the disc lies entirely outside the image
        Y, X = np.ogrid[y0:y1, x0:x1]
        circle = (X - ix) ** 2 + (Y - iy) ** 2 <= r**2
        # The slice is a view, so the boolean assignment edits self.mask in place.
        self.mask[y0:y1, x0:x1][circle] = 0 if self.erase_mode else 1
        self.redraw_mask()

    def redraw_mask(self):
        """Render the mask as a translucent green layer and request a repaint."""
        rgba = np.zeros((self.dh, self.dw, 4))
        rgba[..., 1] = self.mask * 0.7
        rgba[..., 3] = self.mask * 0.4
        self.mask_img.set_data(rgba)
        self.canvas.draw_idle()

    # ─────────────────────────────
    # EVENTS
    # ─────────────────────────────
    def on_press(self, e):
        """Begin a stroke at the pressed position (ignored outside the image)."""
        if e.xdata is None:
            return
        # One snapshot per stroke: undo reverts the whole stroke, and dragging
        # does not add a mask copy for every motion event.
        self._push_history()
        self.drawing = True
        self.stamp(e.xdata, e.ydata)

    def on_move(self, e):
        """Extend the current stroke while the mouse button is held."""
        if self.drawing:
            self.stamp(e.xdata, e.ydata)

    def on_release(self, _):
        """End the current stroke."""
        self.drawing = False

    def update_frame(self):
        """Show the next frame of the clip, wrapping around at the end."""
        self.frame_i = (self.frame_i + 1) % len(self.frames)
        self.img.set_data(self.frames[self.frame_i])
        self.canvas.draw_idle()

    # ─────────────────────────────
    # HELPERS
    # ─────────────────────────────
    def _make_overlay(self, frame, alpha=0.4):
        """Return a copy of ``frame`` with masked pixels blended towards green."""
        overlay = frame.copy()
        m = self.mask.astype(bool)
        # Assigning into the uint8 copy casts the blended floats back to uint8.
        overlay[m] = (1 - alpha) * overlay[m] + alpha * np.array([0, 255, 0])
        return overlay

    def _save_gif(self, frames, out_path, scale=1.0):
        """Write ``frames`` as a looping GIF at ``scale`` times their size."""
        h, w = frames[0].shape[:2]
        nh, nw = max(1, int(h * scale)), max(1, int(w * scale))
        pil_frames = [Image.fromarray(cv2.resize(f, (nw, nh))) for f in frames]
        pil_frames[0].save(
            out_path,
            save_all=True,
            append_images=pil_frames[1:],
            duration=self._frame_interval_ms(),
            loop=0,
        )

    def _show_loaded_clip(self):
        """Point the canvas, title, timer and form at the clip just loaded."""
        # Same convention as imshow's default extent. It must be refreshed
        # because set_data alone keeps the previous clip's geometry, which
        # would make mouse coordinates disagree with the new mask.
        extent = (-0.5, self.dw - 0.5, self.dh - 0.5, -0.5)
        self.img.set_data(self.frames[0])
        self.img.set_extent(extent)
        self.mask_img.set_extent(extent)
        self.ax.set_xlim(extent[0], extent[1])
        self.ax.set_ylim(extent[2], extent[3])
        self.title_text.set_text(self.filename.name)
        # Clips may differ in frame rate.
        self.timer.setInterval(self._frame_interval_ms())
        self.redraw_mask()
        self._apply_pending_answers()

    def _advance(self):
        """Load another clip and display it; keep the current one if none can be loaded."""
        try:
            self._load_clip()
        except RuntimeError as err:
            print("Cannot load another clip:", err)
            return
        self.frame_i = 0
        self.drawing = False
        self._show_loaded_clip()

    # ─────────────────────────────
    # ACTIONS
    # ─────────────────────────────
    def reload_saved(self):
        """Replace the mask and answers with the saved ones; no-op without a saved mask."""
        mask, answers = self._read_saved(self.filename.stem, self.dh, self.dw)
        if mask is None:
            return
        self.mask = mask
        self.history = []
        self.redraw_mask()
        if answers is not None:
            self._set_answers(answers)

    def clear_mask(self):
        """Erase the whole mask; the previous mask stays available to undo."""
        self._push_history()
        self.mask[:] = 0
        self.redraw_mask()

    def undo(self):
        """Restore the mask as it was before the latest stroke or clear."""
        if self.history:
            self.mask = self.history.pop()
            self.redraw_mask()

    def toggle_eraser(self):
        """Switch between painting and erasing and update the button label."""
        self.erase_mode = not self.erase_mode
        self.btn_erase.setText("Eraser ON" if self.erase_mode else "Eraser")

    def save(self):
        """Write mask, answers, preview images and GIFs to ``out_dir / <clip stem>``.

        ``mask.png`` is resized to the ``(w, h)`` reported by ``load_frames``;
        the remaining images and the GIFs use the display-sized frames.
        """
        out = self.out_dir / self.filename.stem
        out.mkdir(parents=True, exist_ok=True)
        mask_full = cv2.resize(
            self.mask.astype(np.uint8),
            (self.w, self.h),
            interpolation=cv2.INTER_NEAREST,
        )
        Image.fromarray(mask_full * 255).save(out / "mask.png")
        with open(out / "metadata.json", "w", encoding="utf-8") as f:
            json.dump(self.get_answers(), f, indent=2)

        # images
        mid = len(self.frames) // 2
        frame = self.frames[mid]
        overlay_frame = self._make_overlay(frame)
        Image.fromarray(frame).save(out / "frame.png")
        Image.fromarray((self.mask * 255).astype(np.uint8)).save(out / "mask_vis.png")
        Image.fromarray(overlay_frame).save(out / "overlay.png")

        # GIFs — original and overlay, high-res (display size) and low-res (half)
        overlay_frames = [self._make_overlay(f) for f in self.frames]
        self._save_gif(self.frames, out / "video_original_hires.gif", scale=1.0)
        self._save_gif(self.frames, out / "video_original_lowres.gif", scale=0.5)
        self._save_gif(overlay_frames, out / "video_overlay_hires.gif", scale=1.0)
        self._save_gif(overlay_frames, out / "video_overlay_lowres.gif", scale=0.5)
        print("Saved:", out)

    def next_clip(self):
        """Save the current annotation, then move on to another clip."""
        self.save()
        self._advance()

    def skip_clip(self):
        """Move on to another clip without saving the current one."""
        self._advance()
# ─────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--data", default="../torrent-flow/data/examples_for_annotations/")
parser.add_argument("--out", default="data/annotation_results/")
parser.add_argument("--clip", default=None, help="Stem name of a specific clip to load (e.g. 'left_20230501')")
return parser.parse_args()
def main():
    """Run the annotator and exit the process with Qt's event-loop status."""
    # Local import so this entry point does not depend on a file-level change.
    import sys

    args = parse_args()
    app = QApplication([])
    # Keep a reference to the window for as long as the event loop runs.
    win = Annotator(Path(args.data), Path(args.out), clip=args.clip)
    win.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

1613
uv.lock generated

File diff suppressed because it is too large Load Diff