import cv2
import threading
import os
import tifffile
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import time

[docs] class MaskFromBackground(object): def __init__(self, input_video_path, output_video_path, ref, tr=75, st={}, r=None, frame_count=64): self.input_video_path = input_video_path self.output_video_path = output_video_path self.frame_count = frame_count self.reference = ref = cv2.VideoCapture(input_video_path) self.ttl_frames = int( fps = self.processed_frames = {} self.lock = threading.Lock() self.condition = threading.Condition(self.lock) fourcc = cv2.VideoWriter_fourcc(*'XVID') self.video_out = cv2.VideoWriter(output_video_path, fourcc, fps, (ref.shape[1], ref.shape[0]), isColor=False) self.threshold = tr self.expected_index = 0 self.current_index = 0 = int(self.ttl_frames/frame_count)+1 self.start = st self.reader_pos = 0 self.regions = np.zeros((ref.shape[0], ref.shape[1]), np.uint8) if r is None else r
[docs] def read_frames(self): frames = [] p1 = self.reader_pos p2 = p1 + self.frame_count self.reader_pos = p2 for _ in range(self.frame_count): ret, frame = if not ret: break frames.append(frame) return (p1, p2), frames
def _process_frames(self, batch, frames): buffer_out = [] for i, frame in enumerate(frames): if batch[0] + i < self.start: diff = np.zeros(self.reference.shape[0:2], bool) else: processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) diff = np.abs(processed_frame - self.reference) > self.threshold buffer_out.append(diff) return buffer_out
[docs] def process_frames(self, batch, frames): buffer_out = [] for i, frame in enumerate(frames): bw_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32) mask = np.zeros(bw_frame.shape, bool) pos = batch[0] + i for lbl, start in self.start.items(): if pos >= start: mask[self.regions == lbl] = True diff = np.logical_and(np.abs(bw_frame - self.reference) > self.threshold, mask) buffer_out.append(diff) return buffer_out
[docs] def worker(self): while True: frames = None launch_index = 0 with self.lock: batch, frames = self.read_frames() launch_index = self.current_index self.current_index += 1 if not frames: break processed_frames = self.process_frames(batch, frames) with self.condition: self.processed_frames[launch_index] = processed_frames while self.processed_frames.get(self.expected_index): print(f"{self.expected_index+1}/{}") self.add_frames_to_video(self.processed_frames.pop(self.expected_index)) self.expected_index += 1 self.condition.notify_all()
[docs] def add_frames_to_video(self, frames): for frame in frames: self.video_out.write(np.uint8(frame * 255))
[docs] def start_processing(self, num_workers=16): with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = [executor.submit(self.worker) for _ in range(num_workers)] for future in futures: future.result()
[docs] def release_resources(self): self.video_out.release()
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # from qtpy.QtCore import QThread, QObject, QTimer, Qt, Signal, Slot from PyQt5.QtCore import pyqtSignal
[docs] class QtWorkerMFV(QObject): mask_ready = pyqtSignal(str) def __init__(self, in_path, out_path, ref, t, s, r): super().__init__() self.in_path = in_path self.out_path = out_path self.ref = ref self.threshold = t self.start = s self.regions = r
[docs] def run(self): mfb = MaskFromBackground(self.in_path, self.out_path, self.ref, self.threshold, self.start, self.regions) mfb.start_processing() self.mask_ready.emit(self.out_path)
if __name__ == "__main__":