123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- import numpy as np
- import random
- import torch
- from basicsr.data.degradations import random_add_gaussian_noise_pt, random_add_poisson_noise_pt
- from basicsr.data.transforms import paired_random_crop
- from basicsr.models.sr_model import SRModel
- from basicsr.utils import DiffJPEG, USMSharp
- from basicsr.utils.img_process_util import filter2D
- from basicsr.utils.registry import MODEL_REGISTRY
- from torch.nn import functional as F
- @MODEL_REGISTRY.register()
- class RealESRNetModel(SRModel):
- """RealESRNet Model for Real-ESRGAN: Training Real-World Blind Super-Resolution with Pure Synthetic Data.
- It is trained without GAN losses.
- It mainly performs:
- 1. randomly synthesize LQ images in GPU tensors
- 2. optimize the networks with GAN training.
- """
- def __init__(self, opt):
- super(RealESRNetModel, self).__init__(opt)
- self.jpeger = DiffJPEG(differentiable=False).cuda() # simulate JPEG compression artifacts
- self.usm_sharpener = USMSharp().cuda() # do usm sharpening
- self.queue_size = opt.get('queue_size', 180)
- @torch.no_grad()
- def _dequeue_and_enqueue(self):
- """It is the training pair pool for increasing the diversity in a batch.
- Batch processing limits the diversity of synthetic degradations in a batch. For example, samples in a
- batch could not have different resize scaling factors. Therefore, we employ this training pair pool
- to increase the degradation diversity in a batch.
- """
- # initialize
- b, c, h, w = self.lq.size()
- if not hasattr(self, 'queue_lr'):
- assert self.queue_size % b == 0, f'queue size {self.queue_size} should be divisible by batch size {b}'
- self.queue_lr = torch.zeros(self.queue_size, c, h, w).cuda()
- _, c, h, w = self.gt.size()
- self.queue_gt = torch.zeros(self.queue_size, c, h, w).cuda()
- self.queue_ptr = 0
- if self.queue_ptr == self.queue_size: # the pool is full
- # do dequeue and enqueue
- # shuffle
- idx = torch.randperm(self.queue_size)
- self.queue_lr = self.queue_lr[idx]
- self.queue_gt = self.queue_gt[idx]
- # get first b samples
- lq_dequeue = self.queue_lr[0:b, :, :, :].clone()
- gt_dequeue = self.queue_gt[0:b, :, :, :].clone()
- # update the queue
- self.queue_lr[0:b, :, :, :] = self.lq.clone()
- self.queue_gt[0:b, :, :, :] = self.gt.clone()
- self.lq = lq_dequeue
- self.gt = gt_dequeue
- else:
- # only do enqueue
- self.queue_lr[self.queue_ptr:self.queue_ptr + b, :, :, :] = self.lq.clone()
- self.queue_gt[self.queue_ptr:self.queue_ptr + b, :, :, :] = self.gt.clone()
- self.queue_ptr = self.queue_ptr + b
- @torch.no_grad()
- def feed_data(self, data):
- """Accept data from dataloader, and then add two-order degradations to obtain LQ images.
- """
- if self.is_train and self.opt.get('high_order_degradation', True):
- # training data synthesis
- self.gt = data['gt'].to(self.device)
- # USM sharpen the GT images
- if self.opt['gt_usm'] is True:
- self.gt = self.usm_sharpener(self.gt)
- self.kernel1 = data['kernel1'].to(self.device)
- self.kernel2 = data['kernel2'].to(self.device)
- self.sinc_kernel = data['sinc_kernel'].to(self.device)
- ori_h, ori_w = self.gt.size()[2:4]
- # ----------------------- The first degradation process ----------------------- #
- # blur
- out = filter2D(self.gt, self.kernel1)
- # random resize
- updown_type = random.choices(['up', 'down', 'keep'], self.opt['resize_prob'])[0]
- if updown_type == 'up':
- scale = np.random.uniform(1, self.opt['resize_range'][1])
- elif updown_type == 'down':
- scale = np.random.uniform(self.opt['resize_range'][0], 1)
- else:
- scale = 1
- mode = random.choice(['area', 'bilinear', 'bicubic'])
- out = F.interpolate(out, scale_factor=scale, mode=mode)
- # add noise
- gray_noise_prob = self.opt['gray_noise_prob']
- if np.random.uniform() < self.opt['gaussian_noise_prob']:
- out = random_add_gaussian_noise_pt(
- out, sigma_range=self.opt['noise_range'], clip=True, rounds=False, gray_prob=gray_noise_prob)
- else:
- out = random_add_poisson_noise_pt(
- out,
- scale_range=self.opt['poisson_scale_range'],
- gray_prob=gray_noise_prob,
- clip=True,
- rounds=False)
- # JPEG compression
- jpeg_p = out.new_zeros(out.size(0)).uniform_(*self.opt['jpeg_range'])
- out = torch.clamp(out, 0, 1) # clamp to [0, 1], otherwise JPEGer will result in unpleasant artifacts
- out = self.jpeger(out, quality=jpeg_p)
- # ----------------------- The second degradation process ----------------------- #
- # blur
- if np.random.uniform() < self.opt['second_blur_prob']:
- out = filter2D(out, self.kernel2)
- # random resize
- updown_type = random.choices(['up', 'down', 'keep'], self.opt['resize_prob2'])[0]
- if updown_type == 'up':
- scale = np.random.uniform(1, self.opt['resize_range2'][1])
- elif updown_type == 'down':
- scale = np.random.uniform(self.opt['resize_range2'][0], 1)
- else:
- scale = 1
- mode = random.choice(['area', 'bilinear', 'bicubic'])
- out = F.interpolate(
- out, size=(int(ori_h / self.opt['scale'] * scale), int(ori_w / self.opt['scale'] * scale)), mode=mode)
- # add noise
- gray_noise_prob = self.opt['gray_noise_prob2']
- if np.random.uniform() < self.opt['gaussian_noise_prob2']:
- out = random_add_gaussian_noise_pt(
- out, sigma_range=self.opt['noise_range2'], clip=True, rounds=False, gray_prob=gray_noise_prob)
- else:
- out = random_add_poisson_noise_pt(
- out,
- scale_range=self.opt['poisson_scale_range2'],
- gray_prob=gray_noise_prob,
- clip=True,
- rounds=False)
- # JPEG compression + the final sinc filter
- # We also need to resize images to desired sizes. We group [resize back + sinc filter] together
- # as one operation.
- # We consider two orders:
- # 1. [resize back + sinc filter] + JPEG compression
- # 2. JPEG compression + [resize back + sinc filter]
- # Empirically, we find other combinations (sinc + JPEG + Resize) will introduce twisted lines.
- if np.random.uniform() < 0.5:
- # resize back + the final sinc filter
- mode = random.choice(['area', 'bilinear', 'bicubic'])
- out = F.interpolate(out, size=(ori_h // self.opt['scale'], ori_w // self.opt['scale']), mode=mode)
- out = filter2D(out, self.sinc_kernel)
- # JPEG compression
- jpeg_p = out.new_zeros(out.size(0)).uniform_(*self.opt['jpeg_range2'])
- out = torch.clamp(out, 0, 1)
- out = self.jpeger(out, quality=jpeg_p)
- else:
- # JPEG compression
- jpeg_p = out.new_zeros(out.size(0)).uniform_(*self.opt['jpeg_range2'])
- out = torch.clamp(out, 0, 1)
- out = self.jpeger(out, quality=jpeg_p)
- # resize back + the final sinc filter
- mode = random.choice(['area', 'bilinear', 'bicubic'])
- out = F.interpolate(out, size=(ori_h // self.opt['scale'], ori_w // self.opt['scale']), mode=mode)
- out = filter2D(out, self.sinc_kernel)
- # clamp and round
- self.lq = torch.clamp((out * 255.0).round(), 0, 255) / 255.
- # random crop
- gt_size = self.opt['gt_size']
- self.gt, self.lq = paired_random_crop(self.gt, self.lq, gt_size, self.opt['scale'])
- # training pair pool
- self._dequeue_and_enqueue()
- self.lq = self.lq.contiguous() # for the warning: grad and param do not obey the gradient layout contract
- else:
- # for paired training or validation
- self.lq = data['lq'].to(self.device)
- if 'gt' in data:
- self.gt = data['gt'].to(self.device)
- self.gt_usm = self.usm_sharpener(self.gt)
- def nondist_validation(self, dataloader, current_iter, tb_logger, save_img):
- # do not use the synthetic process during validation
- self.is_train = False
- super(RealESRNetModel, self).nondist_validation(dataloader, current_iter, tb_logger, save_img)
- self.is_train = True
|