Source code for quat.visual.base_features

#!/usr/bin/env python3
Video and image no-reference based features.
Base features can also be applied for full-ref calculations,
using the `calc_ref_dis` method.
    This file is part of quat.
    quat is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.
    quat is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
    You should have received a copy of the GNU General Public License
    along with quat. If not, see <http://www.gnu.org/licenses/>.

    Author: Steve Göring
import cv2
import os
import json
import copy
from abc import ABC, abstractmethod

import numpy as np
import skimage.color
import skvideo.motion
import skvideo
from skvideo.measure.strred import extract_info as strred_extract_info
from skimage import img_as_ubyte
import scipy
from scipy import ndimage

from ..log import *

class Feature:
    """
    abstract base class for all features,
    handles automatic storage and loading of calculated feature values

    Every call of `calc` / `calc_ref_dis` appends its result to an internal
    list, which can be read via `get_values` and persisted as json with
    `store` / `load`.
    """

    # NOTE: the class does not derive from `abc.ABC`, so `@abstractmethod`
    # on `calc` is not enforced at instantiation time; subclasses are
    # simply expected to override `calc`.

    def __init__(self):
        self._values = []

    @abstractmethod
    def calc(self, frame):
        """
        perform feature calculation for a single frame
        """
        pass

    def _spawn_instance(self):
        """
        creates an independent feature instance that is used for one of the
        streams (reference or distorted) in `calc_ref_dis`
        """
        try:
            return copy.deepcopy(self)
        except TypeError:
            # TODO: fix to handle MovementFeatures
            # deepcopy is not possible for this feature, fall back to a
            # freshly constructed instance (requires a parameterless __init__)
            lWarn(
                f"please check if {self.__class__.__name__} does not require parameters for __init__() call"
            )
            return self.__class__()

    def calc_ref_dis(self, dframe, rframe):
        """
        performs a full-ref style calculation, where the resulting
        features are calculated on both frames,
        and further difference values (reference - distorted) are stored,

        Parameters
        ----------
        dframe : 3d array
            distorted video frame
        rframe : 3d array
            reference video frame

        Returns
        -------
        a dict {"diff": values, "ref": values, "dis": values}
        or dict {"diff_" + k: values, "ref_" + k: values, "dis_" + k: values}
        for all keys `k` in the underlying feature.
        """
        # TODO: rename calc_ref_dis --> calc_dis_ref

        # each stream (reference, distorted) gets its own feature instance,
        # so that stateful features do not mix frames of both videos
        if not hasattr(self, "_ref_instance"):
            self._ref_instance = self._spawn_instance()
        if not hasattr(self, "_dis_instance"):
            self._dis_instance = self._spawn_instance()

        ref_value = self._ref_instance.calc(rframe)
        dis_value = self._dis_instance.calc(dframe)

        res = {}
        if isinstance(ref_value, dict):
            for k in ref_value:
                res["diff_" + k] = ref_value[k] - dis_value[k]
                res["ref_" + k] = ref_value[k]
                res["dis_" + k] = dis_value[k]
        elif isinstance(ref_value, list):
            # element-wise difference, the result is a numpy array
            res["diff"] = np.array(ref_value) - np.array(dis_value)
            res["ref"] = ref_value
            res["dis"] = dis_value
        else:
            res["diff"] = ref_value - dis_value
            res["ref"] = ref_value
            res["dis"] = dis_value

        self._values.append(res)
        return res

    def calc_dis_ref(self, dframe, rframe):
        """
        fix for consistent naming scheme, forwards to `calc_ref_dis`
        """
        return self.calc_ref_dis(dframe, rframe)

    def get_values(self):
        """
        returns all stored feature values
        """
        return self._values

    def fullref(self):
        """
        used to check if it is a full reference feature
        """
        return False

    def _feature_filename(self, folder, video, name):
        """
        generates a feature filename for a given `video` for a specific
        feature folder `folder` and adds a feature name `name`,
        if `name` is empty the class name is used
        """
        # the directory part of the video path is flattened to one token
        dn = (
            os.path.normpath(os.path.dirname(video))
            .replace("..", "_")
            .replace(os.sep, "_")
        )
        bn = dn + "_" + os.path.basename(os.path.splitext(video)[0])
        if name == "":
            name = self.__class__.__name__
        return os.path.normpath(os.path.join(folder, bn + "_" + name + ".json"))

    def load(self, folder, video, name=""):
        """
        loads a feature from a feature folder `folder`,
        feature filename is estimated using the _feature_filename

        Returns
        -------
        the stored values, or False if no (readable) feature file exists
        """
        os.makedirs(folder, exist_ok=True)
        fn = self._feature_filename(folder, video, name)
        if not os.path.isfile(fn):
            return False

        with open(fn) as ffp:
            try:
                values = json.load(ffp)["values"]
            except (ValueError, KeyError, TypeError):
                # ValueError covers json decoding errors; KeyError/TypeError
                # cover files that are valid json but have no "values" entry
                lWarn(
                    f"there is something wrong with {video}, feature: {name}, re-calcuation performed"
                )
                # loading of feature value is not possible,
                # force to calculate it again
                return False

        self._values = values
        return values

    def store(self, folder, video, name=""):
        """
        stores a feature to a feature folder `folder`,
        feature filename is estimated using the _feature_filename

        Returns
        -------
        filename of the written json file
        """
        os.makedirs(folder, exist_ok=True)
        fn = self._feature_filename(folder, video, name)
        v = {"name": name, "values": self._values, "video": video}
        with open(fn, "w") as ffp:
            json.dump(v, ffp, indent=4, sort_keys=True)
        return fn
class MovementFeatures(Feature):
    """
    Calculates movement feature, using background removement,
    based on master thesis of julian zebelein

    The per-frame value is the percentage of pixels that the MOG2
    background subtractor marks as non-zero in its foreground mask.
    """

    def __init__(self):
        self._fgbg = cv2.createBackgroundSubtractorMOG2()
        self._values = []

    def calc(self, frame, debug=False):
        """
        calculates the percentage of foreground pixels of `frame`,
        with `debug=True` the frame and the foreground mask are shown
        """
        height, width = frame.shape[0], frame.shape[1]

        frame = img_as_ubyte(frame)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        smoothed = cv2.bilateralFilter(gray, 9, 75, 75)
        fgmask = self._fgbg.apply(smoothed)

        if debug:
            cv2.imshow(
                "o", cv2.resize(frame, (600, int(600 * height / width)))
            )
            cv2.imshow(
                "fg", cv2.resize(fgmask, (600, int(600 * height / width)))
            )
            # gives the preview windows time to refresh
            cv2.waitKey(30)

        moving_percentage = (cv2.countNonZero(fgmask) * 100) / (height * width)
        value = float(moving_percentage)
        self._values.append(value)
        return value

    def get_values(self):
        """
        returns all stored values, the value of the first frame is replaced
        by 0 (or, for dict values, by a dict where every key is 0)
        """
        if len(self._values) > 0:
            first = self._values[0]
            if type(first) is dict:
                # dict valued results: reset every entry in place
                for key in first:
                    first[key] = 0
                replacement = first
            else:
                replacement = 0
            self._values[0] = replacement
        return self._values
class CutDetectionFeatures(Feature):
    """
    Estimates scene cuts of a given video (approximation),
    implemented by Serge Molina

    A frame is marked as cut (1) when the standard deviation of the
    difference to the previous frame exceeds 30 and is more than 4 times
    the running average of the previous differences, otherwise 0.
    """

    def __init__(self):
        self._values = []
        self._last_frame = None
        self._previous_diff_weighted = 0

    def calc(self, frame):
        """
        returns 1 if `frame` is detected as first frame after a cut, else 0
        """
        # TODO change to skiimage
        # the down-scaled image speeds up computations and reduces false
        # positives; int16 prevents over/under flows in the subtraction
        image = np.int16(
            cv2.resize(frame, dsize=(320, 240), interpolation=cv2.INTER_LANCZOS4)
        )

        cut = 0
        previous = self._last_frame
        if previous is not None:
            current_diff = np.std((image - previous).ravel())
            weighted = self._previous_diff_weighted
            if current_diff > 30 and current_diff > 4 * weighted:
                # we detect this as a cut
                cut = 1
            # running average of the frame differences
            self._previous_diff_weighted = weighted * 0.5 + 0.5 * current_diff

        self._last_frame = image
        self._values.append(cut)
        return cut
class SiFeatures(Feature):
    """
    Calculates SI values of a video frame
    important: SI values are finally in a 0..1 range, due to float conversion
    """

    def __init__(self):
        self._values = []

    def calc(self, frame):
        """
        calculates the SI value of `frame`: standard deviation of the sobel
        gradient magnitude of the gray-scale converted frame
        """

        def calculate_si(frame_data):
            # uses the passed frame data (and not a variable of the
            # enclosing scope) for both sobel directions
            sobx = ndimage.sobel(frame_data, axis=0)
            soby = ndimage.sobel(frame_data, axis=1)
            return float(np.hypot(sobx, soby).std())

        gray = skimage.color.rgb2gray(frame)
        value = calculate_si(gray)
        self._values.append(value)
        return value
class TiFeatures(Feature):
    """
    Calculates TI values
    important: TI values are finally in a 0..1 range, due to float conversion
    """

    def __init__(self):
        self._values = []
        self._previous_frame = None

    def calc(self, frame):
        """
        calculates the TI value of `frame`: standard deviation of the
        difference between the gray-scale frame and its predecessor,
        the first frame results in 0
        """
        gray = skimage.color.rgb2gray(frame)

        if self._previous_frame is None:
            value = 0
        else:
            value = float((gray - self._previous_frame).std())

        self._previous_frame = gray
        self._values.append(value)
        return value
class TemporalFeatures(Feature):
    """
    A temporal feature, using RMSE of consecutive frames,
    somehow similar to TI, but not applied on gray frames
    """

    def __init__(self):
        self._values = []
        self._previous_frame = None

    def rmse(self, x, y):
        """
        root mean squared error of `x` and `y`

        Both inputs are converted to float64 before the subtraction,
        otherwise unsigned integer frames (e.g. uint8) would wrap around
        in the difference and in the squaring.
        """
        x = np.asarray(x, dtype=np.float64)
        y = np.asarray(y, dtype=np.float64)
        return np.sqrt(((x - y) ** 2).mean())

    def calc(self, frame):
        """
        calculates the RMSE between `frame` and the previously seen frame,
        the first frame results in 0
        """
        if self._previous_frame is None:
            value = 0
        else:
            value = float(self.rmse(frame.flatten(), self._previous_frame.flatten()))
        self._previous_frame = frame
        self._values.append(value)
        return value
class StrredNoRefFeatures(Feature):
    """
    calculate entropy of subbands, with the feature that is used in strred,
    however, this feature does not consider a reference video,
    it justs calculates mean of spatial and temporal features of strred
    """

    def __init__(self):
        self._values = []
        self._previous_frame = None

    def calc(self, frame):
        """
        returns a dict with mean and std of the spatial and temporal strred
        information, calculated for the previous and the current frame
        (both gray-scale, float32); the first frame results in all zeros
        """
        gray = skimage.color.rgb2gray(frame).astype(np.float32)
        previous = self._previous_frame

        if previous is None:
            # no predecessor available yet
            value = {
                "spatial.mean": 0,
                "spatial.std": 0,
                "temporal.mean": 0,
                "temporal.std": 0,
            }
        else:
            spatial, temporal = strred_extract_info(previous, gray)
            value = {
                "spatial.mean": float(spatial.mean()),
                "spatial.std": float(spatial.std()),
                "temporal.mean": float(temporal.mean()),
                "temporal.std": float(temporal.std()),
            }

        self._previous_frame = gray
        self._values.append(value)
        return value
class BlockMotion(Feature):
    """
    calculates block motion of two following frames,
    block size is estimated by 5% of the height of the input frame,
    this is done to be resolution independent and faster
    """

    def __init__(self):
        self._values = []
        self._last_frame = None

    def calc(self, frame):
        """
        returns a dict with the shares of motion vector components that are
        0, 1 and -1 ("blkm.zeros", "blkm.ones", "blkm.minusones"),
        the first frame results in all zeros
        """
        previous = self._last_frame
        if previous is None:
            per_frame_values = {"blkm.zeros": 0, "blkm.ones": 0, "blkm.minusones": 0}
        else:
            frame_pair = np.array([previous, frame])
            # block size: 5% of the frame height
            mb_size = int(previous.shape[0] * 0.05)
            motion = skvideo.motion.blockMotion(
                frame_pair, method="SE3SS", mbSize=mb_size
            )
            m = motion[0].flatten()
            total = len(m)
            per_frame_values = {
                "blkm.zeros": np.count_nonzero(m == 0) / total,
                "blkm.ones": np.count_nonzero(m == 1) / total,
                "blkm.minusones": np.count_nonzero(m == -1) / total,
            }

        self._last_frame = frame
        self._values.append(per_frame_values)
        return per_frame_values
class CuboidRow(Feature):
    """
    Motion estimation using a window of 60 frames and a cuboid video of the
    video, handles only rows of the frames

    One row of every gray-scale frame is collected in a sliding window,
    the feature value is the standard deviation of the sobel filtered window.
    """

    WINDOW = 60  # maximum number of frames in sliding window

    def __init__(self, row):
        """
        row specifies the row that should be used, as relative position:
        it is multiplied with (frame height - 1) to get the row index
        """
        self._row = row
        self._rows = []
        self._values = []

    def calc(self, frame):
        """
        adds the selected row of `frame` to the window and returns the
        standard deviation of the sobel filtered window
        """
        gray = skimage.color.rgb2gray(frame)

        # drop the oldest row once the window is full
        if len(self._rows) >= self.WINDOW:
            self._rows = self._rows[1:]

        index = int(self._row * (frame.shape[0] - 1))
        # copy reduces memory ! (only the row is kept, not the whole frame)
        self._rows.append(gray[index].copy())

        v = ndimage.sobel(self._rows).std()
        self._values.append(v)
        return v
class CuboidCol(Feature):
    """
    Motion estimation using a window of 60 frames and a cuboid video of the
    video, handles only columns of the frames

    One column of every gray-scale frame is collected in a sliding window,
    the feature value is the standard deviation of the sobel filtered window.
    """

    WINDOW = 60  # maximum number of frames in sliding window

    def __init__(self, col):
        """
        col specifies the column that should be used, as relative position:
        it is multiplied with (frame width - 1) to get the column index
        """
        self._col = col
        self._cols = []
        self._values = []

    def calc(self, frame):
        """
        adds the selected column of `frame` to the window and returns the
        standard deviation of the sobel filtered window
        """
        gray = skimage.color.rgb2gray(frame)

        # drop the oldest column once the window is full
        if len(self._cols) >= self.WINDOW:
            self._cols = self._cols[1:]

        index = int(self._col * (frame.shape[1] - 1))
        # copy reduces memory ! (only the column is kept, not the whole frame)
        self._cols.append(gray[:, index].copy())

        v = ndimage.sobel(self._cols).std()
        self._values.append(v)
        return v
class Staticness(Feature):
    """
    calculates how static the video is

    All frames seen so far are averaged, the feature value is the standard
    deviation of the sobel filtered average frame.
    """

    def __init__(self):
        self._values = []
        self._frame_sum = None
        self._frame_no = 1

    def calc(self, frame):
        """
        adds `frame` to the running sum and returns the standard deviation
        of the sobel filtered average of all frames seen so far
        """
        # convert datatype, required otherwise overflows
        frame = np.array(img_as_ubyte(frame), dtype=np.int64)
        if self._frame_sum is None:
            self._frame_sum = frame
        else:
            self._frame_sum += frame

        sobeled_image = ndimage.sobel(self._frame_sum // self._frame_no)
        self._frame_no += 1

        # plain float, consistent with the other features; the value is
        # returned explicitly, as `calc_ref_dis` of the base class works on
        # the return values of `calc`
        v = float(sobeled_image.std())
        self._values.append(v)
        return v
class UHDSIM2HD(Feature):
    """
    calculates similarity of UHD input resolution to HD,
    if input frame is not UHD resolution, it takes half of the height and width

    The gray-scale frame is scaled down to half size and up again (both
    bicubic), the value is the PSNR between the original and this version.
    """

    def __init__(self):
        self._values = []

    def calc(self, frame):
        """
        returns the PSNR between the gray-scale frame and its
        down-and-up-scaled version, an infinite PSNR is replaced by 1000
        """
        gray = skimage.color.rgb2gray(frame).astype(np.float32)
        full_height, full_width = gray.shape[0], gray.shape[1]

        # check half of input resolution
        half_size = (full_width // 2, full_height // 2)
        downscaled = cv2.resize(gray, dsize=half_size, interpolation=cv2.INTER_CUBIC)
        restored = cv2.resize(
            downscaled,
            dsize=(full_width, full_height),
            interpolation=cv2.INTER_CUBIC,
        )

        v = float(skvideo.measure.psnr(gray, restored)[0])
        if np.isinf(v):
            # identical images result in an infinite psnr
            v = 1000
        self._values.append(v)
        return v
class Blockiness(Feature):
    r"""
    calculate blockness of a video,
    assume that compression blocks have the same NxN size,
    where N = [8, 16, 32, 64, 128],

    calculation performs the following steps, explained for N=8:

    - apply canny edge detection, K=[X,Y]-axis, normalized by num-rows/cols
    - calculate mean for all K summed values (A)
    - calculate for a shift (0,7) and for every 8th value of the K summed values the mean (B)
    - all shifts will be considered, and only the one with max_mean value is used
    - assume that the distribution should differe, if blocks are there
    - difference of A, and B is then the feature value

    overall blockiness value -- per frame:

    - value = :math:`\sqrt{(|x\_mean\_diff \cdot y\_mean\_diff | / 2^{(|max\_shift_x - max\_shift_y|/N)}}`
    """

    _blocksizes = [8, 16, 32, 64, 128]

    def __estimate_shift(self, values, blocksize):
        """
        returns the (first) shift in 0..blocksize-1 for which the mean of
        every `blocksize`-th value of `values` is maximal
        """
        # max() keeps the first of several equal maxima, the same as a
        # manual scan that only replaces on a strictly greater mean
        return max(range(blocksize), key=lambda i: values[i::blocksize].mean())

    def calc(self, frame):
        """
        returns the maximum blockiness value over all considered block sizes
        """
        edges = cv2.Canny(np.uint8(frame), 100, 200)
        xsums = edges.sum(axis=0) / (frame.shape[0])
        ysums = edges.sum(axis=1) / (frame.shape[1])

        # (A): mean over all summed edge values per axis
        x_mean_all = xsums.mean()
        y_mean_all = ysums.mean()

        blockiness_values = []
        for blocksize in self._blocksizes:
            shift_x = self.__estimate_shift(xsums, blocksize)
            shift_y = self.__estimate_shift(ysums, blocksize)

            # (B): mean of every blocksize-th value at the best shift
            x_mean_diff = x_mean_all - xsums[shift_x::blocksize].mean()
            y_mean_diff = y_mean_all - ysums[shift_y::blocksize].mean()

            # differing shifts of both axes reduce the value
            penalty = np.power(2, np.abs(shift_x - shift_y) / blocksize)
            diff = (np.sqrt(np.abs(x_mean_diff * y_mean_diff))) / penalty
            blockiness_values.append(float(diff))

        r = max(blockiness_values)
        self._values.append(r)
        return r
class ImageFeature(Feature):
    """
    a generic image feature class,
    ususally all methods implemented in quat.visual.images
    can be passes as argument in the constructor

    The wrapped function is called once per frame and its result is stored
    as feature value.
    """

    def __init__(self, img_f):
        """
        img_f needs to be a function that handles one frame
        """
        self._values = []
        self.img_f = img_f

    def calc(self, frame):
        """
        applies the wrapped image function to `frame`, stores and returns
        the result
        """
        result = self.img_f(frame)
        self._values.append(result)
        return result