Skip to content

Commit

Permalink
Update AudioDynamicTrigger
Browse files Browse the repository at this point in the history
  • Loading branch information
OrsonTyphanel93 authored Dec 18, 2023
1 parent c0f09a5 commit 8b362a0
Showing 1 changed file with 117 additions and 69 deletions.
186 changes: 117 additions & 69 deletions art/attacks/poisoning/AudioDynamicTrigger
Original file line number Diff line number Diff line change
Expand Up @@ -22,77 +22,125 @@ Uses classes, rather than pure functions as in image_perturbations.py,
because loading the audio trigger from disk (librosa.load()) is very slow
and should be done only once.
"""
import librosa

```python
import os
import logging
import numpy as np
import tensorflow as tf
from scipy.io import wavfile
from scipy import signal
import logging # Import logging module for error handling
from sklearn.preprocessing import QuantileTransformer
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

class DynamicTrigger:
def __init__(self, sampling_rate=16000, backdoor_path='/content/triggers_clapping.wav', scale=0.2):
"""
Initialize DynamicTrigger object.

Parameters:
- sampling_rate: Sampling rate of the audio data.
- backdoor_path: Path to the trigger audio file.
- scale: Scaling factor for trigger audio.
"""
self.sampling_rate = sampling_rate
self.backdoor_path = backdoor_path
self.scale = scale
self.trigger = self.load_trigger() # Load and preprocess the trigger audio

def load_trigger(self):
"""
Load and preprocess the trigger audio.

Returns:
- trigger: Preprocessed trigger audio as a flattened array.
"""
try:
sampling_rate, trigger = wavfile.read(self.backdoor_path)
if self.sampling_rate != sampling_rate:
# Resample trigger if sampling rates are different
trigger = signal.resample(trigger, int(len(trigger) * self.sampling_rate / sampling_rate))
trigger = trigger.flatten() * self.scale # Flatten and scale trigger
return trigger
except Exception as e:
logging.error(f"Error loading trigger: {str(e)}")
raise

def anonymize_speaker(self, spectrogram, noise_std=0.1):
"""
Add random noise to the spectrogram for speaker anonymization.

Parameters:
- spectrogram: Input spectrogram.
- noise_std: Standard deviation of the added random noise.

Returns:
- noisy_spectrogram: Anonymized spectrogram with added noise.
"""
noisy_spectrogram = spectrogram + np.random.normal(0, noise_std, spectrogram.shape)
return noisy_spectrogram

def insert(self, x_audio, beta1=10, beta2=20, noise_std=0.05):
"""
Insert trigger into the audio signal.

Parameters:
- x_audio: Input audio signal.
- beta1: Start index for trigger insertion.
- beta2: End index for trigger insertion.
- noise_std: Standard deviation of the added noise during insertion.

Returns:
- poisoned_x: Audio signal with inserted trigger and anonymized speaker.
- self.sampling_rate: Sampling rate of the output audio.
"""
f, t, xi = signal.stft(x_audio, fs=self.sampling_rate)
sigma = self.trigger[:xi.shape[0]] # Corrected index from 'i' to '0'
xi[beta1:beta2, :] = sigma
xi = self.anonymize_speaker(xi, noise_std=noise_std)
poisoned_x = signal.istft(xi, fs=self.sampling_rate)[1] # Corrected index from 'i' to '1'
return poisoned_x, self.sampling_rate
def __init__(self, sampling_rate=16000, backdoor_path='/content/triggers_clapping.wav', scale=0.2):
"""
Initialize the DynamicTrigger object.

Parameters:
- sampling_rate (int): Sampling rate of the audio.
- backdoor_path (str): Path to the trigger audio file.
- scale (float): Scaling factor for the trigger.
"""
self.sampling_rate = sampling_rate
self.backdoor_path = backdoor_path
self.scale = scale
self.trigger = self.load_trigger()

def load_trigger(self):
"""
Load the trigger audio file and handle resampling if needed.

Returns:
- np.ndarray: Trigger audio data.
"""
if not os.path.isfile(self.backdoor_path):
raise FileNotFoundError(f"Trigger file not found: {self.backdoor_path}")
try:
_, trigger = wavfile.read(self.backdoor_path)
if self.sampling_rate != _:
trigger = signal.resample(trigger, int(len(trigger) * self.sampling_rate / _))
trigger = trigger.flatten() * self.scale
return trigger
except Exception as e:
logging.error(f"Error loading trigger: {e}")
raise

def anonymize_speaker(self, spectrogram, noise_std=0.1):
"""
Anonymize the speaker in the spectrogram using an autoencoder.

Parameters:
- spectrogram (np.ndarray): Input spectrogram.
- noise_std (float): Standard deviation of noise to add during anonymization.

Returns:
- np.ndarray: Anonymized spectrogram.
"""
try:
# Create a model for differentially private feature extraction
input_layer = Input(shape=(spectrogram.shape[1],))
hidden_layer = Dense(128, activation='relu')(input_layer)
output_layer = Dense(spectrogram.shape[1])(hidden_layer)
autoencoder = Model(input_layer, output_layer)
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Train the autoencoder with noise layers
noisy_spectrogram = spectrogram + np.random.normal(0, noise_std, spectrogram.shape)
autoencoder.fit(noisy_spectrogram, spectrogram, epochs=10, batch_size=32, verbose=1)

# Use the autoencoder to extract features from the spectrogram
features = autoencoder.predict(spectrogram)

# Apply quantization-based transformation
transformer = QuantileTransformer(n_quantiles=100, random_state=0)
quantized_features = transformer.fit_transform(features)

# Reconstruct the spectrogram from the quantized features
reconstructed_spectrogram = autoencoder.predict(quantized_features)

return reconstructed_spectrogram
except Exception as e:
logging.error(f"Error during anonymization: {e}")
raise

def insert(self, x_audio, trigger_start_index=10, trigger_end_index=20, noise_std=0.05):
"""
Insert the trigger into the audio signal and apply anonymization.

Parameters:
- x_audio (np.ndarray): Input audio signal.
- trigger_start_index (int): Start index for trigger insertion.
- trigger_end_index (int): End index for trigger insertion.
- noise_std (float): Standard deviation of noise to add during anonymization.

Returns:
- np.ndarray: Anonymized audio signal.
- int: Sampling rate of the audio signal.
"""
try:
_, _, xi = signal.stft(x_audio, fs=self.sampling_rate)

# Ensure trigger indices are within bounds
trigger_len = len(self.trigger)
if trigger_start_index < 0 or trigger_end_index > xi.shape[0] or trigger_start_index >= trigger_end_index:
raise ValueError("Invalid trigger indices provided.")

# Insert trigger into the audio signal
sigma = self.trigger[:trigger_len]
xi[trigger_start_index:trigger_end_index, :] = sigma

# Anonymize the speaker in the spectrogram
xi = self.anonymize_speaker(xi, noise_std=noise_std)

# Reconstruct the audio signal from the spectrogram
_, poisoned_x = signal.istft(xi, fs=self.sampling_rate)

return poisoned_x, self.sampling_rate
except Exception as e:
logging.error(f"Error during trigger insertion: {e}")
raise


```

0 comments on commit 8b362a0

Please sign in to comment.