Source code for reip.blocks.video.core

import time
import cv2
import reip


[docs]class Video(reip.Block): cap = None def __init__(self, index=None, file=None, fps=30, size=None, **kw): self.index = index self.size = size self.fps = fps super().__init__(n_inputs=None, **kw)
[docs] def init(self): self.cap = cv2.VideoCapture(self.index) msg = '' if self.size: h, w = self.size wset = self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, int(w)) hset = self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, int(h)) msg = f' (desired window size: {h}x{w})' h = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT) w = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH) self.log.debug(f'window size: {w}x{h}' + msg)
[docs] def process(self, x=None, meta=None): if not self.cap.isOpened(): print('closing', self.cap.isOpened()) return reip.CLOSE ret, frame = self.cap.read() if not ret: raise RuntimeError(f'Video frame grab failed for {self}') return [frame[:, ::-1]], { 'time': time.time(), 'dim': frame.shape, 'fps': self.fps }
[docs] def finish(self): self.cap.release()
class _VideoWriter(reip.util.CycledWriter): # CODECS: XVID, MJPG, MPEG, MP4V def __init__(self, filename, duration=600, codec='X264', **kw): self.duration = duration self.codec = codec super().__init__(filename) def _new_writer(self, fname, meta): if self.duration: self.file_length = int(meta['fps'] * self.duration) print(meta['dim']) return cv2.VideoWriter( fname, cv2.VideoWriter_fourcc(*self.codec), meta['fps'], meta['dim'][:2][::-1]) def _close_writer(self): if self._writer is not None: self._writer.release()
[docs]class VideoWriter(reip.Block): def __init__(self, filename, **kw): self.filename = filename super().__init__(extra_kw=True, **kw)
[docs] def init(self): self.writer = _VideoWriter(self.filename, **self.extra_kw)
[docs] def process(self, X, meta): writer = self.writer.get(meta) writer.write(X) self.writer.increment() return self.writer.output_closed_file()
[docs] def finish(self): self.writer.close()
# class VideoShow(reip.Block): # '''XXX: Doesn't work in multithreaded environments. use stream_imshow''' # def __init__(self, window_name=None, quit_key='q', **kw): # super().__init__(n_outputs=0, **kw) # self.window_name = window_name or self.name # self.quit_keys = {ord(k) for k in quit_key} # # def process(self, X, meta): # cv2.imshow(self.window_name, X) # if cv2.waitKey(25) & 0xFF in self.quit_keys: # return reip.CLOSE # return [], {} # # def finish(self): # cv2.destroyWindow(self.window_name) def stream_imshow(stream, *names): try: for name in names: cv2.namedWindow(name) with stream: for xs, meta in stream: for name, x in zip(names, xs): if x is not None: cv2.imshow(name, x) if cv2.waitKey(25) & 0xFF == ord('q'): return yield finally: for name in names: cv2.destroyWindow(name)