Source code for entry_exit_mouse_box.measures

from concurrent.futures import ThreadPoolExecutor
import threading
import cv2
import numpy as np
import time
import os
from skimage.measure import regionprops

N_THREADS = int(min(4, os.cpu_count() // 2))
print(f"Using {N_THREADS} threads.")

[docs] class MiceVisibilityProcessor(object): """ Calculates an array indicating for each box (designated by the labels in 'areas') if a mouse is inside or not. The process is realized on several threads. The input video was saved as a mask (0=BG, 255=FG) but it requires thresholding anyway due to compression. The areas are a grayscale image with one value per box (0=BG). To process the presence of a mouse, we use the length of the ellipse fitted to the mouse's label. It requires the input image to be calibrated. The process doesn't start from the frame 0 but from the frame 'start'. We don't need a control structure to write in the buffer as the threads are not writing in the same place. """ def __init__(self, mask_path, areas, ma, start, duration): self.video_path = mask_path self.track_duration = int(duration) self.video_stream = cv2.VideoCapture(mask_path) self.lock = threading.Lock() self.labeled_boxes = areas self.n_frames = int(self.video_stream.get(cv2.CAP_PROP_FRAME_COUNT)) self.fps = self.video_stream.get(cv2.CAP_PROP_FPS) self.box_ids = set([int(i) for i in np.unique(self.labeled_boxes) if int(i) != 0]) self.current_frame = 0 self.min_trk_length = ma self.starting_frames = {k: v+1 for k, v in start.items()} self.ranges = self.split_frame_ranges(N_THREADS, self.n_frames) print(f"Total frames: {self.n_frames}") print(f"FPS: {self.fps}") print(f"Boxes: {self.box_ids}") print(f"Starters: {self.starting_frames}") self.instant_visibility = np.zeros((len(self.box_ids), self.n_frames), np.int8) self.instant_centroids = np.zeros((self.n_frames, len(self.box_ids), 2), float) self.all_sessions = None self.instant_centroids.fill(-1.0) self.video_stream.release() self.video_stream = None
[docs] def process_visibility_pos(self, interval): video_stream = cv2.VideoCapture(self.video_path) video_stream.set(cv2.CAP_PROP_POS_FRAMES, interval[0]) for i in range(interval[1] - interval[0]): _, frame = video_stream.read() mask = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) > 127 mask = mask.astype(np.float32) mask *= self.labeled_boxes mask = mask.astype(np.uint8) all_props = regionprops(mask) for p in all_props: if p.label == 0: continue l = int(p.label) y = int(p.centroid[0]) x = int(p.centroid[1]) self.instant_visibility[l-1, interval[0] + i] = 1 self.instant_centroids[interval[0] + i, l-1] = (y, x) # Export both arrays on the disk video_stream.release()
[docs] def worker(self, thread_id): self.process_visibility_pos(self.ranges[thread_id])
[docs] def get_session_length(self, box_rank, i1, i2): """ Returns the distance traveled by the mouse during the session. Uses the sum of the distances between consecutive centroids. """ if i1 == i2: return 0.0 points = self.instant_centroids[i1:i2, box_rank] distance = 0.0 for i in range(len(points)-1): distance += np.linalg.norm(points[i+1] - points[i]) return distance
[docs] def smooth_centroids(self, window_size=5): smoothed = np.full_like(self.instant_centroids, np.nan, dtype=np.float32) N = self.instant_centroids.shape[0] for box_rank in range(len(self.box_ids)): half_w = window_size // 2 for i in range(N): start = max(0, i - half_w) end = min(N, i + half_w + 1) window = self.instant_centroids[start:end, box_rank] valid = (window >= 0).all(axis=1) if valid.any(): smoothed[i, box_rank] = window[valid].mean(axis=0) else: smoothed[i, box_rank] = self.instant_centroids[i, box_rank] self.instant_centroids = smoothed
[docs] def filter_visibility(self): for box_rank in range(len(self.box_ids)): swaps = [] first_frame = self.starting_frames[box_rank+1] - 1 last_frame_idx = min(first_frame + self.track_duration, self.n_frames) for f in range(self.n_frames-1): if f < first_frame: self.instant_visibility[box_rank, f] = -1 self.instant_centroids[f, box_rank] = (-1.0, -1.0) continue if f >= last_frame_idx: self.instant_visibility[box_rank, f] = -2 self.instant_centroids[f, box_rank] = (-1.0, -1.0) continue # State transition, the next session starts at (f+1) if (self.instant_visibility[box_rank, f] != self.instant_visibility[box_rank, f+1]) or (f == last_frame_idx-1): # First transition: we don't care about the duration of the session if len(swaps) == 0: swaps.append(f+1) continue # We are in an unstable state. if (self.get_session_length(box_rank, swaps[-1], f) < self.min_trk_length): swaps.append(f+1) else: for i in range(swaps[0], swaps[-1]): self.instant_visibility[box_rank, i] = 0 self.instant_centroids[i, box_rank] = (-1.0, -1.0) swaps = [f+1] np.save("/tmp/visibility-02.npy", self.instant_visibility) np.save("/tmp/centroids-02.npy", self.instant_centroids)
[docs] def split_frame_ranges(self, n_threads, n_frames): ranges = [] base = n_frames // n_threads remainder = n_frames % n_threads start = 0 for i in range(n_threads): end = start + base + (1 if i < remainder else 0) ranges.append((start, end)) start = end return ranges
[docs] def start_processing(self, num_workers=N_THREADS): print("(1/3) Processing visibility...") with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [executor.submit(self.worker, i) for i in range(num_workers)] for future in futures: future.result() print("(2/3) Processing number of in/out...") self.smooth_centroids() self.filter_visibility() print("(3/3) Processing sessions time and distance...") self.process_sessions() np.save("/tmp/visibility-03.npy", self.instant_visibility) np.save("/tmp/centroids-03.npy", self.instant_centroids)
[docs] def process_sessions(self): """ We call 'session' the span of time during which the mouse is hidden or visible. For each box, a video is a succession of sessions, alternating between hidden and visible. During a session, the mouse is either hidden or visible. A session is defined by a duration (in seconds) and a distance (in pixels). """ self.all_sessions = {} for box in range(len(self.box_ids)): sessions = [] count = 0 start = 0 for f in range(self.n_frames-1): state = (self.instant_visibility[box, f], self.instant_visibility[box, f+1]) if (state[0] == state[1]): continue # cases: (-1, 0), (-1, 1), (0, 1), (1, 0), (1, -2), (0, -2) if state[0] == -1: # Track's starting start = f+2 else: count += 1 sessions.append({ 'start' : start, 'end' : f + 1, 'duration' : f - start + 1, 'distance' : float(self.get_session_length(box, start-1, f+1)), 'status' : int(state[0]) }) start = f + 2 if state[1] == -2: # end of this track self.all_sessions[box] = { 'sessions': sessions, 'count' : count } break
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # from qtpy.QtCore import QThread, QObject, QTimer, Qt, Signal, Slot from PyQt5.QtCore import pyqtSignal
[docs] class QtWorkerMVP(QObject): measures_ready = pyqtSignal(np.ndarray, np.ndarray, dict) def __init__(self, mask_path, areas, ma, start, duration): super().__init__() self.mask_path = mask_path self.areas = areas self.min_length = ma self.start = start self.duration = duration
[docs] def run(self): mvp = MiceVisibilityProcessor(self.mask_path, self.areas, self.min_length, self.start, self.duration) mvp.start_processing() visibility = mvp.instant_visibility all_sessions = mvp.all_sessions centroids = mvp.instant_centroids self.measures_ready.emit(visibility, centroids, all_sessions)
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # if __name__ == "__main__": import tifffile mask_path = "/media/benedetti/5B0AAEC37149070F/mice-videos/2084-2086-2104-2106-T0.tmp/mask.avi" start_f = { 1: 820 } areas_path = "/media/benedetti/5B0AAEC37149070F/mice-videos/2084-2086-2104-2106-T0.tmp/labeled-areas.tif" areas = tifffile.imread(areas_path) scale = 1.17 unit = "mm" start = time.time() mvp = MiceVisibilityProcessor(mask_path, areas, 55, start_f, int(5*60*59.617)) print(mvp.ranges) mvp.start_processing() duration = time.time() - start print(f"{duration:.2f}, {N_THREADS}") from pprint import pprint sessions = mvp.all_sessions pprint(sessions)