-
Notifications
You must be signed in to change notification settings - Fork 197
/
Copy pathvideo_face_blur_mapper.py
135 lines (110 loc) · 4.79 KB
/
video_face_blur_mapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import av
from PIL import ImageFilter
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import (close_video, detect_faces,
load_data_with_context, load_video,
process_each_frame)
from data_juicer.utils.model_utils import get_model, prepare_model
from ..base_op import OPERATORS, UNFORKABLE, Mapper
from ..op_fusion import LOADED_VIDEOS
cv2 = LazyLoader('cv2', 'cv2')
OP_NAME = 'video_face_blur_mapper'
@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
class VideoFaceBlurMapper(Mapper):
"""Mapper to blur faces detected in videos.
"""
_default_kwargs = {
'scaleFactor': 1.1,
'minNeighbors': 3,
'minSize': None,
'maxSize': None,
}
def __init__(self,
cv_classifier: str = '',
blur_type: str = 'gaussian',
radius: float = 2,
*args,
**kwargs):
"""
Initialization method.
:param cv_classifier: OpenCV classifier path for face detection.
By default, we will use 'haarcascade_frontalface_alt.xml'.
:param blur_type: Type of blur kernel, including
['mean', 'box', 'gaussian'].
:param radius: Radius of blur kernel.
:param args: extra args
:param kwargs: extra args
"""
super().__init__(*args, **kwargs)
self._init_parameters = self.remove_extra_parameters(locals())
if cv_classifier == '':
cv_classifier = os.path.join(cv2.data.haarcascades,
'haarcascade_frontalface_alt.xml')
if blur_type not in ['mean', 'box', 'gaussian']:
raise ValueError(
f'Blur_type [{blur_type}] is not supported. '
f'Can only be one of ["mean", "box", "gaussian"]. ')
if radius < 0:
raise ValueError('Radius must be >= 0. ')
if blur_type == 'mean':
self.blur = ImageFilter.BLUR
elif blur_type == 'box':
self.blur = ImageFilter.BoxBlur(radius)
else:
self.blur = ImageFilter.GaussianBlur(radius)
self.blur_type = blur_type
self.radius = radius
self.extra_kwargs = self._default_kwargs
for key in kwargs:
if key in self.extra_kwargs:
self.extra_kwargs[key] = kwargs[key]
self.model_key = prepare_model(model_type='opencv_classifier',
model_path=cv_classifier)
def process_single(self, sample, context=False):
# there is no video in this sample
if self.video_key not in sample or not sample[self.video_key]:
sample[Fields.source_file] = []
return sample
if Fields.source_file not in sample or not sample[Fields.source_file]:
sample[Fields.source_file] = sample[self.video_key]
loaded_video_keys = sample[self.video_key]
sample, videos = load_data_with_context(sample, context,
loaded_video_keys, load_video)
model = get_model(self.model_key)
def _blur_func(frame):
image = frame.to_image()
dets = detect_faces(image, model, **self.extra_kwargs)
if len(dets) > 0:
for (x, y, w, h) in dets:
box = (x, y, x + w, y + h)
blured_roi = image.crop(box).filter(self.blur)
image.paste(blured_roi, box)
frame = av.VideoFrame.from_image(image)
return frame
processed_video_keys = {}
for video_key in loaded_video_keys:
# skip duplicate
if video_key in processed_video_keys:
continue
video = videos[video_key]
blured_video_key = transfer_filename(video_key, OP_NAME,
**self._init_parameters)
output_video_key = process_each_frame(video, blured_video_key,
_blur_func)
processed_video_keys[video_key] = output_video_key
if not context:
close_video(video)
# when the file is modified, its source file needs to be updated.
for i, value in enumerate(loaded_video_keys):
if sample[Fields.source_file][i] != value:
if processed_video_keys[value] != value:
sample[Fields.source_file][i] = value
sample[self.video_key] = [
processed_video_keys[key] for key in loaded_video_keys
]
return sample