Source code for allensdk.eye_tracking.frame_stream
import logging
import traceback
import cv2
[docs]class FrameInputStream(object):
def __init__(self, movie_path, num_frames=None, process_frame_cb=None):
self.movie_path = movie_path
self._num_frames = num_frames
self._start = 0
self._stop = num_frames
self._step = 1
self._i = self._start - self._step
self._last_i = 0
if process_frame_cb:
self.process_frame_cb = process_frame_cb
else:
self.process_frame_cb = lambda f: f[:, :, 0].copy()
self.frames_read = 0
self.frame_cache = []
def __getitem__(self, key):
if isinstance(key, int):
if key >= self.num_frames or key < -self.num_frames:
raise IndexError("Index {} out of range".format(key))
elif key >= 0:
self._start = key
else:
self._start = self.num_frames + key
self._stop = self._start + 1
self._step = 1
return list(self)[0] # force iteration and closing
elif isinstance(key, slice):
if key.step == 0:
raise ValueError("slice step cannot be 0")
self._start = key.start if key.start is not None else 0
if key.stop is None:
self._stop = self.num_frames
elif key.stop < 0:
self._stop = max(self.num_frames + key.stop, -1)
else:
self._stop = min(self.num_frames, key.stop)
self._step = key.step if key.step is not None else 1
return self
else:
raise KeyError("Key must be non-negative integer or slice, not {}"
.format(key))
@property
def num_frames(self):
return self._num_frames if self._num_frames is not None else 0
@property
def frame_shape(self):
raise NotImplementedError(("frame_shape must be implemented in a "
"subclass"))
def _error(self):
pass
def _seek_frame(self, i):
raise NotImplementedError(("_seek_frame must be implemented in a "
"subclass"))
def _get_frame(self, i):
raise NotImplementedError(("_get_frame must be implemented in a "
"subclass"))
[docs] def get_frame(self, i):
if abs(i - self._last_i) > 1:
self._seek_frame(i)
self._last_i = self._i
self._i = i
self.frames_read += 1
if self.frames_read % 100 == 0:
logging.debug("Read frames %d", self.frames_read)
return self.process_frame_cb(self._get_frame(self._i))
def __enter__(self):
return self
def __iter__(self):
self._last_i = 0
self._i = self._start - self._step
self.open()
logging.debug("Iterating over %s from %d to %s by step %d" %
(self.movie_path, self._start, self._stop, self._step))
return self
def __next__(self):
if self._stop is None:
self._stop = self.num_frames
self._i = self._i + self._step
if (self._step < 0 and self._i <= self._stop) or \
(self._step > 0 and self._i >= self._stop):
self.close()
raise StopIteration()
else:
return self.get_frame(self._i)
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
traceback.print_tb(tb)
self._error()
raise exc_value
[docs]class CvInputStream(FrameInputStream):
def __init__(self, movie_path, num_frames=None, process_frame_cb=None):
super(CvInputStream, self).__init__(movie_path=movie_path,
num_frames=num_frames,
process_frame_cb=process_frame_cb)
self.cap = None
self._frame_shape = None
self._stop = num_frames
@property
def num_frames(self):
if self._num_frames is None:
self.load_capture_properties()
self._stop = self._num_frames
return self._num_frames
@property
def frame_shape(self):
if self._frame_shape is None:
self.load_capture_properties()
return self._frame_shape
[docs] def load_capture_properties(self):
close_after = False
if self.cap is None:
close_after = True
self.cap = cv2.VideoCapture(self.movie_path)
self._num_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self._frame_shape = (int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
if close_after:
self.cap.release()
self.cap = None
[docs] def open(self):
if self.cap:
raise IOError("capture is open already")
super(CvInputStream, self).open()
self.cap = cv2.VideoCapture(self.movie_path)
logging.debug("opened capture")
[docs] def close(self):
if self.cap is None:
return
self.cap.release()
self.cap = None
super(CvInputStream, self).close()
def _seek_frame(self, i):
if self.cap is None:
raise IOError("capture is not open")
self.cap.set(cv2.CAP_PROP_POS_FRAMES, i)
def _get_frame(self, i):
if self.cap is None:
raise IOError("capture is not open")
ret, frame = self.cap.read()
return frame
def _error(self):
self.cap.release()
self.cap = None
[docs]class FrameOutputStream(object):
def __init__(self, block_size=1):
self.frames_processed = 0
self.block_frames = []
self.block_size = block_size
[docs] def open(self, movie_path):
self.frames_processed = 0
self.block_frames = []
self.movie_path = movie_path
def _write_frames(self, frames):
raise NotImplementedError()
[docs] def write(self, frame):
self.block_frames.append(frame)
if len(self.block_frames) == self.block_size:
self._write_frames(self.block_frames)
self.frames_processed += len(self.block_frames)
self.block_frames = []
[docs] def close(self):
if self.block_frames:
self._write_frames(self.block_frames)
self.frames_processed += len(self.block_frames)
self.block_frames = []
logging.debug("wrote %d frames", self.frames_processed)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
raise exc_value
self.close()
[docs]class CvOutputStream(FrameOutputStream):
def __init__(self, movie_path, frame_shape, frame_rate=30.0,
fourcc="H264", is_color=True, block_size=1):
super(CvOutputStream, self).__init__(block_size)
self.frame_shape = frame_shape
self.movie_path = movie_path
self.fourcc = cv2.VideoWriter_fourcc(*str(fourcc))
self.frame_rate = frame_rate
self.is_color = is_color
self.writer = None
[docs] def open(self, movie_path):
super(CvOutputStream, self).open(movie_path)
if self.writer:
raise IOError("video writer is open already")
self.writer = cv2.VideoWriter(movie_path, self.fourcc,
self.frame_rate, self.frame_shape,
self.is_color)
logging.debug("opened video writer")
def _write_frames(self, frames):
if self.writer is None:
self.open(self.movie_path)
for frame in frames:
self.writer.write(frame)
[docs] def close(self):
super(CvOutputStream, self).close()
if self.writer is None:
raise IOError("video writer is closed")
self.writer.release()
logging.debug("closed video writer")
self.writer = None
def __exit__(self, exc_type, exc_value, tb):
if exc_value:
self.writer.release()
raise exc_value
self.close()