Skip to content

Commit

Permalink
Format coding style
Browse files Browse the repository at this point in the history
  • Loading branch information
CNOCycle committed Jul 2, 2024
1 parent 7dd40b8 commit 8130f36
Showing 1 changed file with 46 additions and 88 deletions.
134 changes: 46 additions & 88 deletions art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
logger = logging.getLogger(__name__)

# tiling
def _generate_tile_kernel(patch: list,
mask: list,
tile_size: int) -> Tuple["torch.Tensor", "torch.Tensor"]:
def _generate_tile_kernel(patch: list, mask: list, tile_size: int) -> Tuple["torch.Tensor", "torch.Tensor"]:
"""
Generate specific size of pertuerbed tiles from randomly selected patches.
Expand All @@ -70,8 +68,8 @@ def _generate_tile_kernel(patch: list,
FlipOp = torchvision.transforms.RandomVerticalFlip(0.2)
max_len = h
min_len = w
t_patch= torch.permute(t_patch, (0, 2, 1))
t_mask= torch.permute(t_mask, (0, 2, 1))
t_patch = torch.permute(t_patch, (0, 2, 1))
t_mask = torch.permute(t_mask, (0, 2, 1))
else:
flip = False
FlipOp = torchvision.transforms.RandomHorizontalFlip(0.2)
Expand Down Expand Up @@ -144,17 +142,15 @@ def _generate_tile_kernel(patch: list,
mm = torchvision.transforms.CenterCrop((tile_size + 2 * boundary, tile_size + 2 * boundary))(mm)

Check warning on line 142 in art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py

View check run for this annotation

Codecov / codecov/patch

art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py#L133-L142

Added lines #L133 - L142 were not covered by tests

if flip:
pp= torch.permute(pp, (0, 2, 1))
mask= torch.permute(mm, (0, 2, 1))
pp = torch.permute(pp, (0, 2, 1))
mask = torch.permute(mm, (0, 2, 1))
else:
mask = mm.clone()

return pp, mask

def generate_tile(patches: list,
masks: list,
tile_size: int,
scale: list) -> Tuple["torch.Tensor", "torch.Tensor"]:

def generate_tile(patches: list, masks: list, tile_size: int, scale: list) -> Tuple["torch.Tensor", "torch.Tensor"]:
"""
Generate different size of pertuerbed tiles from randomly selected patches.
Expand Down Expand Up @@ -196,24 +192,18 @@ def generate_tile(patches: list,

return tile, mask


# internal used
class TileObj:
def __init__(self,
tile_size: int,
device: "torch.cuda.device") -> None:
def __init__(self, tile_size: int, device: "torch.cuda.device") -> None:
import torch

self.patch = torch.zeros((3, tile_size, tile_size), device = device)
self.diff = torch.ones([], device = device) * self.patch.shape.numel()
self.patch = torch.zeros((3, tile_size, tile_size), device=device)
self.diff = torch.ones([], device=device) * self.patch.shape.numel()
self.bcount = 0
self.eligible = False

def update(self,
eligible = None,
bcount = None,
diff = None,
patch = None
) -> None:
def update(self, eligible=None, bcount=None, diff=None, patch=None) -> None:

if not (eligible is None):
self.eligible = eligible
Expand Down Expand Up @@ -245,12 +235,7 @@ def compare(self, target: "TileObj") -> bool:


class TileArray:
def __init__(self,
xyxy: list,
threshold: int,
tile_size: int,
k: int,
device: "torch.cuda.device") -> None:
def __init__(self, xyxy: list, threshold: int, tile_size: int, k: int, device: "torch.cuda.device") -> None:
import torch

self.threshold = threshold
Expand All @@ -275,7 +260,7 @@ def insert(self, target: TileObj) -> None:
else:
out.append(prev[k_it])

self.patch_list = out[:self.k]
self.patch_list = out[: self.k]

def pop(self) -> None:
out = self.patch_list[1:] + [TileObj(tile_size=self.tile_size, device=self.device)]
Expand Down Expand Up @@ -323,7 +308,7 @@ def __init__(
self.num_grid = num_grid
self.batch_size = 1
self.candidates = candidates
self.threshold_objs = 1 # the expect number of objects
self.threshold_objs = 1 # the expect number of objects
self.collector = collector
self._check_params()

Expand All @@ -339,7 +324,7 @@ def generate(self, x: np.ndarray, y: Optional[np.ndarray] = None, **kwargs) -> n
# Compute adversarial examples with implicit batching
x_adv = x.copy()
for batch_id in range(int(np.ceil(x_adv.shape[0] / float(self.batch_size)))):
batch_index_1 = batch_id * self.batch_size
batch_index_1 = batch_id * self.batch_size
batch_index_2 = min((batch_id + 1) * self.batch_size, x_adv.shape[0])
x_batch = x_adv[batch_index_1:batch_index_2]
x_adv[batch_index_1:batch_index_2] = self._generate_batch(x_batch)
Expand Down Expand Up @@ -367,9 +352,7 @@ def _generate_batch(self, x_batch: np.ndarray, y_batch: Optional[np.ndarray] = N

return x_adv.cpu().detach().numpy()

def _attack(self,
x_adv: "torch.Tensor",
x: "torch.Tensor") -> "torch.Tensor":
def _attack(self, x_adv: "torch.Tensor", x: "torch.Tensor") -> "torch.Tensor":
"""
Run attack.
Expand All @@ -382,8 +365,7 @@ def _attack(self,
if self.candidates is None:
raise ValueError("A set of patches should be collected before executing the attack.")

Check warning on line 366 in art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py

View check run for this annotation

Codecov / codecov/patch

art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py#L366

Added line #L366 was not covered by tests

if x.shape[-1] % self.num_grid != 0 or \
x.shape[-2] % self.num_grid != 0:
if x.shape[-1] % self.num_grid != 0 or x.shape[-2] % self.num_grid != 0:
raise ValueError("The size of the image must be divided by the number of grids")

Check warning on line 369 in art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py

View check run for this annotation

Codecov / codecov/patch

art/attacks/evasion/steal_now_attack_later/steal_now_attack_later.py#L369

Added line #L369 was not covered by tests
tile_size = x.shape[-1] // self.num_grid

Expand All @@ -396,20 +378,13 @@ def _attack(self,
y1 = jj * tile_size
x2 = x1 + tile_size
y2 = y1 + tile_size
tile_mat[(ii,jj)] = TileArray(list([x1, y1, x2, y2]),
self.threshold_objs,
tile_size,
buffer_depth,
self.estimator.device)
tile_mat[(ii, jj)] = TileArray(
list([x1, y1, x2, y2]), self.threshold_objs, tile_size, buffer_depth, self.estimator.device
)

# init guess
n_samples = 10
x_adv, tile_mat = self._init_guess(tile_mat,
x_adv,
x,
tile_size,
n_samples=n_samples)

x_adv, tile_mat = self._init_guess(tile_mat, x_adv, x, tile_size, n_samples=n_samples)

b = 0
candidates_patch = self.candidates
Expand All @@ -418,14 +393,10 @@ def _attack(self,
r_tile = torch.zeros((0, 3, tile_size, tile_size), device=self.estimator.device)
r_mask = torch.zeros((0, 3, tile_size, tile_size), device=self.estimator.device)
while r_tile.shape[0] < n_samples:
t_tile, t_mask = generate_tile(candidates_patch,
candidates_mask,
tile_size,
[1, 2])
t_tile, t_mask = generate_tile(candidates_patch, candidates_mask, tile_size, [1, 2])
r_tile = torch.cat([r_tile, t_tile], dim=0)
r_mask = torch.cat([r_mask, t_mask], dim=0)


for _ in range(self.max_iter):
adv_patch, adv_position = self.collector(self.estimator, x_adv)
adv_position = adv_position[0]
Expand All @@ -451,7 +422,7 @@ def _attack(self,
prev.insert(TPatch_cur)
tile_mat[(ii, jj)] = prev

sorted_patch = tile_mat[(ii,jj)].patch_list
sorted_patch = tile_mat[(ii, jj)].patch_list
bcount_list = []
for sp in sorted_patch:
if sp.bcount >= obj_threshold:
Expand All @@ -462,11 +433,8 @@ def _attack(self,

if len(bcount_list) < buffer_depth:

while r_tile.shape[0] < int( 1.5 * n_samples):
t_tile, t_mask = generate_tile(candidates_patch,
candidates_mask,
tile_size,
[1, 2])
while r_tile.shape[0] < int(1.5 * n_samples):
t_tile, t_mask = generate_tile(candidates_patch, candidates_mask, tile_size, [1, 2])
r_tile = torch.cat([r_tile, t_tile], dim=0)
r_mask = torch.cat([r_mask, t_mask], dim=0)

Expand Down Expand Up @@ -503,17 +471,15 @@ def _attack(self,
mask = torch.zeros_like(x_out)
_, adv_position = self.collector(self.estimator, x_out)
for e in adv_position[0]:
mask[:, :, e[1]:e[3], e[0]:e[2]] = mask[:, :, e[1]:e[3], e[0]:e[2]] + 1
mask[:, :, e[1] : e[3], e[0] : e[2]] = mask[:, :, e[1] : e[3], e[0] : e[2]] + 1
mask = torch.where(mask > 0, torch.ones_like(mask), torch.zeros_like(mask))
x_adv = mask * x_out + (1.0 - mask) * x
x_adv = torch.clamp(x_adv, x - self.eps, x + self.eps)
x_adv = torch.clamp(x_adv, 0.0, 1.0)

return x_adv

def _get_loss(self,
pert: "torch.tensor",
epsilon: float) -> "torch.tensor":
def _get_loss(self, pert: "torch.tensor", epsilon: float) -> "torch.tensor":
"""
Calculate accumulated distance of the perturbations outside the epslion ball.
Expand All @@ -530,10 +496,7 @@ def _get_loss(self,

return loss

def _color_projection(self,
tile: "torch.tensor",
x_ref: "torch.tensor",
epsilon: float) -> "torch.tensor":
def _color_projection(self, tile: "torch.tensor", x_ref: "torch.tensor", epsilon: float) -> "torch.tensor":
"""
Convert statistics information from target to source.
Expand All @@ -560,24 +523,22 @@ def _color_projection(self,
set1 = set1 + x_ref

set2 = tile
mean_s = torch.mean(x_ref, dim=(-2,-1), keepdim=True)
mean_t = torch.mean(x_ref, dim=(-2,-1), keepdim=True)
std_s = torch.std(set2 , dim=(-2,-1), keepdim=True)
std_t = torch.std(set2 , dim=(-2,-1), keepdim=True)
mean_s = torch.mean(x_ref, dim=(-2, -1), keepdim=True)
mean_t = torch.mean(x_ref, dim=(-2, -1), keepdim=True)
std_s = torch.std(set2, dim=(-2, -1), keepdim=True)
std_t = torch.std(set2, dim=(-2, -1), keepdim=True)
scale = std_s / std_t
set2 = (set2 - mean_t) * scale + mean_s
set2 = torch.clamp(set2, 0.0, 1.0)

set2 = set2 + sign * epsilon * scale
set2 = torch.clamp(set2, 0, 1)

updated = torch.where(cond, set1, set2)

return updated

def _assemble(self,
tile_mat: dict,
x_org: "torch.tensor") -> "torch.tensor":
def _assemble(self, tile_mat: dict, x_org: "torch.tensor") -> "torch.tensor":
"""
Combine the best patches from each grid into a single image.
Expand All @@ -595,12 +556,9 @@ def _assemble(self,
ans[0, :, y1:y2, x1:x2] = mask * tile + (1.0 - mask) * ans[0, :, y1:y2, x1:x2]
return ans

def _init_guess(self,
tile_mat: dict,
x_init: "torch.tensor",
x_org: "torch.tensor",
tile_size: int,
n_samples: int) -> Tuple["torch.tensor", dict]:
def _init_guess(
self, tile_mat: dict, x_init: "torch.tensor", x_org: "torch.tensor", tile_size: int, n_samples: int
) -> Tuple["torch.tensor", dict]:
"""
Generate an initial perturbation for each grid.
Expand All @@ -616,9 +574,9 @@ def _init_guess(self,
patches = self.candidates
masks = [None] * len(self.candidates)
for _ in range(TRIAL):
x_cand = torch.zeros((n_samples, 3, x_init.shape[-2], x_init.shape[-1]),
dtype=x_init.dtype,
device=self.estimator.device)
x_cand = torch.zeros(
(n_samples, 3, x_init.shape[-2], x_init.shape[-1]), dtype=x_init.dtype, device=self.estimator.device
)

# generate tiles
# To save the computing time, we generate some tiles in advance.
Expand Down Expand Up @@ -646,7 +604,7 @@ def _init_guess(self,
b1 = obj.xyxy
[x1, y1, x2, y2] = b1.type(torch.IntTensor)
x_ref = x_init[:, :, y1:y2, x1:x2]
x_new = ((1.0 - mask_perm) * x_ref) + mask_perm * ( 0.0 * x_ref + 1.0 * tile_perm)
x_new = ((1.0 - mask_perm) * x_ref) + mask_perm * (0.0 * x_ref + 1.0 * tile_perm)

# randomly roll-back
rand_rb = torch.rand([n_samples, 1, 1, 1], device=self.estimator.device)
Expand All @@ -656,7 +614,7 @@ def _init_guess(self,
# spatial drop
n_mask = drop_block2d(x_cand, 0.05, 3)
x_cand = (1.0 - n_mask) * x_org + n_mask * x_cand
#x_cand = smooth_image(x_cand, x_org, epsilon, 10)
# x_cand = smooth_image(x_cand, x_org, epsilon, 10)
x_cand = torch.round(x_cand * 255.0) / 255.0
x_cand = torch.clamp(x_cand, x_org - 2.5 * self.eps, x_org + 2.5 * self.eps)
x_cand = torch.clamp(x_cand, 0.0, 1.0)
Expand Down Expand Up @@ -719,8 +677,8 @@ def _init_guess(self,

a_mask = drop_block2d(x_ref, 0.05, 1)
cur_mask = cur_mask * a_mask
updated = ((1.0 - cur_mask) * x_ref) + cur_mask * ( 0.0 * x_ref + 1.0 * x_tag)
updated = ((1.0 - cur_mask) * x_ref) + cur_mask * ( 0.0 * x_ref + 1.0 * updated)
updated = ((1.0 - cur_mask) * x_ref) + cur_mask * (0.0 * x_ref + 1.0 * x_tag)
updated = ((1.0 - cur_mask) * x_ref) + cur_mask * (0.0 * x_ref + 1.0 * updated)

x_out[:, :, y1:y2, x1:x2] = updated

Expand Down Expand Up @@ -750,4 +708,4 @@ def _check_params(self) -> None:
raise TypeError("Candidates must be stored in list.")

if len(self.candidates) < 1:
raise ValueError("The list of candidates is empty.")
raise ValueError("The list of candidates is empty.")

0 comments on commit 8130f36

Please sign in to comment.