import os import random import cv2 import glob import numpy as np from PIL import Image, ImageEnhance from megengine.data.dataset import Dataset class Augmentor: def __init__( self, image_height=384, image_width=512, max_disp=256, scale_min=0.6, scale_max=1.0, seed=0, ): super().__init__() self.image_height = image_height self.image_width = image_width self.max_disp = max_disp self.scale_min = scale_min self.scale_max = scale_max self.rng = np.random.RandomState(seed) def chromatic_augmentation(self, img): random_brightness = np.random.uniform(0.8, 1.2) random_contrast = np.random.uniform(0.8, 1.2) random_gamma = np.random.uniform(0.8, 1.2) img = Image.fromarray(img) enhancer = ImageEnhance.Brightness(img) img = enhancer.enhance(random_brightness) enhancer = ImageEnhance.Contrast(img) img = enhancer.enhance(random_contrast) gamma_map = [ 255 * 1.0 * pow(ele / 255.0, random_gamma) for ele in range(256) ] * 3 img = img.point(gamma_map) # use PIL's point-function to accelerate this part img_ = np.array(img) return img_ def __call__(self, left_img, right_img, left_disp): # 1. chromatic augmentation # left_img = self.chromatic_augmentation(left_img) # right_img = self.chromatic_augmentation(right_img) # 2. spatial augmentation # 2.1) rotate & vertical shift for right image if self.rng.binomial(1, 0.5): angle, pixel = 0.1, 2 px = self.rng.uniform(-pixel, pixel) ag = self.rng.uniform(-angle, angle) image_center = ( self.rng.uniform(0, right_img.shape[0]), self.rng.uniform(0, right_img.shape[1]), ) rot_mat = cv2.getRotationMatrix2D(image_center, ag, 1.0) # right_img = right_img.transpose(2, 1, 0) right_img = cv2.warpAffine( right_img, rot_mat, right_img.shape[1::-1], flags=cv2.INTER_LINEAR ) trans_mat = np.float32([[1, 0, 0], [0, 1, px]]) right_img = cv2.warpAffine( right_img, trans_mat, right_img.shape[1::-1], flags=cv2.INTER_LINEAR ) # right_img = right_img.transpose(1, 2, 0) # 2.2) random resize resize_scale = self.rng.uniform(self.scale_min, self.scale_max) left_img = cv2.resize( left_img, None, fx=resize_scale, fy=resize_scale, interpolation=cv2.INTER_LINEAR, ) if len(left_img.shape) == 2: left_img.shape += 1, # left_img = cv2.merge([left_img, left_img, left_img]) right_img = cv2.resize( right_img, None, fx=resize_scale, fy=resize_scale, interpolation=cv2.INTER_LINEAR, ) if len(left_img.shape) == 2: left_img.shape += 1, # left_img = cv2.merge([left_img, left_img, left_img]) disp_mask = (left_disp < float(self.max_disp / resize_scale)) & (left_disp > 0) disp_mask = disp_mask.astype("float32") disp_mask = cv2.resize( disp_mask, None, fx=resize_scale, fy=resize_scale, interpolation=cv2.INTER_LINEAR, ) left_disp = ( cv2.resize( left_disp, None, fx=resize_scale, fy=resize_scale, interpolation=cv2.INTER_LINEAR, ) * resize_scale ) # 2.3) random crop if len(left_img.shape) == 3: h, w, c = left_img.shape else: h, w = left_img.shape c = 1 dx = w - self.image_width dy = h - self.image_height dy = self.rng.randint(min(0, dy), max(0, dy) + 1) dx = self.rng.randint(min(0, dx), max(0, dx) + 1) M = np.float32([[1.0, 0.0, -dx], [0.0, 1.0, -dy]]) left_img = cv2.warpAffine( left_img, M, (self.image_width, self.image_height), flags=cv2.INTER_LINEAR, borderValue=0, ) right_img = cv2.warpAffine( right_img, M, (self.image_width, self.image_height), flags=cv2.INTER_LINEAR, borderValue=0, ) left_disp = cv2.warpAffine( left_disp, M, (self.image_width, self.image_height), flags=cv2.INTER_LINEAR, borderValue=0, ) disp_mask = cv2.warpAffine( disp_mask, M, (self.image_width, self.image_height), flags=cv2.INTER_LINEAR, borderValue=0, ) # 3. add random occlusion to right image if self.rng.binomial(1, 0.5): sx = int(self.rng.uniform(50, 100)) sy = int(self.rng.uniform(50, 100)) cx = int(self.rng.uniform(sx, right_img.shape[0] - sx)) cy = int(self.rng.uniform(sy, right_img.shape[1] - sy)) right_img[cx - sx : cx + sx, cy - sy : cy + sy] = np.mean( np.mean(right_img, 0), 0 )[np.newaxis, np.newaxis] if len(left_img.shape) == 2: left_img = cv2.merge([left_img, left_img, left_img]) return left_img, right_img, left_disp, disp_mask class CREStereoDataset(Dataset): def __init__(self, root): super().__init__() self.imgs = glob.glob(os.path.join(root, "**/*_left.jpg"), recursive=True) self.augmentor = Augmentor( image_height=384, image_width=512, max_disp=256, scale_min=0.6, scale_max=1.0, seed=0, ) self.rng = np.random.RandomState(0) def get_disp(self, path): disp = cv2.imread(path, cv2.IMREAD_UNCHANGED) return disp.astype(np.float32) / 32 def __getitem__(self, index): # find path left_path = self.imgs[index] prefix = left_path[: left_path.rfind("_")] right_path = prefix + "_right.jpg" left_disp_path = prefix + "_left.disp.png" right_disp_path = prefix + "_right.disp.png" # read img, disp left_img = cv2.imread(left_path, cv2.IMREAD_COLOR) right_img = cv2.imread(right_path, cv2.IMREAD_COLOR) left_disp = self.get_disp(left_disp_path) right_disp = self.get_disp(right_disp_path) if self.rng.binomial(1, 0.5): left_img, right_img = np.fliplr(right_img), np.fliplr(left_img) left_disp, right_disp = np.fliplr(right_disp), np.fliplr(left_disp) left_disp[left_disp == np.inf] = 0 # augmentaion # left_img, right_img, left_disp, disp_mask = self.augmentor( _, _, left_disp, disp_mask = self.augmentor( left_img, right_img, left_disp ) left_img = left_img.transpose(2, 0, 1).astype("uint8") right_img = right_img.transpose(2, 0, 1).astype("uint8") return { "left": left_img, "right": right_img, "disparity": left_disp, "mask": disp_mask, } def __len__(self): return len(self.imgs) class CTDDataset(Dataset): def __init__(self, root, pattern_path: str, data_type: str = 'syn', augment=True, resize_pattern=True, blur=False, split=0.9, test_set=False): super().__init__() self.rng = np.random.RandomState(0) self.augment = augment self.blur = blur imgs = glob.glob(os.path.join(root, f"{data_type}/*/im0_*.npy"), recursive=True) if test_set: self.imgs = imgs[:int(split * len(imgs))] else: self.imgs = imgs[int(split * len(imgs)):] self.pattern = cv2.imread(pattern_path)#, cv2.IMREAD_GRAYSCALE) if resize_pattern and self.pattern.shape != (480, 640, 3): # self.pattern = cv2.resize(self.pattern, (640, 480)) print(self.pattern.shape) downsampled = cv2.pyrDown(self.pattern) diff = (downsampled.shape[0] - 480) // 2 self.pattern = downsampled[diff:downsampled.shape[0]-diff, 0:downsampled.shape[1]] self.augmentor = Augmentor( image_height=480, image_width=640, max_disp=256, scale_min=0.6, scale_max=1.0, seed=0, ) def get_disp(self, path): # disp = cv2.imread(path, cv2.IMREAD_UNCHANGED) disp = np.load(path).transpose(1,2,0) # return disp.astype(np.float32) / 32 return disp def __getitem__(self, index): # find path left_path = self.imgs[index] left_disp_path = left_path.replace('im', 'disp') # read img, disp left_img = np.load(left_path) if left_img.dtype == 'float32': left_img = (left_img * 255).astype('uint8') left_img = cv2.merge([left_img, left_img, left_img]).reshape((480, 640, 3)) right_img = self.pattern left_disp = self.get_disp(left_disp_path) if False: # self.rng.binomial(1, 0.5): left_img, right_img = np.fliplr(right_img), np.fliplr(left_img) left_disp, right_disp = np.fliplr(right_disp), np.fliplr(left_disp) left_disp[left_disp == np.inf] = 0 if self.blur: kernel_size = random.sample([1,3,5,7,9], 1)[0] kernel = (kernel_size, kernel_size) left_img = cv2.GaussianBlur(left_img, kernel, 0) # augmentation if not self.augment: _left_img, _right_img, _left_disp, disp_mask = self.augmentor( left_img, right_img, left_disp ) else: left_img, right_img, left_disp, disp_mask = self.augmentor( left_img, right_img, left_disp ) right_img = right_img.transpose((2, 0, 1)).astype("uint8") return { "left": left_img, "right": right_img, "disparity": left_disp, "mask": disp_mask, } def __len__(self): return len(self.imgs) class BlenderDataset(CTDDataset): def __init__(self, root, pattern_path: str, data_type: str = 'syn', augment=True, resize_pattern=True, blur=False, split=0.9, test_set=False, use_lightning=False): super().__init__(root, pattern_path) self.use_lightning = use_lightning imgs = [f for f in glob.glob(f"{root}/im_*.png", recursive=True) if not 'depth0001' in f] if test_set: self.imgs = imgs[:int(split * len(imgs))] else: self.imgs = imgs[int(split * len(imgs)):] self.pattern = cv2.imread(pattern_path)#, cv2.IMREAD_GRAYSCALE) if resize_pattern and self.pattern.shape != (480, 640, 3): downsampled = cv2.pyrDown(self.pattern) diff = (downsampled.shape[0] - 480) // 2 self.pattern = downsampled[diff:downsampled.shape[0]-diff, 0:downsampled.shape[1]] self.augmentor = Augmentor( image_height=480, image_width=640, max_disp=256, scale_min=0.6, scale_max=1.0, seed=0, ) def __getitem__(self, index): # find path left_path = self.imgs[index] left_disp_path = left_path.split('.')[0] + '_depth0001.png' # read img, disp left_img = cv2.imread(left_path) if left_img.dtype == 'float32': left_img = (left_img * 255).astype('uint8') if left_img.shape != (480, 640, 3): downsampled = cv2.pyrDown(left_img) diff = (downsampled.shape[0] - 480) // 2 left_img = downsampled[diff:downsampled.shape[0]-diff, 0:downsampled.shape[1]] if left_img.shape[-1] != 3: left_img = cv2.merge([left_img, left_img, left_img]).reshape((480, 640, 3)) right_img = self.pattern left_disp = self.get_disp(left_disp_path) if False: # self.rng.binomial(1, 0.5): left_img, right_img = np.fliplr(right_img), np.fliplr(left_img) left_disp, right_disp = np.fliplr(right_disp), np.fliplr(left_disp) left_disp[left_disp == np.inf] = 0 if self.blur: kernel_size = random.sample([1,3,5,7,9], 1)[0] kernel = (kernel_size, kernel_size) left_img = cv2.GaussianBlur(left_img, kernel, 0) # augmentation if not self.augment: _left_img, _right_img, _left_disp, disp_mask = self.augmentor( left_img, right_img, left_disp ) else: left_img, right_img, left_disp, disp_mask = self.augmentor( left_img, right_img, left_disp ) if not self.use_lightning: # right_img = right_img.transpose((2, 0, 1)).astype("uint8") return { "left": left_img, "right": right_img, "disparity": left_disp, "mask": disp_mask, } right_img = right_img.transpose((2, 0, 1)).astype("uint8") left_img = left_img.transpose((2, 0, 1)).astype("uint8") return left_img, right_img, left_disp, disp_mask def get_disp(self, path): baseline = 0.075 # meters fl = 560. # as per CTD depth = cv2.imread(path, cv2.IMREAD_UNCHANGED) downsampled = cv2.pyrDown(depth) diff = (downsampled.shape[0] - 480) // 2 depth = downsampled[diff:downsampled.shape[0]-diff, 0:downsampled.shape[1]] # disp = np.load(path).transpose(1,2,0) # disp = baseline * fl / depth # return disp.astype(np.float32) / 32 # FIXME temporarily increase disparity until new data with better depth values is generated # higher values seem to speedup convergence, but introduce much stronger artifacting mystery_factor = 150 # mystery_factor = 1 disp = (baseline * fl * mystery_factor) / depth return disp.astype(np.float32)