# video_compression_model.py
"""Frame-pair data pipeline and convolutional autoencoder for video compression.

The data helpers read an original video and its compressed counterpart in
lockstep and turn every frame pair into a (model input, training target) pair.
The model input is the original frame with two extra constant channels that
carry the CRF and speed-preset control values; the target is the compressed
frame without control channels.
"""
import contextlib
import gc
import os

import cv2
import numpy as np
import tensorflow as tf

from featureExtraction import preprocess_frame, scale_crf, scale_speed_preset
from globalVars import (
    HEIGHT,
    LOGGER,
    NUM_COLOUR_CHANNELS,
    NUM_PRESET_SPEEDS,
    PRESET_SPEED_CATEGORIES,
    WIDTH,
)

#from tensorflow.keras.mixed_precision import Policy

#policy = Policy('mixed_float16')
#tf.keras.mixed_precision.set_global_policy(policy)

# Directory that the video file names stored in the video metadata are
# relative to.
_VALIDATION_DIR = "test_data/validation"

# Speed preset attached to the uncompressed (reference) frames.
_REFERENCE_PRESET = "veryslow"


def combine_batch(frame, crf, speed, include_controls=True, resize=True):
    """Preprocess a frame and optionally append CRF/speed control channels.

    Args:
        frame: Raw frame, passed straight to ``preprocess_frame``.
        crf: Value used to fill the CRF control channel.
        speed: Value used to fill the speed-preset control channel.
        include_controls: When true, two constant channels (CRF, then speed)
            are appended after the image channels.
        resize: Forwarded to ``preprocess_frame``.

    Returns:
        A ``(height, width, channels)`` array. ``channels`` is the
        preprocessed frame's channel count, plus two when
        ``include_controls`` is true.
    """
    processed_frame = preprocess_frame(frame, resize)
    if not include_controls:
        return np.concatenate([processed_frame], axis=-1)

    height, width, _ = processed_frame.shape
    crf_channel = np.full((height, width, 1), crf)
    speed_channel = np.full((height, width, 1), speed)
    return np.concatenate([processed_frame, crf_channel, speed_channel], axis=-1)


@contextlib.contextmanager
def _open_capture(path):
    """Open a ``cv2.VideoCapture`` and release it when the block exits."""
    capture = cv2.VideoCapture(path)
    try:
        yield capture
    finally:
        capture.release()


def _iter_frame_pairs(video, reference_crf, max_frames=None):
    """Yield ``(uncompressed_input, compressed_target)`` for one video.

    Both captures are released when the generator finishes, raises, or is
    closed early by its consumer.

    Args:
        video: Mapping with the keys ``compressed_video_file``,
            ``original_video_file``, ``crf`` and ``preset_speed``.
        reference_crf: CRF value written into the control channel of the
            uncompressed frames.
        max_frames: Stop after this many frame pairs; ``None`` reads until
            either video runs out of frames.
    """
    # The control values are constant for a whole video, so compute them once
    # instead of once per frame.
    crf = scale_crf(video["crf"])
    speed = scale_speed_preset(PRESET_SPEED_CATEGORIES.index(video["preset_speed"]))
    reference_speed = scale_speed_preset(
        PRESET_SPEED_CATEGORIES.index(_REFERENCE_PRESET)
    )

    compressed_path = os.path.join(_VALIDATION_DIR, video["compressed_video_file"])
    original_path = os.path.join(_VALIDATION_DIR, video["original_video_file"])

    with _open_capture(compressed_path) as cap_compressed, \
            _open_capture(original_path) as cap_uncompressed:
        if not (cap_compressed.isOpened() and cap_uncompressed.isOpened()):
            # Reading from an unopened capture just reports "no frame"; make
            # the reason for the empty result visible instead of silent.
            LOGGER.warning(
                "Could not open video pair: %s / %s", compressed_path, original_path
            )
            return

        frame_count = 0
        while True:
            ret_compressed, compressed_frame = cap_compressed.read()
            ret_uncompressed, uncompressed_frame = cap_uncompressed.read()
            if not ret_compressed or not ret_uncompressed:
                break

            # The target carries no control channels; crf/speed are passed
            # only to satisfy the combine_batch signature.
            compressed_combined = combine_batch(
                compressed_frame, crf, speed, include_controls=False
            )
            uncompressed_combined = combine_batch(
                uncompressed_frame, reference_crf, reference_speed
            )
            yield uncompressed_combined, compressed_combined

            frame_count += 1
            # NOTE(review): the limit is checked after the yield, so
            # max_frames <= 0 still produces one frame (original behaviour).
            if max_frames is not None and frame_count >= max_frames:
                break


def process_video(video):
    """Load every frame pair of one video into memory.

    Args:
        video: Video metadata mapping (see ``_iter_frame_pairs``).

    Returns:
        ``(uncompressed_frames, compressed_frames)``: two lists of equal
        length holding the model inputs and the matching targets.
    """
    uncompressed_frames = []
    compressed_frames = []
    # NOTE(review): the reference CRF here is 0 while frame_generator uses 10,
    # and neither goes through scale_crf -- confirm which value is intended.
    for uncompressed, compressed in _iter_frame_pairs(video, reference_crf=0):
        uncompressed_frames.append(uncompressed)
        compressed_frames.append(compressed)
    return uncompressed_frames, compressed_frames


def frame_generator(videos, max_frames=None):
    """Lazily yield ``(uncompressed_input, compressed_target)`` frame pairs.

    Args:
        videos: Iterable of video metadata mappings.
        max_frames: Per-video cap on the number of frame pairs; ``None``
            means no cap.
    """
    for video in videos:
        # NOTE(review): reference CRF is 10 here but 0 in process_video.
        yield from _iter_frame_pairs(video, reference_crf=10, max_frames=max_frames)


def create_dataset(videos, batch_size, max_frames=None):
    """Build a shuffled, batched ``tf.data.Dataset`` of frame pairs.

    Args:
        videos: Iterable of video metadata mappings. It is iterated more than
            once, so it must be re-iterable (e.g. a list).
        batch_size: Number of frame pairs per batch.
        max_frames: Per-video frame cap forwarded to ``frame_generator``.

    Returns:
        A dataset of ``(uncompressed_batch, compressed_batch)`` float32
        tensors.

    Raises:
        ValueError: If ``videos`` produces no frame pairs at all.
    """
    # Read a single frame pair to learn the element shapes.
    # NOTE(review): this assumes every frame of every video has the same
    # shape as the first one -- confirm preprocess_frame guarantees that.
    sample_generator = frame_generator(videos, max_frames)
    try:
        sample_uncompressed, sample_compressed = next(sample_generator)
    except StopIteration:
        raise ValueError(
            "create_dataset: no frames could be read from the given videos"
        ) from None
    finally:
        # Closing the probe generator releases its video captures right away.
        sample_generator.close()

    output_signature = (
        tf.TensorSpec(shape=sample_uncompressed.shape, dtype=tf.float32),
        tf.TensorSpec(shape=sample_compressed.shape, dtype=tf.float32),
    )

    dataset = tf.data.Dataset.from_generator(
        # The lambda gives from_generator a zero-argument callable that still
        # forwards max_frames.
        lambda: frame_generator(videos, max_frames),
        output_signature=output_signature,
    )
    dataset = dataset.shuffle(100).batch(batch_size).prefetch(
        tf.data.experimental.AUTOTUNE
    )
    return dataset


class VideoCompressionModel(tf.keras.Model):
    """Convolutional encoder/decoder mapping control-augmented frames to frames.

    The encoder halves the spatial size twice and the decoder doubles it
    twice. Because the pooling layers use ``padding='same'``, the output has
    the input's spatial size only when height and width are multiples of 4.
    """

    def __init__(self):
        super().__init__()
        LOGGER.debug("Initializing VideoCompressionModel.")

        # Colour channels plus one channel each for CRF and speed preset;
        # height and width are left unspecified.
        input_shape_with_controls = (None, None, NUM_COLOUR_CHANNELS + 2)

        # Encoder part of the model
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=input_shape_with_controls),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
            tf.keras.layers.MaxPooling2D((2, 2), padding='same'),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
            tf.keras.layers.MaxPooling2D((2, 2), padding='same'),
        ])

        # Decoder part of the model; the sigmoid keeps outputs in [0, 1].
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same'),
            tf.keras.layers.UpSampling2D((2, 2)),
            tf.keras.layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same'),
            tf.keras.layers.UpSampling2D((2, 2)),
            tf.keras.layers.Conv2DTranspose(NUM_COLOUR_CHANNELS, (3, 3), activation='sigmoid', padding='same'),
        ])

    def call(self, inputs):
        """Run ``inputs`` through the encoder and then the decoder."""
        return self.decoder(self.encoder(inputs))