import io import os import tempfile import zipfile import cv2 def load_frames( zip_path, max_frames: int, display_max: int, fps_fallback: int, video_in_zip: str = "left.mp4", video_tmp_suffix: str = ".mp4", fs=None, ): if fs is None: video_bytes = zipfile.ZipFile(zip_path).read(video_in_zip) else: with fs.open(str(zip_path), "rb") as f: video_bytes = zipfile.ZipFile(io.BytesIO(f.read())).read(video_in_zip) with tempfile.NamedTemporaryFile(suffix=video_tmp_suffix, delete=False) as f: f.write(video_bytes) tmp_path = f.name cap = cv2.VideoCapture(tmp_path) fps = cap.get(cv2.CAP_PROP_FPS) or fps_fallback total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) step = max(1, total // max_frames) frames = [] i = 0 while len(frames) < max_frames: cap.set(cv2.CAP_PROP_POS_FRAMES, i) ok, frame = cap.read() if not ok: break frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) i += step cap.release() os.unlink(tmp_path) if not frames: raise RuntimeError(f"No frames found in {zip_path}") h, w = frames[0].shape[:2] scale = display_max / max(h, w) dh, dw = int(h * scale), int(w * scale) frames = [cv2.resize(f, (dw, dh)) for f in frames] return frames, fps, dh, dw, h, w