Source code for allensdk.eye_tracking.eye_tracking

import logging
import cv2
import numpy as np
from .fit_ellipse import EllipseFitter, ellipse_pass_filter
from .utils import generate_ray_indices, get_ray_values
from .feature_extraction import (get_circle_template,
                                 max_correlation_positions)
from .plotting import Annotator, ellipse_points


[docs]class PointGenerator(object): """Class to find candidate points for ellipse fitting. Candidates points are found by drawing rays from a seed point and checking for the first threshold crossing of each ray. Parameters ---------- index_length : int Initial default length for ray indices. n_rays : int The number of rays to check. cr_threshold_factor : float Multiplicative factor for thresholding corneal reflection. pupil_threshold_factor : float Multiplicative factor for thresholding pupil. cr_threshold_pixels : int Number of pixels (from beginning of ray) to use to determine threshold of corneal reflection. pupil_threshold_pixels : int Number of pixels (from beginning of ray) to use to determine threshold of pupil. """ DEFAULT_INDEX_LENGTH = 150 DEFAULT_N_RAYS = 150 DEFAULT_THRESHOLD_FACTOR = 1.2 DEFAULT_THRESHOLD_PIXELS = 10 def __init__(self, index_length=DEFAULT_INDEX_LENGTH, n_rays=DEFAULT_N_RAYS, cr_threshold_factor=DEFAULT_THRESHOLD_FACTOR, pupil_threshold_factor=DEFAULT_THRESHOLD_FACTOR, cr_threshold_pixels=DEFAULT_THRESHOLD_PIXELS, pupil_threshold_pixels=DEFAULT_THRESHOLD_PIXELS): self.update_params(index_length=index_length, n_rays=n_rays, cr_threshold_factor=cr_threshold_factor, pupil_threshold_factor=pupil_threshold_factor, cr_threshold_pixels=cr_threshold_pixels, pupil_threshold_pixels=pupil_threshold_pixels) self.above_threshold = {"cr": False, "pupil": True} self._intensity_estimate = 0
[docs] def update_params(self, index_length=DEFAULT_INDEX_LENGTH, n_rays=DEFAULT_N_RAYS, cr_threshold_factor=DEFAULT_THRESHOLD_FACTOR, pupil_threshold_factor=DEFAULT_THRESHOLD_FACTOR, cr_threshold_pixels=DEFAULT_THRESHOLD_PIXELS, pupil_threshold_pixels=DEFAULT_THRESHOLD_PIXELS): """Update starburst point generation parameters. Parameters ---------- index_length : int Initial default length for ray indices. n_rays : int The number of rays to check. cr_threshold_factor : float Multiplicative factor for thresholding corneal reflection. pupil_threshold_factor : float Multiplicative factor for thresholding pupil. cr_threshold_pixels : int Number of pixels (from beginning of ray) to use to determine threshold of corneal reflection. pupil_threshold_pixels : int Number of pixels (from beginning of ray) to use to determine threshold of pupil. """ self.index_length = index_length self.xs, self.ys = generate_ray_indices(index_length, n_rays) self.threshold_pixels = {"cr": cr_threshold_pixels, "pupil": pupil_threshold_pixels} self.threshold_factor = {"cr": cr_threshold_factor,
"pupil": pupil_threshold_factor}
[docs] def get_candidate_points(self, image, seed_point, point_type, filter_function=None, filter_args=(), filter_kwargs=None): """Get candidate points for ellipse fitting. Parameters ---------- image : numpy.ndarray Image to check for threshold crossings. seed_point : tuple (y, x) center point for ray burst. point_type : str Either 'cr' or 'pupil'. Determines if threshold crossing is high-to-low or low-to-high and which `threshold_factor` and `threshold_pixels` value to use. Returns ------- candidate_points : list List of (y, x) candidate points. """ xs = self.xs + seed_point[1] ys = self.ys + seed_point[0] ray_values = get_ray_values(xs, ys, image) filtered_out = 0 threshold_not_crossed = 0 candidate_points = [] if filter_kwargs is None: filter_kwargs = {} for i, values in enumerate(ray_values): try: point = self.threshold_crossing( xs[i], ys[i], values, point_type) if filter_function is not None: filter_kwargs["pupil_intensity_estimate"] = \ self._intensity_estimate if filter_function(point, *filter_args, **filter_kwargs): candidate_points.append(point) else: filtered_out += 1 else: candidate_points.append(point) except ValueError: threshold_not_crossed += 1 if threshold_not_crossed or filtered_out: logging.debug(("%s candidate points returned, %s filtered out, %s " "not generated because threshold not crossed"), len(candidate_points), filtered_out, threshold_not_crossed)
return candidate_points
[docs] def threshold_crossing(self, xs, ys, values, point_type): """Check a ray for where it crosses a threshold. The threshold is calculated using `get_threshold`. Parameters ---------- xs : numpy.ndarray X indices of ray. ys : numpy.ndarray Y indices of ray. values : numpy.ndarray Image values along ray. point_type : str Either 'cr' or 'pupil'. Determines if threshold crossing is high-to-low or low-to-high and which `threshold_factor` and `threshold_pixels` value to use. Returns ------- y_index : int Y index of threshold crossing. x_index : int X index of threshold crossing. Raises ------ ValueError If no threshold crossing found. """ try: above_threshold = self.above_threshold[point_type] threshold_pixels = self.threshold_pixels[point_type] threshold_factor = self.threshold_factor[point_type] except KeyError: raise ValueError(("'{}' is not a supported point type, must be " "'cr' or 'pupil'").format(point_type)) threshold = self.get_threshold(values, threshold_pixels, threshold_factor) if above_threshold: comparison = values[threshold_pixels:] > threshold else: comparison = values[threshold_pixels:] < threshold sub_index = np.argmax(comparison) if comparison[sub_index]: index = threshold_pixels + sub_index return ys[index], xs[index] else:
raise ValueError("No value in array crosses: {}".format(threshold))
[docs] def get_threshold(self, ray_values, threshold_pixels, threshold_factor): """Calculate the threshold from the ray values. The threshold is determined from `threshold_factor` times the mean of the first `threshold_pixels` values. Parameters ---------- ray_values : numpy.ndarray Values of the ray. threshold_factor : float Multiplicative factor for thresholding. threshold_pixels : int Number of pixels (from beginning of ray) to use to determine threshold. Returns ------- threshold : float Threshold to set for candidate point. """ sub_ray = ray_values[threshold_pixels] self._intensity_estimate = np.mean(sub_ray) threshold = threshold_factor*self._intensity_estimate
return threshold
[docs]class EyeTracker(object): """Mouse Eye-Tracker. Parameters ---------- input_stream : generator Generator that yields numpy.ndarray frames to analyze. output_stream : stream Stream that accepts numpuy.ndarrays in the write method. None if not outputting annotations. starburst_params : dict Dictionary of keyword arguments for `PointGenerator`. ransac_params : dict Dictionary of keyword arguments for `EllipseFitter`. pupil_bounding_box : numpy.ndarray [xmin xmax ymin ymax] bounding box for pupil seed point search. cr_bounding_box : numpy.ndarray [xmin xmax ymin ymax] bounding box for cr seed point search. generate_QC_output : bool Flag to compute extra QC data on frames. **kwargs pupil_min_value : int pupil_max_value : int cr_mask_radius : int pupil_mask_radius : int cr_recolor_scale_factor : float recolor_cr : bool adaptive_pupil: bool smoothing_kernel_size : int clip_pupil_values : bool average_iris_intensity : int """ DEFAULT_MIN_PUPIL_VALUE = 0 DEFAULT_MAX_PUPIL_VALUE = 30 DEFAULT_CR_RECOLOR_SCALE_FACTOR = 1.7 DEFAULT_RECOLOR_CR = True DEFAULT_ADAPTIVE_PUPIL = False DEFAULT_CR_MASK_RADIUS = 10 DEFAULT_PUPIL_MASK_RADIUS = 40 DEFAULT_GENERATE_QC_OUTPUT = False DEFAULT_SMOOTHING_KERNEL_SIZE = 7 DEFAULT_CLIP_PUPIL_VALUES = True DEFAULT_AVERAGE_IRIS_INTENSITY = 40 DEFAULT_MAX_ECCENTRICITY = 0.25 def __init__(self, input_stream, output_stream=None, starburst_params=None, ransac_params=None, pupil_bounding_box=None, cr_bounding_box=None, generate_QC_output=DEFAULT_GENERATE_QC_OUTPUT, **kwargs): self._mean_frame = None self._input_stream = None self.input_stream = input_stream self.point_generator = None self.ellipse_fitter = None self.min_pupil_value = self.DEFAULT_MIN_PUPIL_VALUE self.max_pupil_value = self.DEFAULT_MAX_PUPIL_VALUE self.cr_recolor_scale_factor = self.DEFAULT_CR_RECOLOR_SCALE_FACTOR self.recolor_cr = self.DEFAULT_RECOLOR_CR self.cr_mask_radius = self.DEFAULT_CR_MASK_RADIUS self.pupil_mask_radius = self.DEFAULT_PUPIL_MASK_RADIUS self.adaptive_pupil = self.DEFAULT_ADAPTIVE_PUPIL self.smoothing_kernel_size = self.DEFAULT_SMOOTHING_KERNEL_SIZE self.clip_pupil_values = self.DEFAULT_CLIP_PUPIL_VALUES self.average_iris_intensity = self.DEFAULT_AVERAGE_IRIS_INTENSITY self.max_eccentricity = self.DEFAULT_MAX_ECCENTRICITY self.update_fit_parameters(starburst_params=starburst_params, ransac_params=ransac_params, pupil_bounding_box=pupil_bounding_box, cr_bounding_box=cr_bounding_box, **kwargs) self.annotator = Annotator(output_stream) self.pupil_parameters = [] self.cr_parameters = [] self.pupil_colors = [] self.generate_QC_output = generate_QC_output self.current_seed = None self.current_pupil_candidates = None self.current_image = None self.current_image_mean = 0 self.blurred_image = None self.cr_filled_image = None self.pupil_max_image = None self.annotated_image = None self.frame_index = 0
[docs] def update_fit_parameters(self, starburst_params=None, ransac_params=None, pupil_bounding_box=None, cr_bounding_box=None, **kwargs): """Update EyeTracker fitting parameters. Parameters ---------- starburst_params : dict Dictionary of keyword arguments for `PointGenerator`. ransac_params : dict Dictionary of keyword arguments for `EllipseFitter`. pupil_bounding_box : numpy.ndarray [xmin xmax ymin ymax] bounding box for pupil seed point search. cr_bounding_box : numpy.ndarray [xmin xmax ymin ymax] bounding box for cr seed point search. generate_QC_output : bool Flag to compute extra QC data on frames. **kwargs pupil_min_value : int pupil_max_value : int cr_mask_radius : int pupil_mask_radius : int cr_recolor_scale_factor : float recolor_cr : bool adaptive_pupil: bool smoothing_kernel_size : int clip_pupil_values : bool average_iris_intensity : int """ if self.point_generator is None: if starburst_params is None: self.point_generator = PointGenerator() else: self.point_generator = PointGenerator(**starburst_params) elif starburst_params is not None: self.point_generator.update_params(**starburst_params) if self.ellipse_fitter is None: if ransac_params is None: self.ellipse_fitter = EllipseFitter() else: self.ellipse_fitter = EllipseFitter(**ransac_params) elif ransac_params is not None: self.ellipse_fitter.update_params(**ransac_params) if pupil_bounding_box is None or len(pupil_bounding_box) != 4: pupil_bounding_box = default_bounding_box(self.im_shape) if cr_bounding_box is None or len(cr_bounding_box) != 4: cr_bounding_box = default_bounding_box(self.im_shape) self.pupil_bounding_box = pupil_bounding_box self.cr_bounding_box = cr_bounding_box self._init_kwargs(**kwargs) self.current_seed = None self.current_pupil_candidates = None self.current_image = None self.current_image_mean = 0 self.blurred_image = None self.cr_filled_image = None
self.annotated_image = None def _init_kwargs(self, **kwargs): self.min_pupil_value = kwargs.get("min_pupil_value", self.min_pupil_value) self.max_pupil_value = kwargs.get("max_pupil_value", self.max_pupil_value) self.last_pupil_color = self.min_pupil_value self.cr_recolor_scale_factor = kwargs.get( "cr_recolor_scale_factor", self.cr_recolor_scale_factor) self.recolor_cr = kwargs.get("recolor_cr", self.recolor_cr) self.cr_mask_radius = kwargs.get("cr_mask_radius", self.cr_mask_radius) self.cr_mask = get_circle_template(self.cr_mask_radius, fill=1, surround=-1) self.pupil_mask_radius = kwargs.get("pupil_mask_radius", self.pupil_mask_radius) self.adaptive_pupil = kwargs.get( "adaptive_pupil", self.adaptive_pupil) self.smoothing_kernel_size = kwargs.get( "smoothing_kernel_size", self.smoothing_kernel_size) self.clip_pupil_values = kwargs.get( "clip_pupil_values", self.clip_pupil_values) if self.clip_pupil_values: self.pupil_limits = (self.min_pupil_value, self.max_pupil_value) else: self.pupil_limits = None self.average_iris_intensity = kwargs.get( "average_iris_intensity", self.average_iris_intensity) self.max_eccentricity = kwargs.get( "max_eccentricity", self.max_eccentricity) @property def im_shape(self): """Image shape.""" if self.input_stream is None: return None return self.input_stream.frame_shape @property def input_stream(self): """Input frame source.""" return self._input_stream @input_stream.setter def input_stream(self, stream): self._mean_frame = None if self._input_stream is not None: self._input_stream.close() if stream is not None and stream.frame_shape != self.im_shape: self.cr_bounding_box = default_bounding_box(stream.frame_shape) self.pupil_bounding_box = default_bounding_box(stream.frame_shape) self._input_stream = stream @property def mean_frame(self): """Average frame calculated from the input source.""" if self._mean_frame is None: mean_frame = np.zeros(self.im_shape, dtype=np.float64) frame_count = 0 for frame in self.input_stream: mean_frame += frame frame_count += 1 self._mean_frame = (mean_frame / frame_count).astype(np.uint8) return self._mean_frame
[docs] def find_corneal_reflection(self): """Estimate the position of the corneal reflection. Returns ------- ellipse_parameters : tuple (x, y, r, a, b) ellipse parameters. """ seed_point = max_correlation_positions( self.blurred_image, self.cr_mask, self.cr_bounding_box) candidate_points = self.point_generator.get_candidate_points( self.blurred_image, seed_point, "cr") return self.ellipse_fitter.fit(
candidate_points, max_radius=self.point_generator.index_length)
[docs] def setup_pupil_finder(self, cr_parameters): """Initialize image and ransac filter for pupil fitting. If recoloring the corneal_reflection, color it in and provide a filter to exclude points that fall on the colored-in ellipse from fitting. Parameters ---------- cr_parameters : tuple (x, y, r, a, b) ellipse parameters for corneal reflection. Returns ------- image : numpy.ndarray Image for pupil fitting. Has corneal reflection filled in if `recolor_cr` is set. filter_function : callable Function to indicate if points fall on the recolored ellipse or None if not recoloring. filter_parameters : tuple Ellipse parameters for recolor ellipse shape, which are `cr_parameters` with the axes scaled by `cr_recolor_scale_factor`. """ if self.recolor_cr: self.recolor_corneal_reflection(cr_parameters) base_image = self.cr_filled_image filter_function = ellipse_pass_filter x, y, r, a, b = cr_parameters filter_params = (x, y, r, self.cr_recolor_scale_factor*a, self.cr_recolor_scale_factor*b) else: base_image = self.blurred_image filter_function = None filter_params = None
return base_image, filter_function, filter_params
[docs] def find_pupil(self, cr_parameters): """Estimate position of the pupil. Parameters ---------- cr_parameters : tuple (x, y, r, a, b) ellipse parameters of corneal reflection, used to prepare image if `recolor_cr` is set. Returns ------- ellipse_parameters : tuple (x, y, r, a, b) ellipse parameters. """ base_image, filter_function, filter_params = self.setup_pupil_finder( cr_parameters) pupil_mask = get_circle_template( self.pupil_mask_radius, int(self.last_pupil_color), int(self.average_iris_intensity)) # template matching uses top-left corner for the best match, so shift # rejection coordinates accordingly if self.recolor_cr: reject = (self._recolored_r - int(pupil_mask.shape[0]/2.0), self._recolored_c - int(pupil_mask.shape[1]/2.0)) else: reject = None seed_point = max_correlation_positions( base_image, pupil_mask, self.pupil_bounding_box, reject_coords=reject) filter_kwargs = {} if self.clip_pupil_values: filter_kwargs = {"pupil_limits": self.pupil_limits} candidate_points = self.point_generator.get_candidate_points( base_image, seed_point, "pupil", filter_function=filter_function, filter_args=(filter_params, 2), filter_kwargs=filter_kwargs) self.current_seed = seed_point self.current_pupil_candidates = candidate_points return self.ellipse_fitter.fit( candidate_points, max_radius=self.point_generator.index_length,
max_eccentricity=self.max_eccentricity)
[docs] def recolor_corneal_reflection(self, cr_parameters): """Reshade the corneal reflection with the last pupil color. Parameters ---------- cr_parameters : tuple (x, y, r, a, b) ellipse parameters for corneal reflection. """ x, y, r, a, b = cr_parameters a = self.cr_recolor_scale_factor*a + 1 b = self.cr_recolor_scale_factor*b + 1 r, c = ellipse_points((x, y, r, a, b), self.blurred_image.shape) self.cr_filled_image = self.blurred_image.copy() self.cr_filled_image[r, c] = self.last_pupil_color self._recolored_r = r
self._recolored_c = c
[docs] def update_last_pupil_color(self, pupil_parameters): """Update last pupil color with mean of fit. Parameters ---------- pupil_parameters : tuple (x, y, r, a, b) ellipse parameters for pupil. """ if np.any(np.isnan(pupil_parameters)): return if self.recolor_cr: image = self.cr_filled_image else: image = self.blurred_image r, c = ellipse_points(pupil_parameters, image.shape) value = int(np.mean(image[r, c])) value = max(self.min_pupil_value, value) value = min(self.max_pupil_value, value)
self.last_pupil_color = value
[docs] def process_image(self, image): """Process an image to find pupil and corneal reflection. Parameters ---------- image : numpy.ndarray Image to process. Returns ------- cr_parameters : tuple (x, y, r, a, b) corneal reflection parameters. pupil_parameters : tuple (x, y, r, a, b) pupil parameters. cr_error : float Ellipse fit error for best fit. pupil_error : float Ellipse fit error for best fit. """ self.current_image = image self.current_image_mean = self.current_image.mean() self.blurred_image = cv2.medianBlur(image, self.smoothing_kernel_size) try: cr_parameters, cr_error = self.find_corneal_reflection() except ValueError: logging.debug("Insufficient candidate points found for fitting " "corneal reflection at frame %s", self.frame_index) cr_parameters = (np.nan, np.nan, np.nan, np.nan, np.nan) cr_error = np.nan try: pupil_parameters, pupil_error = self.find_pupil(cr_parameters) if self.adaptive_pupil: self.update_last_pupil_color(pupil_parameters) except ValueError: logging.debug("Insufficient candidate points found for fitting " "pupil at frame %s", self.frame_index) pupil_parameters = (np.nan, np.nan, np.nan, np.nan, np.nan) pupil_error = np.nan
return cr_parameters, pupil_parameters, cr_error, pupil_error
[docs] def process_stream(self, start=0, stop=None, step=1, update_mean_frame=True): """Get cr and pupil parameters from frames of `input_stream`. By default this will process every frame in the input stream. Parameters ---------- start : int Index of first frame to process. Defaults to 0. stop : int Stop index for processing. Defaults to None, which runs runs until the end of the input stream. step : int Number of frames to advance at each iteration. Used to skip frames while processing. Set to 1 to process every frame, 2 to process every other frame, etc. Defaults to 1. update_mean_frame : bool Whether or not to update the mean frame while processing the frames. Returns ------- cr_parameters : numpy.ndarray [n_frames,5] array of corneal reflection parameters. pupil_parameters : numpy.ndarray [n_frames,5] array of pupil parameters. cr_errors : numpy.ndarray [n_frames,] array of fit errors for corneal reflection ellipses. pupil_errors : numpy.ndarray [n_frames,] array of fit errors for pupil ellipses. """ self.pupil_parameters = [] self.cr_parameters = [] self.pupil_errors = [] self.cr_errors = [] self.pupil_colors = [] i = 0 if update_mean_frame: mean_frame = np.zeros(self.im_shape, dtype=np.float64) for i, frame in enumerate(self.input_stream[start:stop:step]): if update_mean_frame: mean_frame += frame self.frame_index = start + step*i cr_parameters, pupil_parameters, cr_error, pupil_error = \ self.process_image(frame) self.cr_parameters.append(cr_parameters) self.pupil_parameters.append(pupil_parameters) self.cr_errors.append(cr_error) self.pupil_errors.append(pupil_error) self.pupil_colors.append(self.last_pupil_color) if self.annotator.output_stream is not None: self.annotated_image = self.annotator.annotate_frame( frame, pupil_parameters, cr_parameters, self.current_seed, self.current_pupil_candidates) if self.generate_QC_output: self.annotator.compute_density(frame, pupil_parameters, cr_parameters) self.annotator.clear_rc() self.annotator.close() if update_mean_frame: self._mean_frame = (mean_frame / (i+1)).astype(np.uint8) return (np.array(self.cr_parameters), np.array(self.pupil_parameters),
np.array(self.cr_errors), np.array(self.pupil_errors))
[docs]def default_bounding_box(image_shape): """Calculate a default bounding box as 10% in from borders of image. Parameters ---------- image_shape : tuple (height, width) of image. Returns ------- bounding_box : numpy.ndarray [xmin, xmax, ymin, ymax] bounding box. """ if image_shape is None: return np.array([1, -1, 1, -1], dtype='int') h, w = image_shape x_crop = int(0.1*w) y_crop = int(0.1*h)
return np.array([x_crop, w-x_crop, y_crop, h-y_crop], dtype='int')