This repository has been archived on 2025-05-04. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
DeepEncode/video_compression_model.py
2023-09-10 01:20:10 +01:00

178 lines
6.2 KiB
Python

# video_compression_model.py
import gc
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from featureExtraction import preprocess_frame, scale_crf, scale_speed_preset
from globalVars import DATATYPE, LOGGER, NUM_COLOUR_CHANNELS, PRESET_SPEED_CATEGORIES
if DATATYPE == tf.float16:
from tensorflow.keras.mixed_precision import Policy
policy = Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)
def is_black(frame, threshold=10):
"""Check if a frame is mostly black."""
return np.mean(frame) < threshold
def combine_batch(frame, resize=True):
return preprocess_frame(frame, resize)
def frame_generator(videos, max_frames=None):
base_dir = "test_data/validation/"
for video in videos:
cap_compressed = cv2.VideoCapture(os.path.join(base_dir, video["compressed_video_file"]))
cap_uncompressed = cv2.VideoCapture(os.path.join(base_dir, video["original_video_file"]))
frame_count = 0
while True:
ret_compressed, compressed_frame = cap_compressed.read()
ret_uncompressed, uncompressed_frame = cap_uncompressed.read()
if not ret_compressed or not ret_uncompressed:
break
# Skip black frames
if is_black(compressed_frame) or is_black(uncompressed_frame):
continue
CRF = scale_crf(video["crf"])
SPEED = scale_speed_preset(PRESET_SPEED_CATEGORIES.index(video["preset_speed"]))
validation_image = combine_batch(compressed_frame)
training_image = combine_batch(uncompressed_frame)
yield ({'image': training_image, 'CRF': CRF, 'Speed': SPEED}, validation_image)
frame_count += 1
if max_frames is not None and frame_count >= max_frames:
break
cap_compressed.release()
cap_uncompressed.release()
def create_dataset(videos, batch_size, max_frames=None):
# Determine the output signature by processing a single video to obtain its shape
video_generator_instance = frame_generator(videos, max_frames)
sample_uncompressed, sample_compressed = next(video_generator_instance)
output_signature = (
{
'image': tf.TensorSpec(shape=tf.shape(sample_uncompressed['image']), dtype=DATATYPE),
'CRF': tf.TensorSpec(shape=(), dtype=DATATYPE),
'Speed': tf.TensorSpec(shape=(), dtype=DATATYPE),
},
tf.TensorSpec(shape=tf.shape(sample_compressed), dtype=DATATYPE)
)
dataset = tf.data.Dataset.from_generator(
lambda: frame_generator(videos, max_frames),
output_signature=output_signature
)
dataset = dataset.batch(batch_size).shuffle(20).prefetch(1)
return dataset
class SeparableTranspose2D(layers.Layer):
def __init__(self, filters, kernel_size, strides=(1, 1), padding='same', **kwargs):
super(SeparableTranspose2D, self).__init__(**kwargs)
self.filters = filters
self.kernel_size = kernel_size
self.strides = strides
self.padding = padding
# Use UpSampling2D for resizing
self.upsample = layers.UpSampling2D(size=strides)
# Depthwise convolution
self.depthwise_conv = layers.DepthwiseConv2D(kernel_size=kernel_size, padding=padding)
# Pointwise convolution
self.pointwise_conv = layers.Conv2D(filters, kernel_size=(1, 1), padding=padding)
def call(self, inputs):
x = self.upsample(inputs)
x = self.depthwise_conv(x)
x = self.pointwise_conv(x)
return x
class VideoCompressionModel(tf.keras.Model):
def __init__(self):
super(VideoCompressionModel, self).__init__()
input_shape = (None, None, NUM_COLOUR_CHANNELS)
# Encoder part of the model
self.encoder = tf.keras.Sequential([
layers.InputLayer(input_shape=input_shape),
layers.SeparableConv2D(64, (3, 3), padding='same'),
layers.LeakyReLU(),
layers.MaxPooling2D((2, 2), padding='same'),
layers.SeparableConv2D(128, (3, 3), padding='same'),
layers.LeakyReLU(),
layers.MaxPooling2D((2, 2), padding='same'),
])
# Fully connected layers for processing CRF and Speed
self.dense_crf_speed = tf.keras.Sequential([
layers.Dense(64, activation='relu'),
layers.Dense(128, activation='relu'),
])
# Decoder part of the model
self.decoder = tf.keras.Sequential([
SeparableTranspose2D(128, (3, 3), padding='same'),
layers.LeakyReLU(),
SeparableTranspose2D(64, (3, 3), padding='same'),
layers.LeakyReLU(),
layers.UpSampling2D((2, 2)),
layers.UpSampling2D((2, 2)),
layers.Conv2D(NUM_COLOUR_CHANNELS, (3, 3), padding='same', activation='sigmoid')
])
def call(self, inputs):
# Extract the image, CRF, and Speed values from the inputs dictionary
image = inputs['image']
crf = inputs['CRF']
speed = inputs['Speed']
# CRF and Speed are 1D tensors with shape [batch_size]
# Concatenate them to create a [batch_size, 2] tensor
crf_speed_vector = tf.concat([tf.expand_dims(crf, -1), tf.expand_dims(speed, -1)], axis=-1)
# Process the combined crf_speed_vector through your dense layers
# This will produce a tensor with shape [batch_size, 128]
crf_speed_features = self.dense_crf_speed(crf_speed_vector)
# Reshape the tensor to match spatial dimensions
# New shape: [batch_size, 1, 1, 128]
crf_speed_features = tf.reshape(crf_speed_features, [-1, 1, 1, 128])
# Tile the tensor to match spatial dimensions of encoded tensor
# Tiled shape: [batch_size, 90, 160, 128]
crf_speed_features = tf.tile(crf_speed_features, [1, 90, 160, 1])
# Pass the image through the encoder
encoded = self.encoder(image)
# Concatenate the encoded tensor with the crf_speed_features tensor
combined_features = tf.concat([encoded, crf_speed_features], axis=-1)
# Pass the combined features through the decoder
decoded = self.decoder(combined_features)
return decoded