forked from nforrester/downlinx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdownlinx.py
executable file
·291 lines (240 loc) · 12.8 KB
/
downlinx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/usr/bin/env python3
import datetime
import importlib.util
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import typing
def _has_whitespace(string: str):
"""Return True if a string has whitespace."""
return bool(re.search(r'\s', string))
def _assert_no_whitespace(string: str):
"""Assert that a string has no whitespace in it."""
assert not _has_whitespace(string)
def _quote_if_has_whitespace(string: str):
"""Single-quote a string if it has whitespace, escaping any single-quotes inside with a backslash."""
if not _has_whitespace(string):
return string
return "'{}'".format(string.replace("'", "\\'"))
def _check_call_with_echo(cmd: typing.List[str], *args, **kwargs):
"""Like subprocess.check_call(), but it echos the command."""
print(*map(_quote_if_has_whitespace, cmd))
subprocess.check_call(cmd, *args, **kwargs)
def _replace_spaces(source_name: str):
"""Replace the spaces in a source name with underscores to
produce something nice for filenames."""
replaced = source_name.replace(' ', '_')
_assert_no_whitespace(replaced)
return replaced
def _ensure_directory_exists(filepath: str):
"""Ensure that the directory required to house the file with the given path exists."""
dirpath = os.path.dirname(filepath)
os.makedirs(dirpath, exist_ok=True)
def magick(*args):
"""Invoke ImageMagick with the specified arguments."""
_check_call_with_echo(['magick'] + list(args))
MAGICK_PNG_COLOR = ['-define', 'png:color-type=6']
"""This needs to always come before the output image name in any ImageMagick command
producing a PNG in order to ensure that the output image has the correct colorspace."""
class Pos(typing.NamedTuple):
"""A position, in pixels, with (0, 0) at the top left, x increasing to the right, and y increasing down."""
x: int
y: int
class Size(typing.NamedTuple):
"""A size, in pixels."""
w: int
h: int
def scale_factor(orig: Size, factor: float):
"""Multiply a Size by a scale factor."""
return Size(int(orig.w * factor), int(orig.h * factor))
def scale_to_width(orig: Size, width: int):
"""Compute Size(width, height) such that it matches the aspect ratio of (orig.w, orig.h)."""
height = int(width / aspect_ratio(orig))
return Size(width, height)
def scale_to_height(orig: Size, height: int):
"""Compute Size(width, height) such that it matches the aspect ratio of (orig.w, orig.h)."""
width = int(height * aspect_ratio(orig))
return Size(width, height)
def scale_to_fit(orig: Size, bounding_box: Size):
"""Scale orig so that it fits in bounding_box."""
if aspect_ratio(orig) > aspect_ratio(bounding_box):
return scale_to_width(orig, bounding_box.w)
return scale_to_height(orig, bounding_box.h)
def add_pos(pos1: Pos, pos2: Pos):
"""Vector addition on Pos."""
return Pos(pos1.x+pos2.x, pos1.y+pos2.y)
def aspect_ratio(size: Size):
"""Return the aspect ratio of the given Size."""
return float(size.w) / size.h
def centering_offset(image_size: Size, frame_size: Size, frame_offset: Pos = Pos(0, 0)):
"""Return the offset that will place the image in the center of the frame."""
return add_pos(frame_offset, Pos(int(frame_size.w/2 - image_size.w/2), int(frame_size.h/2 - image_size.h/2)))
class Image(object):
"""Represents an image in the pipeline. Stores the filename and image size."""
def __init__(self, filepath: str):
self.filepath = filepath
output = subprocess.check_output(['magick', 'identify', '-ping', '-format', '%w %h', filepath])
self.size = Size(*map(int, output.split(b' ')))
class Pipeline(object):
"""Represents the state of a generic image processing pipeline,
and provides functions for processing images."""
def __init__(self, pipeline_dir: str):
self.images_dir = os.path.join(pipeline_dir, 'images')
self.generated_image_count = 0
def _image_path(self, filename: str):
"""Tack the filename for an image onto the images directory."""
return os.path.join(self.images_dir, filename)
def _new(self, image_type: str):
"""Generate a path for a new image of a given type that you can write to,
ensure the directory for it exists, and delete the image if it exists."""
filename = f'generated{self.generated_image_count}.{image_type}'
filepath = self._image_path(filename)
_assert_no_whitespace(filepath)
_ensure_directory_exists(filepath)
self.generated_image_count += 1
if os.path.isfile(filepath):
os.remove(filepath)
return filepath
def crop(self, image: Image, offset: Pos, size: Size):
"""Crop an Image. Return the new Image."""
cropped = self._new('png')
magick('convert', image.filepath, '-crop', f'{size.w}x{size.h}+{offset.x}+{offset.y}', *MAGICK_PNG_COLOR, cropped)
return Image(cropped)
def blank(self, color: str, size: Size):
"""Create a blank Image with the specified color and dimensions. Return the new Image."""
new = self._new('png')
magick('convert', '-size', f'{size.w}x{size.h}', f'canvas:{color}', *MAGICK_PNG_COLOR, new)
return Image(new)
def place(self, new: Image, offset: Pos, base: Image):
"""Place a new Image on top of a base Image. Return the new Image."""
combined = self._new('png')
magick('convert', base.filepath, new.filepath, '-geometry', f'+{offset.x}+{offset.y}', '-composite', *MAGICK_PNG_COLOR, combined)
return Image(combined)
def resize(self, image: Image, size: Size):
"""Resize an Image. Return the new Image."""
resized = self._new('png')
magick('convert', image.filepath, '-resize', f'{size.w}x{size.h}', *MAGICK_PNG_COLOR, resized)
return Image(resized)
def to_jpg(self, image: Image):
"""Convert an Image to jpg. Return the new Image."""
jpg = self._new('jpg')
magick('convert', image.filepath, jpg)
return Image(jpg)
class Downlinx(Pipeline):
"""Subclass of Pipeline with functions specifically useful for processing satellite images."""
def __init__(self, pipeline_dir: str):
super().__init__(pipeline_dir)
# Search for sources.json first in the config directory, or fall back to the version shipped with
# downlinx otherwise.
script_dir = os.path.dirname(os.path.realpath(__file__))
sources_path = os.path.join(pipeline_dir, 'sources.json')
if not os.path.isfile(sources_path):
sources_path = os.path.join(script_dir, 'sources.json')
with open(sources_path) as f:
self.sources = json.load(f)['sources']
self._assert_unique_names()
self._assert_all_formats_expected()
def _assert_unique_names(self):
"""Assert that all image source names are unique, and remain so under _replace_spaces()."""
unique_names = set()
unique_names_no_spaces = set()
for s in self.sources:
name = s['name']
name_no_spaces = _replace_spaces(name)
if name in unique_names:
raise Exception(f'Name "{name}" is not unique in the sources list')
if name_no_spaces in unique_names_no_spaces:
raise Exception(f'Spaceless name "{name_no_spaces}" is not unique in the sources list')
unique_names.add(name)
unique_names_no_spaces.add(name_no_spaces)
def _assert_all_formats_expected(self):
"""Assert that all image source URLs return the expected image format."""
for s in self.sources:
for sz, u in s['url'].items():
assert u.endswith('.jpg')
def _source(self, source_name: str):
"""Retrieve an image source by name."""
for s in self.sources:
if s['name'] == source_name:
return s
def get(self, source_name: str, size: str):
"""Get an Image from one of the sources, or if it's already present and recent enough, don't."""
filename = '{}_{}.jpg'.format(_replace_spaces(source_name), size)
filepath = self._image_path(filename)
_assert_no_whitespace(filepath)
_ensure_directory_exists(filepath)
source = self._source(source_name)
# Don't download again if we already have a sufficiently recent version.
if os.path.isfile(filepath):
modification_time = datetime.datetime.fromtimestamp(pathlib.Path(filepath).stat().st_mtime)
now = datetime.datetime.now()
age = (now - modification_time).seconds
if age < source['interval']:
print(f"Skipping download of {filename}, it's only {age} seconds old.")
return Image(filepath)
# Download the image.
url = source['url'][size]
_check_call_with_echo(['curl', '-o', filepath, url])
return Image(filepath)
def _clean_goes_large(self, source_name: str):
"""Return a cleaned up large full disk GOES image from the given source."""
full_disk = self.get(source_name, 'large')
full_disk_minus_info_bar = self.crop(full_disk, Pos(0, 0), Size(full_disk.size.w, full_disk.size.h - 47))
logo_hider = self.blank('black', Size(400, 400))
full_disk_clean = self.place(logo_hider, Pos(0, full_disk_minus_info_bar.size.h - logo_hider.size.h), full_disk_minus_info_bar)
return full_disk_clean
def clean_goes_east_large(self):
"""Return a cleaned up large full disk GOES-East image."""
return self._clean_goes_large('GOES-East Full Disk')
def clean_goes_west_large(self):
"""Return a cleaned up large full disk GOES-West image."""
return self._clean_goes_large('GOES-West Full Disk')
def clean_himawari8_large(self):
"""Return a cleaned up large full disk Himawari-8 image."""
full_disk = self.get('Himawari-8 Full Disk', 'large')
logo_hider = self.blank('black', Size(1000, 450))
full_disk_clean = self.place(logo_hider, Pos(0, full_disk.size.h - logo_hider.size.h), full_disk)
return full_disk_clean
def set_background_wm_only(image: Image):
"""Put an Image up on the desktop background. It should be a JPG because PNGs can have colors distorted.
This function is for people using a window manager (like XMonad or Openbox) but no desktop environment."""
_check_call_with_echo(['xloadimage', '-onroot', image.filepath])
# NOTE: set_background_gnome2() has not been tested. It might not work! Consider submitting a PR if you figure it out.
def set_background_gnome2(image: Image):
"""Put an Image up on the desktop background.
This function is for people using GNOME 2."""
_check_call_with_echo(['gconftool-2', '--type=string', '--set', '/desktop/gnome/background/picture_filename', os.path.abspath(image.filepath)])
# NOTE: set_background_gnome3() has not been tested. It might not work! Consider submitting a PR if you figure it out.
def set_background_gnome3(image: Image):
"""Put an Image up on the desktop background.
This function is for people using GNOME 3 or Unity."""
_check_call_with_echo(['gsettings', 'set', 'org.gnome.desktop.background', 'picture-uri', 'file://' + os.path.abspath(image.filepath)])
_check_call_with_echo(['gsettings', 'set', 'org.gnome.desktop.background', 'picture-options', 'spanned'])
# NOTE: set_background_xfce() has not been tested. It might not work! Consider submitting a PR if you figure it out.
def set_background_xfce(monitor: str, image: Image):
"""Put an Image up on the desktop background on the specified monitor (for example, 'screen0/monitor0').
Supposedly, valid monitor identifiers can be listed with 'xfconf-query --channel xfce4-desktop --list'.
Unlike the other background setting functions, this one work on only one monitor at a time,
so you will need to compose multiple images (or produce multiple crops).
This function is for people using Xfce."""
_check_call_with_echo(['xfconf-query', '--channel', 'xfce4-desktop', '--property', '/backdrop/' + monitor + '/image-path', '--set', os.path.abspath(image.filepath)])
def eog(image: Image):
"""Open an Image in the EOG image viewer. Useful for debugging. This call will block until EOG exits."""
_check_call_with_echo(['eog', image.filepath])
def main():
"""Run run.py in the directory specified on the command line."""
if len(sys.argv) != 2 or not os.path.isfile(os.path.join(sys.argv[1], 'run.py')):
print('downlinx.py takes one command line argument: the pipeline directory (containing run.py).')
print('For example:')
print(' ./downlinx.py pipelines/simple')
sys.exit(1)
run_path = os.path.join(sys.argv[1], 'run.py')
spec = importlib.util.spec_from_file_location('run', run_path)
run = importlib.util.module_from_spec(spec)
spec.loader.exec_module(run)
if __name__ == '__main__':
main()