Source code for entry_exit_mouse_box.media_manager

import cv2
import os


[docs] def properties_match(p1, p2): if p1['total_frames'] != p2['total_frames']: return False if abs(p1['fps'] - p2['fps']) > 0.2: return False if (p1['width'] != p2['width']) or (p1['height'] != p2['height']): return False return True
[docs] class MediaManager: def __init__(self, viewer): self.sources = [] # (file_path, capture_instance, layer_name, process_function, image_category) self.properties = [] # Available keys: total_frames, fps, width, height self.current_frame = -1 # True index, not the displayed index (starts at 0) self.viewer = viewer # Instance of the Napari viewer self.logger = None self.active = False def __del__(self): self.release()
[docs] def set_logger(self, logger): self.logger = logger
[docs] def release(self): self.active = False for source in self.sources: _, capture, _, _, _ = source capture.release() self.sources.clear() self.properties.clear() self.current_frame = -1
[docs] def get_source_by_index(self, index): if index < 0 or index >= len(self.sources): raise ValueError("ERROR: Index out of range.") return self.sources[index]
[docs] def get_source_by_name(self, name): for source in self.sources: _, _, layer_name, _, _ = source if layer_name == name: return source return None
[docs] def get_n_frames(self): if len(self.properties) == 0: raise ValueError("ERROR: No media opened.") return self.properties[0]['total_frames']
[docs] def get_fps(self): if len(self.properties) == 0: raise ValueError("ERROR: No media opened.") return self.properties[0]['fps']
[docs] def get_width(self): if len(self.properties) == 0: raise ValueError("ERROR: No media opened.") return self.properties[0]['width']
[docs] def get_height(self): if len(self.properties) == 0: raise ValueError("ERROR: No media opened.") return self.properties[0]['height']
[docs] def get_n_sources(self): return len(self.sources)
[docs] def release_source(self, index): if index < 0 or index >= len(self.sources): raise ValueError("ERROR: Index out of range.") _, capture, _, _, _ = self.sources[index] capture.release() self.sources.pop(index) self.properties.pop(index) if index == 0: self.current_frame = -1 return None
[docs] def add_source(self, file_path, target_layer, img_type, process=None): """ Add a new video source to the manager. Args: file_path : Path of the video file to be opened. target_layer: Name of the layer to which each frame of the video will be loaded. img_type : Type of the image to be loaded ('image' or 'labels'). process : Function to be applied to each frame of the video before displaying it. Raises: FileNotFoundError: If the file is not found at the specified path. IOError : If the file cannot be opened. ValueError : If the properties of the video do not match the properties of the media already opened. Returns: The properties of the last video added under the form of a dictionary. """ self.active = True if not os.path.exists(file_path): raise FileNotFoundError(f"ERROR: File not found at {file_path}") # Checking that the target name is not already in use. If it is, release the source before adding the new one. for idx, source in enumerate(self.sources): if source[2] == target_layer: self.release_source(idx) capture = cv2.VideoCapture(file_path) if not capture.isOpened(): raise IOError("ERROR: Failed to open video file.") properties = { 'total_frames': int(capture.get(cv2.CAP_PROP_FRAME_COUNT)), 'fps' : capture.get(cv2.CAP_PROP_FPS), 'width' : int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)), 'height' : int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) } # Checking that the file is not already opened. If it is, return the known properties. for idx, source in enumerate(self.sources): if source[0] == file_path: print("The file is already opened.") return properties[idx] # Checking that the properties are compatible with the media already opened. if len(self.properties) > 0 and not properties_match(properties, self.properties[0]): self.logger.error("The properties of the video do not match the properties of the media already opened.") self.logger.error(f"Video properties: {properties}") self.logger.error(f"Media properties: {self.properties[0]}") raise ValueError("ERROR: The properties of the video do not match the properties of the media already opened.") self.sources.append((file_path, capture, target_layer, process, img_type)) self.properties.append(properties) if self.current_frame == -1: self.current_frame = 0 # Set the correct frame index and extract the given frame capture.set(cv2.CAP_PROP_POS_FRAMES, self.current_frame) ret, frame = capture.read() if process is not None: frame = process(frame) if not ret: raise IOError("ERROR: Failed to read frame.") if target_layer in self.viewer.layers: layer = self.viewer.layers[target_layer] layer.data = frame else: if img_type == "image": layer = self.viewer.add_image( frame, name=target_layer ) elif img_type == "labels": layer = self.viewer.add_labels( frame, name=target_layer, blending="additive" ) else: raise ValueError("ERROR: The image type is not recognized.") return properties
[docs] def set_frame(self, frame_target): if len(self.sources) == 0: if self.active: raise ValueError("ERROR: No media opened.") else: return frame_number = int(frame_target) frame_number = max(0, frame_number) frame_number = min(frame_number, self.properties[0]['total_frames']-1) if frame_number == self.current_frame: return self.current_frame self.current_frame = frame_number for source in self.sources: _, capture, target_layer, process, img_type = source capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = capture.read() if process is not None: frame = process(frame) if not ret: print("ERROR: Failed to read frame.") return None if target_layer in self.viewer.layers: layer = self.viewer.layers[target_layer] layer.data = frame else: if img_type == "image": self.viewer.add_image( frame, name=target_layer ) elif img_type == "labels": self.viewer.add_labels( frame, name=target_layer ) else: raise ValueError("ERROR: The image type is not recognized.")
[docs] def get_video_properties(self): return self.properties
[docs] def get_current_frame_number(self): return self.current_frame