diff --git a/art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py b/art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py index 01fca6bef3..27dfa12284 100644 --- a/art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py +++ b/art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py @@ -41,9 +41,7 @@ logger = logging.getLogger(__name__) # tiling -def _generate_tile_kernel(patch: list, - mask: list, - tile_size: int) -> Tuple["torch.Tensor", "torch.Tensor"]: +def _generate_tile_kernel(patch: list, mask: list, tile_size: int) -> Tuple["torch.Tensor", "torch.Tensor"]: """ Generate specific size of pertuerbed tiles from randomly selected patches. @@ -70,8 +68,8 @@ def _generate_tile_kernel(patch: list, FlipOp = torchvision.transforms.RandomVerticalFlip(0.2) max_len = h min_len = w - t_patch= torch.permute(t_patch, (0, 2, 1)) - t_mask= torch.permute(t_mask, (0, 2, 1)) + t_patch = torch.permute(t_patch, (0, 2, 1)) + t_mask = torch.permute(t_mask, (0, 2, 1)) else: flip = False FlipOp = torchvision.transforms.RandomHorizontalFlip(0.2) @@ -144,17 +142,15 @@ def _generate_tile_kernel(patch: list, mm = torchvision.transforms.CenterCrop((tile_size + 2 * boundary, tile_size + 2 * boundary))(mm) if flip: - pp= torch.permute(pp, (0, 2, 1)) - mask= torch.permute(mm, (0, 2, 1)) + pp = torch.permute(pp, (0, 2, 1)) + mask = torch.permute(mm, (0, 2, 1)) else: mask = mm.clone() return pp, mask -def generate_tile(patches: list, - masks: list, - tile_size: int, - scale: list) -> Tuple["torch.Tensor", "torch.Tensor"]: + +def generate_tile(patches: list, masks: list, tile_size: int, scale: list) -> Tuple["torch.Tensor", "torch.Tensor"]: """ Generate different size of pertuerbed tiles from randomly selected patches. @@ -196,24 +192,18 @@ def generate_tile(patches: list, return tile, mask + # internal used class TileObj: - def __init__(self, - tile_size: int, - device: "torch.cuda.device") -> None: + def __init__(self, tile_size: int, device: "torch.cuda.device") -> None: import torch - self.patch = torch.zeros((3, tile_size, tile_size), device = device) - self.diff = torch.ones([], device = device) * self.patch.shape.numel() + self.patch = torch.zeros((3, tile_size, tile_size), device=device) + self.diff = torch.ones([], device=device) * self.patch.shape.numel() self.bcount = 0 self.eligible = False - def update(self, - eligible = None, - bcount = None, - diff = None, - patch = None - ) -> None: + def update(self, eligible=None, bcount=None, diff=None, patch=None) -> None: if not (eligible is None): self.eligible = eligible @@ -245,12 +235,7 @@ def compare(self, target: "TileObj") -> bool: class TileArray: - def __init__(self, - xyxy: list, - threshold: int, - tile_size: int, - k: int, - device: "torch.cuda.device") -> None: + def __init__(self, xyxy: list, threshold: int, tile_size: int, k: int, device: "torch.cuda.device") -> None: import torch self.threshold = threshold @@ -275,7 +260,7 @@ def insert(self, target: TileObj) -> None: else: out.append(prev[k_it]) - self.patch_list = out[:self.k] + self.patch_list = out[: self.k] def pop(self) -> None: out = self.patch_list[1:] + [TileObj(tile_size=self.tile_size, device=self.device)] @@ -323,7 +308,7 @@ def __init__( self.num_grid = num_grid self.batch_size = 1 self.candidates = candidates - self.threshold_objs = 1 # the expect number of objects + self.threshold_objs = 1 # the expect number of objects self.collector = collector self._check_params() @@ -339,7 +324,7 @@ def generate(self, x: np.ndarray, y: Optional[np.ndarray] = None, **kwargs) -> n # Compute adversarial examples with implicit batching x_adv = x.copy() for batch_id in range(int(np.ceil(x_adv.shape[0] / float(self.batch_size)))): - batch_index_1 = batch_id * self.batch_size + batch_index_1 = batch_id * self.batch_size batch_index_2 = min((batch_id + 1) * self.batch_size, x_adv.shape[0]) x_batch = x_adv[batch_index_1:batch_index_2] x_adv[batch_index_1:batch_index_2] = self._generate_batch(x_batch) @@ -367,9 +352,7 @@ def _generate_batch(self, x_batch: np.ndarray, y_batch: Optional[np.ndarray] = N return x_adv.cpu().detach().numpy() - def _attack(self, - x_adv: "torch.Tensor", - x: "torch.Tensor") -> "torch.Tensor": + def _attack(self, x_adv: "torch.Tensor", x: "torch.Tensor") -> "torch.Tensor": """ Run attack. @@ -382,8 +365,7 @@ def _attack(self, if self.candidates is None: raise ValueError("A set of patches should be collected before executing the attack.") - if x.shape[-1] % self.num_grid != 0 or \ - x.shape[-2] % self.num_grid != 0: + if x.shape[-1] % self.num_grid != 0 or x.shape[-2] % self.num_grid != 0: raise ValueError("The size of the image must be divided by the number of grids") tile_size = x.shape[-1] // self.num_grid @@ -396,20 +378,13 @@ def _attack(self, y1 = jj * tile_size x2 = x1 + tile_size y2 = y1 + tile_size - tile_mat[(ii,jj)] = TileArray(list([x1, y1, x2, y2]), - self.threshold_objs, - tile_size, - buffer_depth, - self.estimator.device) + tile_mat[(ii, jj)] = TileArray( + list([x1, y1, x2, y2]), self.threshold_objs, tile_size, buffer_depth, self.estimator.device + ) # init guess n_samples = 10 - x_adv, tile_mat = self._init_guess(tile_mat, - x_adv, - x, - tile_size, - n_samples=n_samples) - + x_adv, tile_mat = self._init_guess(tile_mat, x_adv, x, tile_size, n_samples=n_samples) b = 0 candidates_patch = self.candidates @@ -418,14 +393,10 @@ def _attack(self, r_tile = torch.zeros((0, 3, tile_size, tile_size), device=self.estimator.device) r_mask = torch.zeros((0, 3, tile_size, tile_size), device=self.estimator.device) while r_tile.shape[0] < n_samples: - t_tile, t_mask = generate_tile(candidates_patch, - candidates_mask, - tile_size, - [1, 2]) + t_tile, t_mask = generate_tile(candidates_patch, candidates_mask, tile_size, [1, 2]) r_tile = torch.cat([r_tile, t_tile], dim=0) r_mask = torch.cat([r_mask, t_mask], dim=0) - for _ in range(self.max_iter): adv_patch, adv_position = self.collector(self.estimator, x_adv) adv_position = adv_position[0] @@ -451,7 +422,7 @@ def _attack(self, prev.insert(TPatch_cur) tile_mat[(ii, jj)] = prev - sorted_patch = tile_mat[(ii,jj)].patch_list + sorted_patch = tile_mat[(ii, jj)].patch_list bcount_list = [] for sp in sorted_patch: if sp.bcount >= obj_threshold: @@ -462,11 +433,8 @@ def _attack(self, if len(bcount_list) < buffer_depth: - while r_tile.shape[0] < int( 1.5 * n_samples): - t_tile, t_mask = generate_tile(candidates_patch, - candidates_mask, - tile_size, - [1, 2]) + while r_tile.shape[0] < int(1.5 * n_samples): + t_tile, t_mask = generate_tile(candidates_patch, candidates_mask, tile_size, [1, 2]) r_tile = torch.cat([r_tile, t_tile], dim=0) r_mask = torch.cat([r_mask, t_mask], dim=0) @@ -503,7 +471,7 @@ def _attack(self, mask = torch.zeros_like(x_out) _, adv_position = self.collector(self.estimator, x_out) for e in adv_position[0]: - mask[:, :, e[1]:e[3], e[0]:e[2]] = mask[:, :, e[1]:e[3], e[0]:e[2]] + 1 + mask[:, :, e[1] : e[3], e[0] : e[2]] = mask[:, :, e[1] : e[3], e[0] : e[2]] + 1 mask = torch.where(mask > 0, torch.ones_like(mask), torch.zeros_like(mask)) x_adv = mask * x_out + (1.0 - mask) * x x_adv = torch.clamp(x_adv, x - self.eps, x + self.eps) @@ -511,9 +479,7 @@ def _attack(self, return x_adv - def _get_loss(self, - pert: "torch.tensor", - epsilon: float) -> "torch.tensor": + def _get_loss(self, pert: "torch.tensor", epsilon: float) -> "torch.tensor": """ Calculate accumulated distance of the perturbations outside the epslion ball. @@ -530,10 +496,7 @@ def _get_loss(self, return loss - def _color_projection(self, - tile: "torch.tensor", - x_ref: "torch.tensor", - epsilon: float) -> "torch.tensor": + def _color_projection(self, tile: "torch.tensor", x_ref: "torch.tensor", epsilon: float) -> "torch.tensor": """ Convert statistics information from target to source. @@ -560,14 +523,14 @@ def _color_projection(self, set1 = set1 + x_ref set2 = tile - mean_s = torch.mean(x_ref, dim=(-2,-1), keepdim=True) - mean_t = torch.mean(x_ref, dim=(-2,-1), keepdim=True) - std_s = torch.std(set2 , dim=(-2,-1), keepdim=True) - std_t = torch.std(set2 , dim=(-2,-1), keepdim=True) + mean_s = torch.mean(x_ref, dim=(-2, -1), keepdim=True) + mean_t = torch.mean(x_ref, dim=(-2, -1), keepdim=True) + std_s = torch.std(set2, dim=(-2, -1), keepdim=True) + std_t = torch.std(set2, dim=(-2, -1), keepdim=True) scale = std_s / std_t set2 = (set2 - mean_t) * scale + mean_s set2 = torch.clamp(set2, 0.0, 1.0) - + set2 = set2 + sign * epsilon * scale set2 = torch.clamp(set2, 0, 1) @@ -575,9 +538,7 @@ def _color_projection(self, return updated - def _assemble(self, - tile_mat: dict, - x_org: "torch.tensor") -> "torch.tensor": + def _assemble(self, tile_mat: dict, x_org: "torch.tensor") -> "torch.tensor": """ Combine the best patches from each grid into a single image. @@ -595,12 +556,9 @@ def _assemble(self, ans[0, :, y1:y2, x1:x2] = mask * tile + (1.0 - mask) * ans[0, :, y1:y2, x1:x2] return ans - def _init_guess(self, - tile_mat: dict, - x_init: "torch.tensor", - x_org: "torch.tensor", - tile_size: int, - n_samples: int) -> Tuple["torch.tensor", dict]: + def _init_guess( + self, tile_mat: dict, x_init: "torch.tensor", x_org: "torch.tensor", tile_size: int, n_samples: int + ) -> Tuple["torch.tensor", dict]: """ Generate an initial perturbation for each grid. @@ -616,9 +574,9 @@ def _init_guess(self, patches = self.candidates masks = [None] * len(self.candidates) for _ in range(TRIAL): - x_cand = torch.zeros((n_samples, 3, x_init.shape[-2], x_init.shape[-1]), - dtype=x_init.dtype, - device=self.estimator.device) + x_cand = torch.zeros( + (n_samples, 3, x_init.shape[-2], x_init.shape[-1]), dtype=x_init.dtype, device=self.estimator.device + ) # generate tiles # To save the computing time, we generate some tiles in advance. @@ -646,7 +604,7 @@ def _init_guess(self, b1 = obj.xyxy [x1, y1, x2, y2] = b1.type(torch.IntTensor) x_ref = x_init[:, :, y1:y2, x1:x2] - x_new = ((1.0 - mask_perm) * x_ref) + mask_perm * ( 0.0 * x_ref + 1.0 * tile_perm) + x_new = ((1.0 - mask_perm) * x_ref) + mask_perm * (0.0 * x_ref + 1.0 * tile_perm) # randomly roll-back rand_rb = torch.rand([n_samples, 1, 1, 1], device=self.estimator.device) @@ -656,7 +614,7 @@ def _init_guess(self, # spatial drop n_mask = drop_block2d(x_cand, 0.05, 3) x_cand = (1.0 - n_mask) * x_org + n_mask * x_cand - #x_cand = smooth_image(x_cand, x_org, epsilon, 10) + # x_cand = smooth_image(x_cand, x_org, epsilon, 10) x_cand = torch.round(x_cand * 255.0) / 255.0 x_cand = torch.clamp(x_cand, x_org - 2.5 * self.eps, x_org + 2.5 * self.eps) x_cand = torch.clamp(x_cand, 0.0, 1.0) @@ -719,8 +677,8 @@ def _init_guess(self, a_mask = drop_block2d(x_ref, 0.05, 1) cur_mask = cur_mask * a_mask - updated = ((1.0 - cur_mask) * x_ref) + cur_mask * ( 0.0 * x_ref + 1.0 * x_tag) - updated = ((1.0 - cur_mask) * x_ref) + cur_mask * ( 0.0 * x_ref + 1.0 * updated) + updated = ((1.0 - cur_mask) * x_ref) + cur_mask * (0.0 * x_ref + 1.0 * x_tag) + updated = ((1.0 - cur_mask) * x_ref) + cur_mask * (0.0 * x_ref + 1.0 * updated) x_out[:, :, y1:y2, x1:x2] = updated @@ -750,4 +708,4 @@ def _check_params(self) -> None: raise TypeError("Candidates must be stored in list.") if len(self.candidates) < 1: - raise ValueError("The list of candidates is empty.") \ No newline at end of file + raise ValueError("The list of candidates is empty.")