"""Run a trained Keras video model over a video file and write the output.

The script loads the model from ``models/model_differencing.keras``, reads
frames from ``UNCOMPRESSED_VIDEO_FILE`` in chunks, feeds sliding windows of
``NUM_FRAMES`` frames to ``model.predict`` together with constant CRF and
preset-speed inputs, and writes the predicted frames to
``COMPRESSED_VIDEO_FILE``.
"""
import tensorflow as tf
import numpy as np
import cv2

from video_compression_model import NUM_FRAMES, PRESET_SPEED_CATEGORIES, VideoCompressionModel

# Constants
MAX_FRAMES = 24  # Upper bound on frames processed; 0 disables the cap
CHUNK_SIZE = 24  # Adjust based on available memory and video resolution
COMPRESSED_VIDEO_FILE = 'compressed_video.mkv'
OUTPUT_FPS = 24.0  # Frame rate declared for the written video

# Step 2: Load the trained model
model = tf.keras.models.load_model(
    'models/model_differencing.keras',
    custom_objects={'VideoCompressionModel': VideoCompressionModel},
)

# Step 3: Load the uncompressed video
UNCOMPRESSED_VIDEO_FILE = 'test_data/training_video.mkv'


def load_frames_from_video(video_file, start_frame=0, num_frames=CHUNK_SIZE):
    """Read up to *num_frames* consecutive frames from *video_file*.

    Args:
        video_file: Path passed to ``cv2.VideoCapture``.
        start_frame: Index of the first frame to read.
        num_frames: Maximum number of frames to read.

    Returns:
        A list of frames converted from BGR to RGB, cast to float32 and
        divided by 255.0. The list is shorter than *num_frames* (possibly
        empty) when a read fails, e.g. at the end of the video.
    """
    cap = cv2.VideoCapture(video_file)
    frames = []
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        for _ in range(num_frames):
            ret, frame = cap.read()
            if not ret:
                break
            # Normalize and convert to float32
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0)
    finally:
        # Release the capture even if decoding or conversion raises.
        cap.release()
    return frames


def predict_in_chunks(uncompressed_frames, model, crf_values, preset_speed_values):
    """Run *model* over sliding windows of ``NUM_FRAMES`` frames.

    Every window ``uncompressed_frames[i:i + NUM_FRAMES]`` is one input
    sequence; sequences are sent to ``model.predict`` in batches of at most
    ``CHUNK_SIZE``.

    Args:
        uncompressed_frames: Sequence of frames to build windows from.
        model: Object whose ``predict`` accepts a dict with the keys
            ``"frames"``, ``"crf"`` and ``"preset_speed"``.
        crf_values: Indexable by sequence position; sliced per batch.
        preset_speed_values: Indexable by sequence position; sliced per batch.

    Returns:
        A list with the items of every ``model.predict`` result, in order.
        Empty when fewer than ``NUM_FRAMES`` frames are supplied.
    """
    num_sequences = len(uncompressed_frames) - NUM_FRAMES + 1
    compressed_frames = []

    # A non-positive num_sequences makes the range empty, so nothing is predicted.
    for start in range(0, num_sequences, CHUNK_SIZE):
        end = min(start + CHUNK_SIZE, num_sequences)
        frame_sequences = np.array(
            [uncompressed_frames[i:i + NUM_FRAMES] for i in range(start, end)]
        )
        compressed_chunk = model.predict({
            "frames": frame_sequences,
            "crf": crf_values[start:end],
            "preset_speed": preset_speed_values[start:end],
        })
        compressed_frames.extend(compressed_chunk)

    return compressed_frames


def save_frames_chunk(frames, video_writer):
    """Write *frames* to *video_writer*.

    Each frame is multiplied by 255, clipped to [0, 255], cast to uint8 and
    converted from RGB to BGR before being written.
    """
    for frame in frames:
        frame = np.clip(frame * 255.0, 0, 255).astype(np.uint8)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        video_writer.write(frame)


cap = cv2.VideoCapture(UNCOMPRESSED_VIDEO_FILE)
try:
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
finally:
    cap.release()
if MAX_FRAMES != 0 and total_frames > MAX_FRAMES:
    total_frames = MAX_FRAMES

# Chunk size + look-ahead frames
crf_values = np.full((CHUNK_SIZE + NUM_FRAMES - 1, 1), 25, dtype=np.float32)
preset_speed_index = PRESET_SPEED_CATEGORIES.index("fast")
preset_speed_values = np.full((CHUNK_SIZE + NUM_FRAMES - 1, 1), preset_speed_index, dtype=np.float32)

# NOTE(review): each chunk is loaded without look-ahead frames, so windows never
# span a chunk boundary and a chunk of k frames yields only k - NUM_FRAMES + 1
# sequences. Confirm whether that is intended.
out = None  # Video writer instance, created lazily from the first predicted frame
try:
    for i in range(0, total_frames, CHUNK_SIZE):
        # Never read past total_frames, so the MAX_FRAMES cap is honored even
        # when it is not a multiple of CHUNK_SIZE.
        frames_to_load = min(CHUNK_SIZE, total_frames - i)
        uncompressed_frames_chunk = load_frames_from_video(
            UNCOMPRESSED_VIDEO_FILE, start_frame=i, num_frames=frames_to_load
        )
        compressed_frames_chunk = predict_in_chunks(
            uncompressed_frames_chunk, model, crf_values, preset_speed_values
        )

        # A chunk with fewer than NUM_FRAMES frames produces no output.
        if not compressed_frames_chunk:
            continue

        # Initialize video writer if it's the first chunk with output
        if out is None:
            height, width = compressed_frames_chunk[0].shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(COMPRESSED_VIDEO_FILE, fourcc, OUTPUT_FPS, (width, height))

        save_frames_chunk(compressed_frames_chunk, out)
finally:
    # The writer may never have been created (unreadable or too-short video).
    if out is not None:
        out.release()

if out is None:
    print("No frames were produced; nothing was written.")
else:
    print("Compression completed.")