This repository has been archived on 2025-05-04. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
DeepEncode/video_compression_model.py
2023-08-25 01:54:22 +01:00

121 lines
4.3 KiB
Python

# video_compression_model.py
import gc
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from featureExtraction import preprocess_frame, scale_crf, scale_speed_preset
from globalVars import LOGGER, NUM_COLOUR_CHANNELS, PRESET_SPEED_CATEGORIES
#from tensorflow.keras.mixed_precision import Policy
#policy = Policy('mixed_float16')
#tf.keras.mixed_precision.set_global_policy(policy)
def combine_batch(frame, crf, speed, include_controls=True, resize=True):
    """Preprocess *frame* and optionally append constant control planes.

    The frame is passed through ``preprocess_frame(frame, resize)``.  When
    ``include_controls`` is true, two extra single-channel planes filled with
    ``crf`` and ``speed`` are stacked after the image channels.

    Args:
        frame: Raw frame handed to ``preprocess_frame``.
        crf: Value used to fill the first control plane.
        speed: Value used to fill the second control plane.
        include_controls: Append the two control planes when true.
        resize: Forwarded unchanged to ``preprocess_frame``.

    Returns:
        A new array produced by concatenating the image (and the control
        planes, if requested) along the last axis.
    """
    image = preprocess_frame(frame, resize)
    # The three-way unpack requires the preprocessed frame to be 3-D.
    rows, cols, _ = image.shape

    planes = [image]
    if include_controls:
        planes += [np.full((rows, cols, 1), value) for value in (crf, speed)]
    return np.concatenate(planes, axis=-1)
def frame_generator(videos, max_frames=None):
    """Yield ``(training, validation)`` frame pairs for each video entry.

    For every entry in *videos* the compressed and the original file are
    opened relative to ``test_data/validation/`` and read in lockstep.  Each
    step yields the original frame with two control planes appended by
    ``combine_batch`` (``training``) and the compressed frame without control
    planes (``validation``).

    Both captures are released in a ``finally`` clause, so they are freed
    even if the consumer abandons or closes the generator early, or if an
    exception is raised while processing a frame.

    Args:
        videos: Iterable of mappings providing the keys
            ``"compressed_video_file"``, ``"original_video_file"``,
            ``"crf"`` and ``"preset_speed"``.
        max_frames: Optional cap on the number of pairs yielded per video.
            ``None`` reads until either stream runs out of frames.

    Yields:
        Tuples ``(training, validation)`` as described above.
    """
    base_dir = "test_data/validation/"
    # Controls attached to every original frame are the same for all videos,
    # so compute the scaled preset once instead of once per frame.
    reference_speed = scale_speed_preset(PRESET_SPEED_CATEGORIES.index("veryslow"))

    for video in videos:
        # Resolve both paths before opening anything so a missing key cannot
        # leave an already-opened capture behind.
        compressed_path = os.path.join(base_dir, video["compressed_video_file"])
        original_path = os.path.join(base_dir, video["original_video_file"])
        cap_compressed = cv2.VideoCapture(compressed_path)
        cap_uncompressed = cv2.VideoCapture(original_path)
        try:
            # Per-video controls are loop invariant; hoisted out of the frame loop.
            # NOTE(review): these are passed with include_controls=False below,
            # so they do not end up in the yielded arrays - confirm intent.
            crf = scale_crf(video["crf"])
            speed = scale_speed_preset(PRESET_SPEED_CATEGORIES.index(video["preset_speed"]))

            frame_count = 0
            while True:
                ret_compressed, compressed_frame = cap_compressed.read()
                ret_uncompressed, uncompressed_frame = cap_uncompressed.read()
                # Stop as soon as either stream has no more frames.
                if not ret_compressed or not ret_uncompressed:
                    break

                validation = combine_batch(compressed_frame, crf, speed, include_controls=False)
                # NOTE(review): 10 is passed as-is rather than through
                # scale_crf - verify this is intended.
                training = combine_batch(uncompressed_frame, 10, reference_speed)
                yield training, validation

                frame_count += 1
                if max_frames is not None and frame_count >= max_frames:
                    break
        finally:
            cap_compressed.release()
            cap_uncompressed.release()
def create_dataset(videos, batch_size, max_frames=None):
    """Build a batched ``tf.data.Dataset`` of frame pairs from *videos*.

    One sample pair is pulled from ``frame_generator`` to determine the
    element shapes, then the dataset is created from a fresh generator so the
    probed frame is not lost.

    Args:
        videos: Video descriptions forwarded to ``frame_generator``.
        batch_size: Number of frame pairs per batch.
        max_frames: Optional per-video frame cap forwarded to
            ``frame_generator``.

    Returns:
        A dataset of ``float32`` pairs that is batched, shuffled with a
        buffer of 20 and prefetched by 1.

    Raises:
        ValueError: If ``frame_generator`` produces no frames, so the element
            shapes cannot be determined.
    """
    # Probe a single pair for its shapes, and always close the probe so any
    # resources held by the suspended generator are released right away.
    probe = frame_generator(videos, max_frames)
    try:
        sample_uncompressed, sample_compressed = next(probe)
    except StopIteration:
        raise ValueError(
            "frame_generator yielded no frames; cannot determine the dataset element shapes"
        ) from None
    finally:
        probe.close()

    # Static shape tuples are sufficient for TensorSpec; no tensors needed.
    output_signature = (
        tf.TensorSpec(shape=np.shape(sample_uncompressed), dtype=tf.float32),
        tf.TensorSpec(shape=np.shape(sample_compressed), dtype=tf.float32),
    )

    dataset = tf.data.Dataset.from_generator(
        lambda: frame_generator(videos, max_frames),  # bind arguments for the zero-arg callable
        output_signature=output_signature,
    )
    # NOTE(review): shuffle runs after batch, so whole batches are reordered
    # rather than individual frames - confirm this is intended.
    dataset = dataset.batch(batch_size).shuffle(20).prefetch(1)  # .prefetch(tf.data.experimental.AUTOTUNE)
    return dataset
class VideoCompressionModel(tf.keras.Model):
    """Convolutional encoder/decoder operating on frames with control planes.

    The encoder accepts inputs with ``NUM_COLOUR_CHANNELS + 2`` channels and
    arbitrary height/width, and applies two 2x2 max-pooling stages.  The
    decoder upsamples again through one stride-2 transposed convolution and
    one 2x ``UpSampling2D`` layer, ending in a sigmoid convolution that emits
    ``NUM_COLOUR_CHANNELS`` channels.

    NOTE(review): the two extra input channels are presumably the CRF and
    speed planes appended by ``combine_batch`` - confirm against the caller.
    """

    def __init__(self):
        super().__init__()

        kernel = (3, 3)
        factor = (2, 2)
        drop_rate = 0.4
        frame_shape = (None, None, NUM_COLOUR_CHANNELS + 2)

        # Encoder: two conv -> LeakyReLU -> max-pool -> dropout stages.
        encoder_layers = [
            layers.InputLayer(input_shape=frame_shape),
            layers.Conv2D(32, kernel, padding='same'),
            layers.LeakyReLU(),
            layers.MaxPooling2D(factor, padding='same'),
            layers.Dropout(drop_rate),
            layers.SeparableConv2D(16, kernel, padding='same'),
            layers.LeakyReLU(),
            layers.MaxPooling2D(factor, padding='same'),
            layers.Dropout(drop_rate),
        ]
        self.encoder = tf.keras.Sequential(encoder_layers)

        # Decoder: transposed convolutions plus a final upsampling step,
        # followed by a sigmoid projection to the colour channels.
        decoder_layers = [
            layers.Conv2DTranspose(16, kernel, padding='same'),
            layers.LeakyReLU(),
            layers.Dropout(drop_rate),
            layers.Conv2DTranspose(32, kernel, strides=factor, padding='same'),
            layers.LeakyReLU(),
            layers.Dropout(drop_rate),
            layers.UpSampling2D(factor),
            layers.Conv2D(NUM_COLOUR_CHANNELS, kernel, padding='same', activation='sigmoid'),
        ]
        self.decoder = tf.keras.Sequential(decoder_layers)

    def call(self, inputs):
        """Run *inputs* through the encoder, then the decoder, and return the result."""
        latent = self.encoder(inputs)
        return self.decoder(latent)