| | import argparse |
| | import math |
| | import os |
| | import cv2 |
| | import subprocess |
| | from datetime import timedelta |
| | from urllib.parse import urlparse |
| | import re |
| | import numpy as np |
| | import PIL |
| | from PIL import Image, ImageDraw |
| | import datetime |
| | import torch |
| | import torchvision |
| | import torch.distributed as dist |
| | from torch.utils.data.distributed import DistributedSampler |
| | from torch.nn.parallel import DistributedDataParallel as DDP |
| | import torchvision.transforms as transforms |
| | import torch.nn.functional as F |
| | import torch.utils.checkpoint |
| | from einops import rearrange |
| | import random |
| | from skimage.metrics import structural_similarity as compare_ssim |
| |
|
| | from diffusers.utils import load_image |
| |
|
| |
|
| |
|
| |
|
def export_to_video(video_frames, output_video_path, fps):
    """Write a sequence of RGB frames to an MP4 file.

    Args:
        video_frames (list[np.ndarray]): Frames as H x W x 3 uint8 RGB arrays.
        output_video_path (str): Destination .mp4 path.
        fps (int): Frames per second of the output video.
    """
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    h, w, _ = video_frames[0].shape
    video_writer = cv2.VideoWriter(
        output_video_path, fourcc, fps=fps, frameSize=(w, h))
    for frame in video_frames:
        # OpenCV expects BGR channel order when writing.
        video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    # Bug fix: the writer was never released, which can leave the file
    # unflushed/truncated on some backends.
    video_writer.release()
| |
|
| |
|
def export_to_gif(frames, output_gif_path, fps):
    """Export a list of frames to an animated GIF.

    Args:
        frames (list): Frames as numpy arrays or PIL Image objects.
        output_gif_path (str): Output path; a '.mp4' suffix is rewritten to '.gif'.
        fps (int): Frames per second of the GIF.
    """
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray)
                  else frame for frame in frames]

    # Bug fix: `fps` was ignored — the per-frame duration was hard-coded
    # to 100 ms regardless of the requested rate.
    pil_frames[0].save(output_gif_path.replace('.mp4', '.gif'),
                       format='GIF',
                       append_images=pil_frames[1:],
                       save_all=True,
                       duration=int(1000 / fps),
                       loop=0)
| | |
| | from PIL import Image |
| | import numpy as np |
| |
|
def export_gif_with_ref(start_image, frames, end_image, reference_image, output_gif_path, fps):
    """Export a GIF where each frame shows [start | reference | end | frame] side by side.

    Args:
        start_image (PIL.Image): The starting image.
        frames (list): List of frames (as numpy arrays or PIL Image objects).
        end_image (PIL.Image): The ending image.
        reference_image (PIL.Image): The reference image.
        output_gif_path (str): Path to save the output GIF.
        fps (int): Frames per second for the GIF.
    """
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray)
                  else frame for frame in frames]

    # All images are brought to the start image's height (widths are kept).
    width, height = start_image.size
    reference_image = reference_image.resize((reference_image.width, height))
    resized_frames = [frame.resize((frame.width, height)) for frame in pil_frames]

    column_frames = []
    for frame in resized_frames:
        new_width = (start_image.width + reference_image.width
                     + end_image.width + frame.width)
        combined_frame = Image.new('RGB', (new_width, height))

        combined_frame.paste(start_image, (0, 0))
        combined_frame.paste(reference_image, (start_image.width, 0))
        combined_frame.paste(end_image, (start_image.width + reference_image.width, 0))
        combined_frame.paste(frame, (start_image.width + reference_image.width + end_image.width, 0))

        column_frames.append(combined_frame)

    # Bug fix: `fps` was ignored (duration hard-coded to 150 ms); use the
    # same 1000/fps conversion as export_gif_with_ref_complete.
    frame_duration = int(1000 / fps)

    column_frames[0].save(output_gif_path,
                          format='GIF',
                          append_images=column_frames[1:],
                          save_all=True,
                          duration=frame_duration,
                          loop=0)
| | |
| | |
| |
|
| |
|
| |
|
def tensor_to_vae_latent(t, vae):
    """Encode a video tensor of shape (b, f, c, h, w) into scaled VAE latents.

    Frames are folded into the batch dimension for encoding, then the frame
    dimension is restored; the result is multiplied by the VAE scaling factor.
    """
    batch, num_frames = t.shape[0], t.shape[1]
    flat = t.reshape(batch * num_frames, *t.shape[2:])
    z = vae.encode(flat).latent_dist.sample()
    z = z.reshape(batch, num_frames, *z.shape[1:])
    return z * vae.config.scaling_factor
| |
|
| |
|
def download_image(url):
    """Load an image from a URL (via diffusers' load_image) or a local path (via PIL).

    Paths with a URL scheme are fetched remotely; everything else is opened
    from disk and normalized to RGB.
    """
    if urlparse(url).scheme:
        return load_image(url)
    return PIL.Image.open(url).convert("RGB")
| |
|
| |
|
def map_ssim_distance(dis):
    """Bucket an SSIM similarity score into an integer distance in [1, 10].

    Higher similarity maps to a smaller distance: each 0.05 drop below 0.95
    adds one, saturating at 10 for scores of 0.55 and below.
    """
    thresholds = (0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.60, 0.55)
    for level, threshold in enumerate(thresholds, start=1):
        if dis > threshold:
            return level
    return 10
| |
|
| |
|
def calculate_ssim(frame1, frame2):
    """Return the SSIM between two RGB frames, computed on grayscale versions."""
    gray_a, gray_b = (cv2.cvtColor(f, cv2.COLOR_RGB2GRAY) for f in (frame1, frame2))
    return compare_ssim(gray_a, gray_b)
| |
|
| |
|
def mse(image1, image2):
    """Mean squared error between two images, normalized by H*W only (not channels)."""
    diff = image1.astype("float") - image2.astype("float")
    return np.sum(diff ** 2) / float(image1.shape[0] * image1.shape[1])
| |
|
| |
|
def calculate_video_motion_distance(frames_data):
    """Compute per-step SSIM between consecutive frames and their mean.

    Args:
        frames_data: Array of shape (num_frames, H, W, C).

    Returns:
        (list of per-pair SSIM values, mean SSIM across all pairs).
    """
    similarities = [
        calculate_ssim(frames_data[idx - 1], frames_data[idx])
        for idx in range(1, frames_data.shape[0])
    ]
    return similarities, np.mean(similarities)
| |
|
| |
|
| |
|
def load_images_from_folder_to_pil(folder, target_size=(512, 512)):
    """Load every image in `folder`, resize to `target_size`, and return RGB PIL images.

    Files are read with OpenCV (any bit depth), normalized to 8-bit 3-channel
    RGB, and returned in lexicographic filename order.

    Args:
        folder (str): Directory to scan (non-recursive).
        target_size (tuple[int, int]): (width, height) every image is resized to.

    Returns:
        list[PIL.Image.Image]: Loaded images; unreadable files are skipped.
    """
    images = []
    valid_extensions = {".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff"}

    def frame_number(filename):
        # Prefer the explicit "frame_<N>_7fps" naming scheme when present.
        new_pattern_match = re.search(r'frame_(\d+)_7fps', filename)
        if new_pattern_match:
            return int(new_pattern_match.group(1))

        # Otherwise fall back to the last number in the name, skipping a
        # trailing "0000" chunk when another number precedes it.
        matches = re.findall(r'\d+', filename)
        if matches:
            if matches[-1] == '0000' and len(matches) > 1:
                return int(matches[-2])
            return int(matches[-1])
        return float('inf')

    # NOTE(review): `frame_number` is defined but never used — files are
    # sorted lexicographically below. It may have been intended as the
    # sort key; confirm before changing.
    sorted_files = sorted(os.listdir(folder))

    for filename in sorted_files:
        ext = os.path.splitext(filename)[1].lower()
        if ext in valid_extensions:
            img_path = os.path.join(folder, filename)
            img = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)
            if img is not None:
                img = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)

                # 16-bit sources are scaled down to 8-bit.
                if img.dtype == np.uint16:
                    img = (img / 256).astype(np.uint8)

                # Normalize to 3-channel RGB (OpenCV loads BGR / grayscale).
                if len(img.shape) == 2:
                    img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
                elif len(img.shape) == 3 and img.shape[2] == 3:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                pil_img = Image.fromarray(img)
                images.append(pil_img)

    return images
| |
|
def extract_frames_from_video(video_path):
    """Read every frame of a video file and return them as RGB PIL images.

    Returns an empty list if the video cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    frames = []

    if not capture.isOpened():
        return frames

    while True:
        ok, bgr = capture.read()
        if not ok:
            break
        # OpenCV decodes to BGR; convert before wrapping in PIL.
        frames.append(Image.fromarray(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)))

    capture.release()
    return frames
| |
|
| |
|
def export_gif_side_by_side(ref_frame, sketches, frames, output_gif_path, fps):
    """Export a GIF where each frame shows [reference | sketch | frame] columns.

    Args:
        ref_frame (PIL.Image): Reference image shown in the first column.
        sketches (list): Per-frame sketch images (numpy arrays or PIL Images).
        frames (list): Generated frames (numpy arrays or PIL Images).
        output_gif_path (str): Path to save the output GIF.
        fps (int): Frames per second for the GIF.
    """
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray)
                  else frame for frame in frames]

    # All three columns use the generated frames' resolution.
    width, height = pil_frames[0].size
    resized_frames = [frame.resize((width, height)) for frame in pil_frames]
    resized_sketches = [sketch.resize((width, height)) for sketch in sketches]
    ref_frame = ref_frame.resize((width, height))

    column_frames = []
    for i, frame in enumerate(resized_frames):
        combined_frame = Image.new('RGB', (width * 3, height))

        combined_frame.paste(ref_frame, (0, 0))
        combined_frame.paste(resized_sketches[i], (width, 0))
        combined_frame.paste(frame, (width * 2, 0))

        column_frames.append(combined_frame)

    # Bug fix: `fps` was ignored (duration hard-coded to 150 ms); use the
    # same 1000/fps conversion as export_gif_side_by_side_complete.
    frame_duration = int(1000 / fps)

    column_frames[0].save(output_gif_path,
                          format='GIF',
                          append_images=column_frames[1:],
                          save_all=True,
                          duration=frame_duration,
                          loop=0)
| | |
| |
|
| | |
| |
|
def safe_round(coords, size):
    """Round (x, y) coordinates to ints and clip them inside an image.

    Args:
        coords: (N, 2) array with x in column 0 and y in column 1.
        size: (C, H, W)-like; size[1] is the height, size[2] the width.

    Returns:
        (N, 2) int array with x clipped to [0, W-1] and y to [0, H-1].
    """
    h, w = size[1], size[2]
    out = np.round(coords).astype(int)
    out[:, 0] = np.clip(out[:, 0], 0, w - 1)
    out[:, 1] = np.clip(out[:, 1], 0, h - 1)
    return out
| | def random_number(num_points,size,coords0,coords1): |
| | shuffle_indices = np.random.permutation(np.arange(coords0.shape[0])) |
| |
|
| |
|
| | shuffled_coords0 = coords0[shuffle_indices] |
| | shuffled_coords1 = coords1[shuffle_indices] |
| | indices = np.random.choice(np.arange(shuffled_coords0.shape[0]), size=num_points, replace=False) |
| |
|
| | |
| | |
| | selected_coords0 = shuffled_coords0[indices] |
| | selected_coords1 = shuffled_coords1[indices] |
| | h, w = size[1], size[2] |
| | mask0 = np.zeros((h, w), dtype=np.uint8) |
| | mask1 = np.zeros((h, w), dtype=np.uint8) |
| | for i, (coord0, coord1) in enumerate(zip(selected_coords0, selected_coords1)): |
| | x0, y0 = coord0 |
| | x1, y1 = coord1 |
| | |
| | mask0[y0, x0] = i + 1 |
| | mask1[y1, x1] = i + 1 |
| | return mask0,mask1 |
| |
|
| |
|
def split_and_shuffle(image, coordinates):
    """Randomly permute the four quadrants of an image tensor and remap coordinates.

    NOTE(review): this module later defines another `split_and_shuffle` with a
    different signature, which shadows this definition at import time.

    Args:
        image: Tensor of shape (C, H, W); H and W must be even.
        coordinates: Array of (row, col) points, one per row.

    Returns:
        (shuffled_image, new_coordinates): the quadrant-shuffled tensor and the
        points translated along with their quadrant.
    """
    assert image.shape[1] % 2 == 0 and image.shape[2] % 2 == 0, "Height and width must be even."

    H, W = image.shape[1], image.shape[2]

    # The four quadrant views in row-major order: TL, TR, BL, BR.
    patches_img = [
        image[:, :H//2, :W//2],
        image[:, :H//2, W//2:],
        image[:, H//2:, :W//2],
        image[:, H//2:, W//2:]
    ]

    # (row_start, row_end, col_start, col_end) of each source quadrant.
    patch_coords = [
        (0, H//2, 0, W//2),
        (0, H//2, W//2, W),
        (H//2, H, 0, W//2),
        (H//2, H, W//2, W)
    ]

    # indices[k] = which source quadrant ends up in destination slot k.
    indices = list(range(4))
    random.shuffle(indices)

    # Top-left corner of each destination slot, in the same row-major order.
    new_patch_coords = [
        (0, 0),
        (0, W//2),
        (H//2, 0),
        (H//2, W//2)
    ]

    # Move each (row, col) point with its quadrant: find the source quadrant,
    # then translate into the slot that quadrant was shuffled to.
    new_coordinates = np.zeros_like(coordinates)
    for i, (r, c) in enumerate(coordinates):
        for idx, (r1, r2, c1, c2) in enumerate(patch_coords):
            if r1 <= r < r2 and c1 <= c < c2:
                new_r = r - r1 + new_patch_coords[indices.index(idx)][0]
                new_c = c - c1 + new_patch_coords[indices.index(idx)][1]
                new_coordinates[i] = [new_r, new_c]
                break

    # Reassemble the image with the shuffled quadrant order.
    shuffled_img = torch.cat([
        torch.cat([patches_img[indices[0]], patches_img[indices[1]]], dim=2),
        torch.cat([patches_img[indices[2]], patches_img[indices[3]]], dim=2)
    ], dim=1)

    return shuffled_img, new_coordinates
| |
|
| |
|
| | import os |
| | import cv2 |
| |
|
def extract_frames_from_videos(video_folder):
    """Extract every frame of each .mp4 in `video_folder` as JPEG images.

    Frames for each video are written to processed_video/<video_name>/frame_XXXX.jpg.

    Args:
        video_folder (str): Directory scanned (non-recursively) for .mp4 files.
    """
    for filename in os.listdir(video_folder):
        if filename.endswith('.mp4'):
            video_path = os.path.join(video_folder, filename)

            frames_folder = os.path.join("processed_video", os.path.splitext(filename)[0])
            os.makedirs(frames_folder, exist_ok=True)

            cap = cv2.VideoCapture(video_path)
            frame_count = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_filename = os.path.join(frames_folder, f'frame_{frame_count:04d}.jpg')
                cv2.imwrite(frame_filename, frame)
                frame_count += 1

            cap.release()
            # Bug fix: the log line did not name the processed video.
            print(f'Extracted {frame_count} frames from {filename} and saved to {frames_folder}')
| |
|
| |
|
def create_videos_from_frames(base_folder, output_folder, frame_rate=30, expected_frames=14):
    """Assemble per-directory frame sequences under `base_folder` into MP4 videos.

    Each directory containing exactly `expected_frames` .jpg/.png files
    (sorted by name) becomes one <dirname>.mp4 in `output_folder`.

    Args:
        base_folder (str): Root directory walked for frame subdirectories.
        output_folder (str): Where the resulting .mp4 files are written.
        frame_rate (int): Output video frame rate.
        expected_frames (int): Number of frames a directory must contain to be
            converted (generalizes the previously hard-coded 14).
    """
    for root, dirs, files in os.walk(base_folder):
        frames = [os.path.join(root, f) for f in sorted(files)
                  if f.endswith(('.jpg', '.png'))]

        if len(frames) == expected_frames:
            video_name = os.path.basename(root) + '.mp4'
            video_path = os.path.join(output_folder, video_name)
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            first_frame = cv2.imread(frames[0])
            height, width, layers = first_frame.shape
            video_writer = cv2.VideoWriter(video_path, fourcc, frame_rate, (width, height))

            for frame in frames:
                video_writer.write(cv2.imread(frame))

            video_writer.release()
            print(f'Created video: {video_path}')
| |
|
def random_rotate(image, angle_range=(-60, 60)):
    """Rotate a PIL image by a uniformly random angle, padding corners with white."""
    low, high = angle_range
    return image.rotate(random.uniform(low, high), fillcolor=(255, 255, 255))
| |
|
def random_crop(image, ratio=0.9):
    """Crop a random sub-region of the image and resize it back to the original size.

    NOTE(review): the `ratio` parameter is ignored — it is immediately
    overwritten by a random value in [0.6, 1.0]; confirm whether honoring the
    argument was intended. Kept as-is to preserve behavior.
    """
    width, height = image.size
    ratio = random.uniform(0.6, 1.0)

    crop_w = int(width * ratio)
    crop_h = int(height * ratio)
    # RNG call order (uniform, then top, then left) matches the original.
    top = random.randint(0, height - crop_h)
    left = random.randint(0, width - crop_w)
    cropped = image.crop((left, top, left + crop_w, top + crop_h))
    return cropped.resize((width, height))
| |
|
def random_flip(image):
    """Independently mirror the image horizontally and/or vertically, each with p=0.5."""
    # Two random.random() draws, in the same order as the original.
    for method in (Image.FLIP_LEFT_RIGHT, Image.FLIP_TOP_BOTTOM):
        if random.random() < 0.5:
            image = image.transpose(method)
    return image
| |
|
| |
|
def patch_shuffle(image, num_patches):
    """Shuffle a (C, H, W) tensor as a num_patches x num_patches grid of tiles.

    Args:
        image: Tensor of shape (C, H, W); H and W must be divisible by num_patches.
        num_patches: Number of tiles along each spatial axis.

    Returns:
        A (C, H, W) tensor with the tiles randomly permuted.
    """
    C, H, W = image.shape

    assert H % num_patches == 0 and W % num_patches == 0, "Image dimensions must be divisible by num_patches"

    tile_h = H // num_patches
    tile_w = W // num_patches

    # Cut the grid into (C, P*P, tile_h, tile_w) tiles.
    tiles = (image.unfold(1, tile_h, tile_h)
                  .unfold(2, tile_w, tile_w)
                  .contiguous()
                  .view(C, num_patches * num_patches, tile_h, tile_w))

    # Single randperm call, as in the original, so RNG behavior is unchanged.
    perm = torch.randperm(num_patches * num_patches)
    tiles = tiles[:, perm, :, :]

    # Stitch the permuted tiles back into a full image.
    tiles = tiles.view(C, num_patches, num_patches, tile_h, tile_w)
    return tiles.permute(0, 1, 3, 2, 4).contiguous().view(C, H, W)
def augment_image(image, k):
    """Apply the random rotate -> crop -> flip augmentation chain to a PIL image.

    NOTE(review): the `k` argument is unused; kept for caller compatibility.
    """
    return random_flip(random_crop(random_rotate(image)))
| |
|
| |
|
def load_images_from_folder(folder):
    """Open every .png/.jpg/.jpeg in `folder` as a PIL image.

    Unreadable files are skipped with a logged error.

    Args:
        folder (str): Directory scanned non-recursively.

    Returns:
        list[PIL.Image.Image]: Lazily-opened images, in os.listdir order.
    """
    image_list = []
    for filename in os.listdir(folder):
        # endswith accepts a tuple — one call instead of an `or` chain.
        if filename.endswith((".png", ".jpg", ".jpeg")):
            img_path = os.path.join(folder, filename)
            try:
                img = Image.open(img_path)
                image_list.append(img)
            except Exception as e:
                # Bug fix: the error message did not name the offending file.
                print(f"Error loading image {img_path}: {e}")
    return image_list
| |
|
| |
|
def get_mask(model, input_img, s=640):
    """Run a segmentation-style model on an RGB image and return a (H, W, 1) prediction.

    The image is scaled so its longer side equals `s`, centered in an s x s
    zero-padded canvas, fed through `model`, and the prediction is cropped and
    resized back to the original resolution.

    Args:
        model: Torch module with a `.device` attribute, called as model(tensor)
            on a (1, 3, s, s) float tensor. Assumed to return a (1, C, s, s)
            tensor — TODO confirm against the model implementation.
        input_img: H x W x 3 array with values in 0-255.
        s (int): Side length of the square network input.

    Returns:
        np.ndarray of shape (H, W, 1) with the model's output values.
    """
    input_img = (input_img / 255).astype(np.float32)
    h, w = h0, w0 = input_img.shape[:-1]
    # Fit the longer side to s, keeping the aspect ratio.
    h, w = (s, int(s * w / h)) if h > w else (int(s * h / w), s)
    ph, pw = s - h, s - w
    img_input = np.zeros([s, s, 3], dtype=np.float32)
    # Center the resized image in the square canvas (zero padding around it).
    img_input[ph // 2:ph // 2 + h, pw // 2:pw // 2 + w] = cv2.resize(input_img, (w, h))
    img_input = np.transpose(img_input, (2, 0, 1))  # HWC -> CHW
    img_input = img_input[np.newaxis, :]            # add batch dimension
    tmpImg = torch.from_numpy(img_input).type(torch.FloatTensor).to(model.device)
    with torch.no_grad():
        pred = model(tmpImg)
    pred = pred.cpu().numpy()[0]
    pred = np.transpose(pred, (1, 2, 0))            # CHW -> HWC
    # Undo the padding, then restore the original resolution.
    pred = pred[ph // 2:ph // 2 + h, pw // 2:pw // 2 + w]
    pred = cv2.resize(pred, (w0, h0))[:, :, np.newaxis]
    return pred
| |
|
| |
|
| | |
| |
|
def safe_round(coords, size):
    """Round (x, y) coordinates to ints and clip them inside an image of `size` (C, H, W).

    NOTE(review): duplicate of an identical `safe_round` defined earlier in
    this module; this redefinition harmlessly shadows it.
    """
    height, width = size[1], size[2]
    rounded_coords = np.round(coords).astype(int)
    rounded_coords[:, 0] = np.clip(rounded_coords[:, 0], 0, width - 1)   # x column
    rounded_coords[:, 1] = np.clip(rounded_coords[:, 1], 0, height - 1)  # y column
    return rounded_coords
def random_number(num_points,size,coords0,coords1):
    """Pick `num_points` random matched (x, y) pairs and rasterize them into
    two (H, W) uint8 label masks, where value i+1 marks the i-th chosen pair.

    NOTE(review): duplicate of the identical `random_number` defined earlier
    in this module; this redefinition shadows it.
    """
    # Shuffle both sets with the same permutation, then sample without replacement.
    shuffle_indices = np.random.permutation(np.arange(coords0.shape[0]))

    shuffled_coords0 = coords0[shuffle_indices]
    shuffled_coords1 = coords1[shuffle_indices]
    indices = np.random.choice(np.arange(shuffled_coords0.shape[0]), size=num_points, replace=False)

    selected_coords0 = shuffled_coords0[indices]
    selected_coords1 = shuffled_coords1[indices]
    h, w = size[1], size[2]
    mask0 = np.zeros((h, w), dtype=np.uint8)
    mask1 = np.zeros((h, w), dtype=np.uint8)
    for i, (coord0, coord1) in enumerate(zip(selected_coords0, selected_coords1)):
        x0, y0 = coord0
        x1, y1 = coord1

        # Coordinates are assumed integral and in bounds (e.g. via safe_round).
        mask0[y0, x0] = i + 1
        mask1[y1, x1] = i + 1
    return mask0,mask1
| |
|
| |
|
| | import torch |
| |
|
def split_and_shuffle(image, keypoints, num_rows, num_cols):
    """
    Split the image into tiles, shuffle them, and update the keypoints accordingly.

    NOTE(review): this redefinition shadows the earlier two-argument
    `split_and_shuffle` in this module.

    Parameters:
    - image: Tensor of shape (3, H, W)
    - keypoints: Tensor of shape (num_k, 2), as (x, y)
    - num_rows: int, number of rows to split
    - num_cols: int, number of columns to split

    Returns:
    - shuffled_image: Tensor of shape (3, H, W)
    - new_keypoints: Tensor of shape (num_k, 2)
    """
    C, H, W = image.shape

    # Zero-pad so the tile grid divides the image evenly.
    pad_h = (num_rows - H % num_rows) % num_rows
    pad_w = (num_cols - W % num_cols) % num_cols

    H_padded = H + pad_h
    W_padded = W + pad_w
    padded_image = torch.zeros((C, H_padded, W_padded), dtype=image.dtype).to(image.device)
    padded_image[:, :H, :W] = image

    tile_height = H_padded // num_rows
    tile_width = W_padded // num_cols

    # Cut the padded image into num_rows * num_cols tiles.
    tiles = padded_image.reshape(C,
                                 num_rows,
                                 tile_height,
                                 num_cols,
                                 tile_width)
    tiles = tiles.permute(1, 3, 0, 2, 4).contiguous()
    num_tiles = num_rows * num_cols
    tiles = tiles.view(num_tiles, C, tile_height, tile_width)

    # idx_shuffle[dest] = index of the source tile placed at slot `dest`.
    idx_shuffle = torch.randperm(num_tiles).to(image.device)
    tiles_shuffled = tiles[idx_shuffle]

    # Reassemble the shuffled tiles and crop back to the original size.
    tiles_shuffled = tiles_shuffled.view(num_rows, num_cols, C, tile_height, tile_width)
    shuffled_image = tiles_shuffled.permute(2, 0, 3, 1, 4).contiguous()
    shuffled_image = shuffled_image.view(C, H_padded, W_padded)
    shuffled_image = shuffled_image[:, :H, :W]

    # Which tile did each keypoint originally live in?
    x = keypoints[:, 0]
    y = keypoints[:, 1]

    tile_rows = (y / tile_height).long()
    tile_cols = (x / tile_width).long()
    tile_indices = tile_rows * num_cols + tile_cols

    # idx_unshuffle[src] = destination slot the source tile was moved to.
    idx_unshuffle = torch.argsort(idx_shuffle)

    new_tile_indices = idx_unshuffle[tile_indices]
    new_tile_rows = new_tile_indices // num_cols
    new_tile_cols = new_tile_indices % num_cols

    # Each keypoint keeps its offset within its tile.
    offset_x = x % tile_width
    offset_y = y % tile_height

    new_x = new_tile_cols * tile_width + offset_x
    new_y = new_tile_rows * tile_height + offset_y

    # Clamp in case a point lands inside the cropped padding region.
    new_x = new_x.clamp(0, W - 1)
    new_y = new_y.clamp(0, H - 1)

    new_keypoints = torch.stack([new_x, new_y], dim=1)

    return shuffled_image, new_keypoints
| |
|
def generate_point_map(size, coords0, coords1):
    """Rasterize matched point pairs into two (H, W) uint8 label masks.

    The i-th pair is written as value i+1 at its rounded (x, y) location;
    out-of-bounds points are silently skipped.
    """
    h, w = size[1], size[2]
    mask0 = np.zeros((h, w), dtype=np.uint8)
    mask1 = np.zeros((h, w), dtype=np.uint8)
    for label, (p0, p1) in enumerate(zip(coords0, coords1), start=1):
        x0, y0 = (int(round(v)) for v in p0)
        x1, y1 = (int(round(v)) for v in p1)

        if 0 <= x0 < w and 0 <= y0 < h:
            mask0[y0, x0] = label
        if 0 <= x1 < w and 0 <= y1 < h:
            mask1[y1, x1] = label
    return mask0, mask1
| |
|
| |
|
def select_multiple_points(points0, points1, num_points):
    """Sample up to `num_points` matched pairs (without replacement) from two point sets.

    The same random row indices are applied to both arrays, so pairings are
    preserved. If `num_points` exceeds the available count, all pairs are
    returned in random order.
    """
    total = len(points0)
    count = min(num_points, total)
    chosen = np.random.choice(total, size=count, replace=False)
    return points0[chosen], points1[chosen]
| |
|
def generate_point_map_frames(size, coords0, coords1, visibility):
    """Rasterize tracked points into label masks: one for the source frame,
    one per tracked frame.

    Args:
        size: (C, H, W)-like; only H and W are used.
        coords0: (N, 2) initial (x, y) points.
        coords1: (num_frames, N, 2) tracked (x, y) points.
        visibility: (num_frames, N) per-frame visibility flags.

    Returns:
        mask0: (H, W) uint8 mask with label i+1 at point i's initial position.
        mask1: (num_frames, H, W) uint8 masks for the tracked positions,
            written only where the point is visible and in bounds.
    """
    h, w = size[1], size[2]
    num_frames = coords1.shape[0]
    mask0 = np.zeros((h, w), dtype=np.uint8)
    mask1 = np.zeros((num_frames, h, w), dtype=np.uint8)

    for i, (x0, y0) in enumerate(coords0):
        x0, y0 = int(round(x0)), int(round(y0))
        if 0 <= x0 < w and 0 <= y0 < h:
            mask0[y0, x0] = i + 1

    for f in range(num_frames):
        for i, (x1, y1) in enumerate(coords1[f]):
            x1, y1 = int(round(x1)), int(round(y1))
            # `== True` kept from the original to preserve exact semantics.
            if 0 <= x1 < w and 0 <= y1 < h and visibility[f, i] == True:
                mask1[f, y1, x1] = i + 1

    return mask0, mask1
| |
|
| |
|
| |
|
| | import numpy as np |
| |
|
def extract_patches(image, coords, patch_size):
    """Extract square patches of side `patch_size` centered on each coordinate.

    Patches that extend past the image border are zero-padded.

    Args:
        image: (C, H, W) array.
        coords: (N, 2) array of (x, y) patch centers.
        patch_size: Patch side length (odd values center exactly on the point).

    Returns:
        (N, C, patch_size, patch_size) array of the same dtype as `image`.
    """
    N = coords.shape[0]
    channels, H, W = image.shape
    patches = np.zeros((N, channels, patch_size, patch_size), dtype=image.dtype)
    half_size = patch_size // 2

    for i in range(N):
        x0, y0 = coords[i]
        x0 = int(round(x0))
        y0 = int(round(y0))

        # Desired source window in image coordinates (may exceed the image).
        x_start_img = x0 - half_size
        x_end_img = x0 + half_size + 1
        y_start_img = y0 - half_size
        y_end_img = y0 + half_size + 1

        # Destination window inside the output patch.
        x_start_patch = 0
        y_start_patch = 0
        x_end_patch = patch_size
        y_end_patch = patch_size

        # Clip both windows in lockstep wherever the source leaves the image.
        if x_start_img < 0:
            x_start_patch = -x_start_img
            x_start_img = 0
        if y_start_img < 0:
            y_start_patch = -y_start_img
            y_start_img = 0
        if x_end_img > W:
            x_end_patch -= (x_end_img - W)
            x_end_img = W
        if y_end_img > H:
            y_end_patch -= (y_end_img - H)
            y_end_img = H

        patch_height = y_end_patch - y_start_patch
        patch_width = x_end_patch - x_start_patch
        img_height = y_end_img - y_start_img
        img_width = x_end_img - x_start_img

        # Guard against residual size mismatches (e.g. even patch_size):
        # shrink both windows to their common size.
        if patch_height != img_height or patch_width != img_width:
            min_height = min(patch_height, img_height)
            min_width = min(patch_width, img_width)
            y_end_patch = y_start_patch + min_height
            y_end_img = y_start_img + min_height
            x_end_patch = x_start_patch + min_width
            x_end_img = x_start_img + min_width

        # Copy the overlapping region; the rest of the patch stays zero.
        patches[i, :, y_start_patch:y_end_patch, x_start_patch:x_end_patch] = \
            image[:, y_start_img:y_end_img, x_start_img:x_end_img]

    return patches
| |
|
def generate_point_feature_map_frames_naive(image, size, coords0, coords1, visibility, patch_size):
    """Build per-frame feature maps by pasting patches sampled around `coords0`
    at the tracked positions in `coords1`.

    Args:
        image: (C, H, W) source array the patches are cut from.
        size: (C, H, W) shape of the output maps.
        coords0: (N, 2) initial (x, y) points (patch centers in `image`).
        coords1: (num_frames, N, 2) tracked (x, y) positions per frame.
        visibility: (num_frames, N) flags; invisible points are skipped.
        patch_size: Side length of the square patches.

    Returns:
        (num_frames, C, H, W) array; on overlap, later points overwrite earlier ones.
    """
    channels, H, W = size
    num_frames = coords1.shape[0]
    N = coords0.shape[0]

    # One patch per point, cut from the source image (zero-padded at borders).
    patches = extract_patches(image, coords0, patch_size)
    half_size = patch_size // 2

    feature_maps = np.zeros((num_frames, channels, H, W), dtype=image.dtype)

    for frame_idx in range(num_frames):
        feature_map = np.zeros((channels, H, W), dtype=image.dtype)
        coords_frame = coords1[frame_idx]

        for i in range(N):
            if visibility[frame_idx, i]:
                x1, y1 = coords_frame[i]
                x1 = int(round(x1))
                y1 = int(round(y1))

                # Desired destination window in the map (may exceed the map).
                x_start_map = x1 - half_size
                x_end_map = x1 + half_size + 1
                y_start_map = y1 - half_size
                y_end_map = y1 + half_size + 1

                # Source window inside the patch.
                x_start_patch = 0
                y_start_patch = 0
                x_end_patch = patch_size
                y_end_patch = patch_size

                # Clip both windows in lockstep wherever the destination
                # leaves the map.
                if x_start_map < 0:
                    x_start_patch = -x_start_map
                    x_start_map = 0
                if y_start_map < 0:
                    y_start_patch = -y_start_map
                    y_start_map = 0
                if x_end_map > W:
                    x_end_patch -= (x_end_map - W)
                    x_end_map = W
                if y_end_map > H:
                    y_end_patch -= (y_end_map - H)
                    y_end_map = H

                patch_height = y_end_patch - y_start_patch
                patch_width = x_end_patch - x_start_patch
                map_height = y_end_map - y_start_map
                map_width = x_end_map - x_start_map

                # Shrink both windows to their common size on any residual
                # mismatch (e.g. even patch_size).
                if patch_height != map_height or patch_width != map_width:
                    min_height = min(patch_height, map_height)
                    min_width = min(patch_width, map_width)
                    y_end_patch = y_start_patch + min_height
                    y_end_map = y_start_map + min_height
                    x_end_patch = x_start_patch + min_width
                    x_end_map = x_start_map + min_width

                feature_map[:, y_start_map:y_end_map, x_start_map:x_end_map] = \
                    patches[i, :, y_start_patch:y_end_patch, x_start_patch:x_end_patch]

        feature_maps[frame_idx] = feature_map

    return feature_maps
| |
|
| |
|
| | import os |
| | from PIL import Image |
| | import numpy as np |
| | from moviepy.editor import ImageSequenceClip |
| |
|
def export_gif_side_by_side_complete(ref_frame, sketches, frames, output_gif_path, supp_dir, fps):
    """Export a GIF and an MP4 with [reference | sketch | frame] columns, and
    save the individual frames, sketches, and reference image.

    Args:
        ref_frame (PIL.Image or np.ndarray): The reference image.
        sketches (list): Sketch images (numpy arrays or PIL Images).
        frames (list): Generated frames (numpy arrays or PIL Images).
        output_gif_path (str): Path to save the output GIF.
        supp_dir (str): Directory for supplementary files (frames, sketches, MP4).
        fps (int): Frames per second for the GIF and MP4.
    """
    # Bug fix: os.makedirs('') raised when the GIF path had no directory
    # component; exist_ok also removes the check-then-create race.
    output_dir = os.path.dirname(output_gif_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(output_gif_path))[0]

    sketch_dir = os.path.join(supp_dir, "sketches")
    frame_dir = os.path.join(supp_dir, "frames")
    os.makedirs(sketch_dir, exist_ok=True)
    os.makedirs(frame_dir, exist_ok=True)

    # Normalize all inputs to PIL images.
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray) else frame for frame in frames]
    pil_sketches = [Image.fromarray(sketch) if isinstance(sketch, np.ndarray) else sketch for sketch in sketches]
    ref_frame = Image.fromarray(ref_frame) if isinstance(ref_frame, np.ndarray) else ref_frame

    # All columns use the generated frames' resolution.
    width, height = pil_frames[0].size
    resized_frames = [frame.resize((width, height)) for frame in pil_frames]
    resized_sketches = [sketch.resize((width, height)) for sketch in pil_sketches]
    ref_frame = ref_frame.resize((width, height))

    # Save the individual sketches, frames, and the reference image.
    for i, sketch in enumerate(resized_sketches):
        sketch.save(os.path.join(sketch_dir, f"{base_name}_sketch_{i:04d}.png"))
    for i, frame in enumerate(resized_frames):
        frame.save(os.path.join(frame_dir, f"{base_name}_frame_{i:04d}.png"))
    ref_frame.save(os.path.join(supp_dir, f"{base_name}_reference.png"))

    # Compose the [reference | sketch | frame] rows.
    column_frames = []
    for i, frame in enumerate(resized_frames):
        new_width = ref_frame.width + resized_sketches[i].width + frame.width
        combined_frame = Image.new('RGB', (new_width, height))

        combined_frame.paste(ref_frame, (0, 0))
        combined_frame.paste(resized_sketches[i], (ref_frame.width, 0))
        combined_frame.paste(frame, (ref_frame.width + resized_sketches[i].width, 0))

        column_frames.append(combined_frame)

    frame_duration = int(1000 / fps)

    column_frames[0].save(output_gif_path,
                          format='GIF',
                          append_images=column_frames[1:],
                          save_all=True,
                          duration=frame_duration,
                          loop=0)

    # Also write the composite sequence as an MP4.
    output_mp4_path = os.path.join(supp_dir, 'result.mp4')
    video_frames = [np.array(frame) for frame in column_frames]
    clip = ImageSequenceClip(video_frames, fps=fps)
    clip.write_videofile(output_mp4_path, codec='libx264')
| |
|
| |
|
| |
|
def export_gif_with_ref_complete(start_image, frames, end_image, reference_image, output_gif_path, supp_dir, fps):
    """Export a GIF and an MP4 with [start | reference | end | frame] columns,
    saving the individual images and frames, following the storage layout of
    'export_gif_side_by_side_complete'.

    Args:
        start_image (PIL.Image or np.ndarray): The starting image.
        frames (list): List of frames (as numpy arrays or PIL Image objects).
        end_image (PIL.Image or np.ndarray): The ending image.
        reference_image (PIL.Image or np.ndarray): The reference image.
        output_gif_path (str): Path to save the output GIF.
        supp_dir (str): Directory to save supplementary files.
        fps (int): Frames per second for the GIF and MP4.
    """
    # Bug fix: os.makedirs('') raised when the GIF path had no directory
    # component; exist_ok also removes the check-then-create race.
    output_dir = os.path.dirname(output_gif_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(output_gif_path))[0]

    start_end_dir = os.path.join(supp_dir, "start_end_images")
    frame_dir = os.path.join(supp_dir, "frames")
    reference_dir = os.path.join(supp_dir, "reference")
    os.makedirs(start_end_dir, exist_ok=True)
    os.makedirs(frame_dir, exist_ok=True)
    os.makedirs(reference_dir, exist_ok=True)

    # Normalize all inputs to PIL images.
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray) else frame for frame in frames]
    start_image = Image.fromarray(start_image) if isinstance(start_image, np.ndarray) else start_image
    end_image = Image.fromarray(end_image) if isinstance(end_image, np.ndarray) else end_image
    reference_image = Image.fromarray(reference_image) if isinstance(reference_image, np.ndarray) else reference_image

    # All images are brought to the start image's height (widths are kept).
    width, height = start_image.size
    reference_image = reference_image.resize((reference_image.width, height))
    resized_frames = [frame.resize((frame.width, height)) for frame in pil_frames]

    # Save the boundary images and the reference image.
    start_image.save(os.path.join(start_end_dir, f"{base_name}_start.png"))
    end_image.save(os.path.join(start_end_dir, f"{base_name}_end.png"))
    reference_image.save(os.path.join(reference_dir, f"{base_name}_reference.png"))

    # Save the individual frames.
    for i, frame in enumerate(resized_frames):
        frame.save(os.path.join(frame_dir, f"{base_name}_frame_{i:04d}.png"))

    # Compose the [start | reference | end | frame] rows.
    column_frames = []
    for frame in resized_frames:
        new_width = start_image.width + reference_image.width + end_image.width + frame.width
        combined_frame = Image.new('RGB', (new_width, height))

        combined_frame.paste(start_image, (0, 0))
        combined_frame.paste(reference_image, (start_image.width, 0))
        combined_frame.paste(end_image, (start_image.width + reference_image.width, 0))
        combined_frame.paste(frame, (start_image.width + reference_image.width + end_image.width, 0))

        column_frames.append(combined_frame)

    frame_duration = int(1000 / fps)

    column_frames[0].save(output_gif_path,
                          format='GIF',
                          append_images=column_frames[1:],
                          save_all=True,
                          duration=frame_duration,
                          loop=0)

    # Also write the composite sequence as an MP4.
    output_mp4_path = os.path.join(supp_dir, 'result.mp4')
    video_frames = [np.array(frame) for frame in column_frames]
    clip = ImageSequenceClip(video_frames, fps=fps)
    clip.write_videofile(output_mp4_path, codec='libx264')
| |
|
| |
|
def export_gif_side_by_side_complete_ablation(ref_frame, sketches, frames, output_gif_path, supp_dir,fps):
    """
    Export frames into a GIF and an MP4 video with columns, and save individual frames and sketches.

    Each GIF frame is a horizontal strip laid out as
    [reference | sketch_i | frame_i]; the MP4 contains only the raw
    (resized) frames, without the reference/sketch columns.

    Args:
    - ref_frame (PIL.Image or np.ndarray): The reference image.
    - sketches (list): List of sketch images (as numpy arrays or PIL Image objects).
      Must have at least as many entries as `frames` (indexed in lockstep).
    - frames (list): List of frames (as numpy arrays or PIL Image objects).
      Assumed non-empty; the first frame fixes the panel size.
    - output_gif_path (str): Path to save the output GIF.
    - supp_dir (str): Directory receiving per-image PNGs; the MP4 is written
      to `supp_dir + '.mp4'`.
    - fps (int): Frames per second for the GIF and MP4.
    """
    # Ensure the GIF's parent directory exists. dirname() is '' for a bare
    # filename; calling os.makedirs('') would raise, so guard on truthiness,
    # and exist_ok=True avoids the exists-check/makedirs race.
    output_dir = os.path.dirname(output_gif_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(output_gif_path))[0]

    sketch_dir = os.path.join(supp_dir, "sketches")
    frame_dir = os.path.join(supp_dir, "frames")
    os.makedirs(sketch_dir, exist_ok=True)
    os.makedirs(frame_dir, exist_ok=True)

    # Normalize every input to a PIL Image.
    pil_frames = [Image.fromarray(frame) if isinstance(frame, np.ndarray) else frame for frame in frames]
    pil_sketches = [Image.fromarray(sketch) if isinstance(sketch, np.ndarray) else sketch for sketch in sketches]
    ref_frame = Image.fromarray(ref_frame) if isinstance(ref_frame, np.ndarray) else ref_frame

    # All panels are resized to the first frame's dimensions so the strips align.
    width, height = pil_frames[0].size
    resized_frames = [frame.resize((width, height)) for frame in pil_frames]
    resized_sketches = [sketch.resize((width, height)) for sketch in pil_sketches]
    ref_frame = ref_frame.resize((width, height))

    # Save the individual sketches and frames as zero-padded numbered PNGs.
    for i, sketch in enumerate(resized_sketches):
        sketch_filename = os.path.join(sketch_dir, f"{base_name}_sketch_{i:04d}.png")
        sketch.save(sketch_filename)

    for i, frame in enumerate(resized_frames):
        frame_filename = os.path.join(frame_dir, f"{base_name}_frame_{i:04d}.png")
        frame.save(frame_filename)

    ref_filename = os.path.join(supp_dir, f"{base_name}_reference.png")
    ref_frame.save(ref_filename)

    # Build one composite strip per frame: [reference | sketch | frame].
    # Indexing into resized_sketches (rather than zip) preserves the original
    # IndexError if fewer sketches than frames were supplied.
    column_frames = []
    for i, frame in enumerate(resized_frames):
        new_width = ref_frame.width + resized_sketches[i].width + frame.width
        combined_frame = Image.new('RGB', (new_width, height))

        combined_frame.paste(ref_frame, (0, 0))
        combined_frame.paste(resized_sketches[i], (ref_frame.width, 0))
        combined_frame.paste(frame, (ref_frame.width + resized_sketches[i].width, 0))

        column_frames.append(combined_frame)

    # Per-frame display time in milliseconds.
    frame_duration = int(1000 / fps)

    column_frames[0].save(output_gif_path,
                          format='GIF',
                          append_images=column_frames[1:],
                          save_all=True,
                          duration=frame_duration,
                          loop=0)

    # The MP4 holds only the raw resized frames (no reference/sketch columns).
    # NOTE: the previous version also converted all composite strips to numpy
    # arrays into an unused variable; that dead work is removed here.
    output_mp4_path = supp_dir+'.mp4'

    rgb_frames = [np.array(frame) for frame in resized_frames]
    clip = ImageSequenceClip(rgb_frames, fps=fps)
    clip.write_videofile(output_mp4_path, codec='libx264')