# video_compression_model.py import gc import os import cv2 import numpy as np import tensorflow as tf from tensorflow.keras import layers from featureExtraction import preprocess_frame, scale_crf, scale_speed_preset from globalVars import HEIGHT, LOGGER, NUM_COLOUR_CHANNELS, NUM_PRESET_SPEEDS, PRESET_SPEED_CATEGORIES, WIDTH #from tensorflow.keras.mixed_precision import Policy #policy = Policy('mixed_float16') #tf.keras.mixed_precision.set_global_policy(policy) def combine_batch(frame, crf, speed, include_controls=True, resize=True): processed_frame = preprocess_frame(frame, resize) height, width, _ = processed_frame.shape combined = [processed_frame] if include_controls: crf_array = np.full((height, width, 1), crf) speed_array = np.full((height, width, 1), speed) combined.extend([crf_array, speed_array]) return np.concatenate(combined, axis=-1) def frame_generator(videos, max_frames=None): base_dir = "test_data/validation/" for video in videos: cap_compressed = cv2.VideoCapture(os.path.join(base_dir, video["compressed_video_file"])) cap_uncompressed = cv2.VideoCapture(os.path.join(base_dir, video["original_video_file"])) frame_count = 0 while True: ret_compressed, compressed_frame = cap_compressed.read() ret_uncompressed, uncompressed_frame = cap_uncompressed.read() if not ret_compressed or not ret_uncompressed: break CRF = scale_crf(video["crf"]) SPEED = scale_speed_preset(PRESET_SPEED_CATEGORIES.index(video["preset_speed"])) validation = combine_batch(compressed_frame, CRF, SPEED, include_controls=False) training = combine_batch(uncompressed_frame, 10, scale_speed_preset(PRESET_SPEED_CATEGORIES.index("veryslow"))) yield training, validation frame_count += 1 if max_frames is not None and frame_count >= max_frames: break cap_compressed.release() cap_uncompressed.release() def create_dataset(videos, batch_size, max_frames=None): # Determine the output signature by processing a single video to obtain its shape video_generator_instance = frame_generator(videos, max_frames) sample_uncompressed, sample_compressed = next(video_generator_instance) output_signature = ( tf.TensorSpec(shape=tf.shape(sample_uncompressed), dtype=tf.float32), tf.TensorSpec(shape=tf.shape(sample_compressed), dtype=tf.float32) ) dataset = tf.data.Dataset.from_generator( lambda: frame_generator(videos, max_frames), # Include max_frames argument through lambda output_signature=output_signature ) dataset = dataset.batch(batch_size).shuffle(20).prefetch(1) #.prefetch(tf.data.experimental.AUTOTUNE) return dataset class VideoCompressionModel(tf.keras.Model): def __init__(self): super(VideoCompressionModel, self).__init__() input_shape = (None, None, NUM_COLOUR_CHANNELS + 2) # Encoder part of the model self.encoder = tf.keras.Sequential([ layers.InputLayer(input_shape=input_shape), layers.Conv2D(64, (3, 3), padding='same'), #layers.BatchNormalization(), layers.LeakyReLU(), layers.MaxPooling2D((2, 2), padding='same'), layers.SeparableConv2D(32, (3, 3), padding='same'), # Using Separable Convolution #layers.BatchNormalization(), layers.LeakyReLU(), layers.MaxPooling2D((2, 2), padding='same') ]) # Decoder part of the model self.decoder = tf.keras.Sequential([ layers.Conv2DTranspose(32, (3, 3), padding='same'), #layers.BatchNormalization(), layers.LeakyReLU(), layers.Conv2DTranspose(64, (3, 3), dilation_rate=2, padding='same'), # Using Dilated Convolution #layers.BatchNormalization(), layers.LeakyReLU(), # Use Sub-Pixel Convolutional Layer layers.Conv2DTranspose(NUM_COLOUR_CHANNELS * 16, (3, 3), padding='same'), # 16 times the number of color channels layers.Lambda(lambda x: tf.nn.depth_to_space(x, block_size=4)) # Sub-Pixel Convolutional Layer with block_size=4 ]) def call(self, inputs): encoded = self.encoder(inputs) return self.decoder(encoded)