-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathget_trust_anchor.py
executable file
·507 lines (452 loc) · 22.7 KB
/
get_trust_anchor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
#!/usr/bin/env python
#
# DNSSEC Trust Anchor Fetcher
# https://github.com/kirei/get_trust_anchor
#
# Copyright (c) 2016, Paul Hoffman. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
DNSSEC Trust Anchor Fetcher (get_trust_anchor.py)
This tool writes out a copy of the current DNSSEC trust anchor.
The primary design goal for this software is that it should be able to be run on any system
that has just Python (either 2.7 or 3.x) and the OpenSSL command line tool.
The steps it uses are:
Step 1. Fetch the trust anchor file from IANA using HTTPS
Step 2. Fetch the S/MIME signature for the trust anchor file from IANA using HTTPS
Step 3. Validate the signature on the trust anchor file using a built-in IANA CA key
Step 4. Extract the trust anchor key digests from the trust anchor file
Step 5. Check the validity period for each digest
Step 6. Verify that the trust anchors match the KSK in the root zone file
Step 7. Write out the trust anchors as a DNSKEY and DS records
Note that the validation is done against a built-in ICANN CA, not one retrieved through a
URL. This means that even if HTTPS authentication checking isn't done, the resulting
trust anchors are still cryptographically validated.
"""
# pylint: disable=wrong-import-order,wrong-import-position,import-error,no-name-in-module,broad-except,bare-except,too-many-locals
from __future__ import print_function
import argparse
import base64
import codecs
import datetime
import hashlib
import json
import os
import pprint
import re
import struct
import subprocess
import sys
import tempfile
import xml.etree.ElementTree
ICANN_ROOT_CA_CERT = '''
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIBATANBgkqhkiG9w0BAQsFADBdMQ4wDAYDVQQKEwVJQ0FO
TjEmMCQGA1UECxMdSUNBTk4gQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkxFjAUBgNV
BAMTDUlDQU5OIFJvb3QgQ0ExCzAJBgNVBAYTAlVTMB4XDTA5MTIyMzA0MTkxMloX
DTI5MTIxODA0MTkxMlowXTEOMAwGA1UEChMFSUNBTk4xJjAkBgNVBAsTHUlDQU5O
IENlcnRpZmljYXRpb24gQXV0aG9yaXR5MRYwFAYDVQQDEw1JQ0FOTiBSb290IENB
MQswCQYDVQQGEwJVUzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKDb
cLhPNNqc1NB+u+oVvOnJESofYS9qub0/PXagmgr37pNublVThIzyLPGCJ8gPms9S
G1TaKNIsMI7d+5IgMy3WyPEOECGIcfqEIktdR1YWfJufXcMReZwU4v/AdKzdOdfg
ONiwc6r70duEr1IiqPbVm5T05l1e6D+HkAvHGnf1LtOPGs4CHQdpIUcy2kauAEy2
paKcOcHASvbTHK7TbbvHGPB+7faAztABLoneErruEcumetcNfPMIjXKdv1V1E3C7
MSJKy+jAqqQJqjZoQGB0necZgUMiUv7JK1IPQRM2CXJllcyJrm9WFxY0c1KjBO29
iIKK69fcglKcBuFShUECAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8B
Af8EBAMCAf4wHQYDVR0OBBYEFLpS6UmDJIZSL8eZzfyNa2kITcBQMA0GCSqGSIb3
DQEBCwUAA4IBAQAP8emCogqHny2UYFqywEuhLys7R9UKmYY4suzGO4nkbgfPFMfH
6M+Zj6owwxlwueZt1j/IaCayoKU3QsrYYoDRolpILh+FPwx7wseUEV8ZKpWsoDoD
2JFbLg2cfB8u/OlE4RYmcxxFSmXBg0yQ8/IoQt/bxOcEEhhiQ168H2yE5rxJMt9h
15nu5JBSewrCkYqYYmaxyOC3WrVGfHZxVI7MpIFcGdvSb2a1uyuua8l0BKgk3ujF
0/wsHNeP22qNyVO+XVBzrM8fk8BSUFuiT/6tZTYXRtEt5aKQZgXbKU5dUF3jT9qg
j/Br5BZw3X/zd325TvnswzMC1+ljLzHnQGGk
-----END CERTIFICATE-----
'''
URL_ROOT_ANCHORS = "https://data.iana.org/root-anchors/root-anchors.xml"
URL_ROOT_ANCHORS_SIGNATURE = "https://data.iana.org/root-anchors/root-anchors.p7s"
URL_ROOT_ZONE = "https://www.internic.net/domain/root.zone"
URL_RESOLVER_API = "https://dns.google.com/resolve?name=.&type=dnskey"
def die(*Strings):
"""Generic way to leave the program early"""
sys.stderr.write("".join(Strings) + " Exiting.\n")
exit(1)
PYTHON_MAJOR = int(sys.version_info[0])
PYTHON_MINOR = int(sys.version_info[1])
if (PYTHON_MAJOR == 2) and (PYTHON_MINOR != 7):
die("If this program is running in Python 2, it must be Python 2.7.")
# Get the urlopen and StringIO functions
if PYTHON_MAJOR == 2:
from urllib2 import urlopen
from StringIO import StringIO
else:
from urllib.request import urlopen
from io import StringIO
def bytes_to_string(byte_array):
"""Convert bytes that are in ASCII into strings.
This is used for content received over URLs."""
if isinstance(byte_array, str):
return str(byte_array)
ascii_codec = codecs.lookup("ascii")
return ascii_codec.decode(byte_array)[0]
def write_out_file(file_name, file_contents):
"""Takes a name of a file and string or bytearray; returns nothing.
Writes out a file that we got from a URL or string; backs up the file if it exists."""
# Back up the current one if it is there
if os.path.exists(file_name):
now_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file_name = "{}.backup_{}".format(file_name, now_timestamp)
try:
os.rename(file_name, backup_file_name)
except:
die("Failed to rename {} to {}.".format(file_name, backup_file_name))
# Pick the mode string based on the type of contents
if isinstance(file_contents, str):
filemode = "wt"
else:
filemode = "wb"
try:
fobj = open(file_name, mode=filemode)
fobj.write(file_contents)
fobj.close()
except:
die("Could not write out the file {}.".format(file_name))
return
def dnskey_to_hex_of_hash(dnskey_dict, hash_type):
"""Takes a DNSKEY dict and hash type (string), and returns the hex of the hash as a string"""
if hash_type == "1":
this_hash = hashlib.sha1()
elif hash_type == "2":
this_hash = hashlib.sha256()
else:
die("A DNSKEY dict had a hash type of {}, which is unknown.".format(hash_type))
digest_content = bytearray()
digest_content.append(0) # Name of the zone, expressed in wire format
digest_content.extend(struct.pack("!HBB", int(dnskey_dict["f"]),\
int(dnskey_dict["p"]), int(dnskey_dict["a"])))
key_bytes = base64.b64decode(dnskey_dict["k"])
digest_content.extend(key_bytes)
this_hash.update(digest_content)
return (this_hash.hexdigest()).upper()
def extract_ksks_from_trust_anchors(valid_trust_anchors):
"""Extract and return the KSKs from the parsed trust anchors."""
print("Extracting KSKs from trust anchor...")
ksks = []
for (i, anchor) in enumerate(valid_trust_anchors):
if "PublicKey" not in anchor or "Flags" not in anchor:
print("Trust anchor {} does not include both PublicKey and Flags values.".format(i))
continue
ksks.append({'f': anchor["Flags"], 'p': 3, 'a': anchor["Algorithm"], 'k': anchor["PublicKey"]})
return ksks
def fetch_ksk():
"""Return the KSKs, or die if they can't be found in via Google nor the zone file"""
print("Fetching via Google Public DNS...")
ksks = fetch_ksk_from_google()
if ksks is None:
print("Fetching via Google Public DNS failed. Fetching via the root zone file...")
ksks = fetch_ksk_from_zonefile()
if ksks is None:
die("Could not fetch the KSKs from Google Public DNS nor get the root zone file.")
if len(ksks) == 0:
die("No KSKs were found.")
return ksks
def fetch_ksk_from_google():
"""Return the root KSK via Google DNS-over-HTTPS. Returns None if there are errors."""
ksks = []
try:
url = urlopen(URL_RESOLVER_API)
except Exception as this_exception:
print("Was not able to open URL {}. The returned text was '{}'.".format(\
URL_RESOLVER_API, this_exception))
return None
try:
data = json.loads(url.read().decode('utf-8'))
except Exception as this_exception:
print("The JSON returned from Google DNS-over-HTTPS was not readable: {}".format(\
this_exception))
return None
for answer in data['Answer']:
if answer['type'] == 48:
(flags, proto, alg, key_b64) = re.split(r"\s+", answer['data'])
if flags == '257':
ksks.append({'f': flags, 'p': proto, 'a': alg, 'k': key_b64})
return ksks
def fetch_ksk_from_zonefile():
"""Rethurn the root KSK from the root zone file. Returns None if there are errors."""
ksks = []
try:
url = urlopen(URL_ROOT_ZONE)
except Exception as this_exception:
print("Was not able to open URL {}. The returned text was '{}'.".format(\
URL_ROOT_ZONE, this_exception))
return None
for line in url.read().decode('utf-8').split('\n'):
if "DNSKEY\t" in line:
(_, _, _, _, flags, proto, alg, key_b64) = re.split(r"\s+", line)
if flags == '257':
ksks.append({'f': flags, 'p': proto, 'a': alg, 'k': key_b64})
return ksks
def validate_detached_signature(contents_filename, signature_filename, ca_filename):
"""Takes the name of the contents file, the signature file, and CA file;
returns nothing if sucessful or dies if openssl returns an error."""
# Run openssl to validate the signature
validate_command = "openssl smime -verify -CAfile {ca} -inform der -in {sig} -content {cont}"
validate_popen = subprocess.Popen(validate_command.format(\
ca=ca_filename, sig=signature_filename, cont=contents_filename),\
shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(validate_out, validate_err) = validate_popen.communicate()
if validate_popen.returncode != 0:
die("When running openssl, the return code was {} ".format(validate_popen.returncode),\
"and the output was the following.\n{} {}".format(validate_err, validate_out))
else:
print("Validation of the signature over the file succeeded.")
def extract_trust_anchors_from_xml(trust_anchor_xml):
"""Takes a bytestring with the XML from IANA; returns a list of trust anchors."""
# Turn the bytes from trust_anchor_xml into a string
trust_anchor_xml_string = bytes_to_string(trust_anchor_xml)
# Sanity check: make sure there is enough text in the returned stuff
if len(trust_anchor_xml_string) < 100:
die("The XML was too short: {} chars.".format(len(trust_anchor_xml_string)))
# ElementTree requries a file so use StringIO to turn the string into a file
try:
trust_anchor_as_file = StringIO(trust_anchor_xml_string) # This works for Python 3
except:
trust_anchor_as_file = StringIO(unicode(trust_anchor_xml_string)) # Needed for Python 2
# Get the tree
trust_anchor_tree = xml.etree.ElementTree.ElementTree(file=trust_anchor_as_file)
# Get all the KeyDigest elements
digest_elements = trust_anchor_tree.findall(".//KeyDigest")
print("There were {} KeyDigest elements in the trust anchor file.".format(\
len(digest_elements)))
trust_anchors = [] # Global list of dicts that is taken from the XML file
# Collect the values for the KeyDigest subelements and attributes
for (count, this_digest_element) in enumerate(digest_elements):
digest_value_dict = {}
for this_subelement in ["KeyTag", "Algorithm", "DigestType", "Digest"]:
try:
this_key_tag_text = (this_digest_element.find(this_subelement)).text
except:
die("Did not find {} element in a KeyDigest in a trust anchor.".format(\
this_subelement))
digest_value_dict[this_subelement] = this_key_tag_text
# Optional values
for this_subelement in ["PublicKey", "Flags"]:
value = this_digest_element.find(this_subelement)
if value is None:
continue
digest_value_dict[this_subelement] = value.text
for this_attribute in ["validFrom", "validUntil"]:
if this_attribute in this_digest_element.keys():
digest_value_dict[this_attribute] = this_digest_element.attrib[this_attribute]
else:
digest_value_dict[this_attribute] = "" # Missing attributes get empty values
# Save this to the global trust_anchors list
print("Added the trust anchor {} to the list:\n{}".format(count, pprint.pformat(\
digest_value_dict)))
trust_anchors.append(digest_value_dict)
if len(trust_anchors) == 0:
die("There were no trust anchors found in the XML file.")
return trust_anchors
def get_valid_trust_anchors(trust_anchors):
"""Takes a list of trust anchors; returns the list of trust anchors that are valid"""
# Keep a list of just the valid trust anchors because some things are not going to go into it.
valid_trust_anchors = []
now_datetime = datetime.datetime.now()
for (count, this_anchor) in enumerate(trust_anchors):
# Check the validity times; these only need to be accurate within a day or so
if this_anchor["validFrom"] == "":
print("Trust anchor {}: the validFrom attribute is empty,".format(count),\
"so not using this trust anchor.")
continue
digest_element_valid_from = this_anchor["validFrom"]
(from_left, _) = digest_element_valid_from.split("T", 2)
(from_year, from_month, from_day) = from_left.split("-")
from_date_time = datetime.datetime(int(from_year), int(from_month), int(from_day))
if now_datetime < from_date_time:
print("Trust anchor {}: the validFrom '{}' is later".format(count, from_date_time),\
"than today, so not using this trust anchor.")
continue
if this_anchor["validUntil"] == "":
print("Trust anchor {}: there was no validUntil attribute,".format(count),\
"so the validity is OK.")
valid_trust_anchors.append(this_anchor)
else:
digest_element_valid_until = this_anchor["validUntil"]
(until_left, _) = digest_element_valid_until.split("T", 2)
(until_year, until_month, until_day) = until_left.split("-")
until_date_time = datetime.datetime(int(until_year), int(until_month), int(until_day))
if now_datetime > until_date_time:
print("Trust anchor {}: the validUntil '{}' is before ".format(count,\
until_date_time), "today, so not using this trust anchor.")
continue
else:
print("Trust anchor {}: the validity period passes.".format(count))
valid_trust_anchors.append(this_anchor)
if len(valid_trust_anchors) == 0:
die("After checking validity dates, there were no trust anchors left.")
print("After the date validity checks, there are now {} records.".format(\
len(valid_trust_anchors)))
return valid_trust_anchors
def get_matching_ksk(ksk_records, valid_trust_anchors):
"""Takes in a list of KSKs and a list of trust anchors; returns a list of the KSKs"""
matched_ksks = []
for this_ksk_record in ksk_records:
try:
# check base64 syntax
base64.b64decode(this_ksk_record["k"])
except:
die("The KSK '{}...{}' had bad Base64.".format(\
this_ksk_record[0:15], this_ksk_record[-15:]))
for (count, this_trust_anchor) in enumerate(valid_trust_anchors):
hash_as_hex = dnskey_to_hex_of_hash(this_ksk_record, this_trust_anchor["DigestType"])
if hash_as_hex == this_trust_anchor["Digest"]:
print("Trust anchor {} matched KSK '{}...{}'".format(count,\
this_ksk_record["k"][0:15], this_ksk_record["k"][-15:]))
matched_ksks.append(this_ksk_record)
break # Don't check more trust anchors against this KSK
if len(matched_ksks) == 0:
die("After checking for trust anchor matches, there were no trusted KSKs.")
else:
print("There were {} matched KSKs.".format(len(matched_ksks)))
return matched_ksks
def export_ksk(valid_ksks, ds_record_filename, dnskey_record_filename):
"""Takes a list of KSKs; returns nothing but writes out files"""
##############################
# Still to do:
# BIND output formats
##############################
dnskey_record_contents = ""
ds_record_contents = ""
for this_matched_ksk in valid_ksks:
# Write out the DNSKEY
dnskey_record_contents += ". IN DNSKEY {flags} {proto} {alg} {keyas64}\n".format(\
flags=this_matched_ksk["f"], proto=this_matched_ksk["p"],\
alg=this_matched_ksk["a"], keyas64=this_matched_ksk["k"])
# Write out the DS
hash_as_hex = dnskey_to_hex_of_hash(this_matched_ksk, "2") # Always do SHA256
# Calculate the keytag
tag_base = bytearray()
tag_base.extend(struct.pack("!HBB", int(this_matched_ksk["f"]), int(this_matched_ksk["p"]),\
int(this_matched_ksk["a"])))
key_bytes = base64.b64decode(this_matched_ksk["k"])
tag_base.extend(key_bytes)
accumulator = 0
for (counter, this_byte) in enumerate(tag_base):
if (counter % 2) == 0:
accumulator += (this_byte << 8)
else:
accumulator += this_byte
this_key_tag = ((accumulator & 0xFFFF) + (accumulator>>16)) & 0xFFFF
print("The key tag for this KSK is {}".format(this_key_tag))
ds_record_contents += ". IN DS {keytag} {alg} 2 {sha256ofkey}\n".format(\
keytag=this_key_tag, alg=this_matched_ksk["a"],\
sha256ofkey=hash_as_hex)
print("Writing out {}.".format(dnskey_record_filename))
write_out_file(dnskey_record_filename, dnskey_record_contents)
print("Writing out {}.".format(ds_record_filename))
write_out_file(ds_record_filename, ds_record_contents)
def main():
"""Main function"""
# Where the files we create are kept
(_, trust_anchor_filename) = tempfile.mkstemp(prefix="trust_anchor_")
(_, signature_filename) = tempfile.mkstemp(prefix="signature_")
(_, icann_ca_filename) = tempfile.mkstemp(prefix="icann_ca_")
temp_files = [trust_anchor_filename, signature_filename, icann_ca_filename]
dnskey_record_filename = "ksk-as-dnskey.txt"
ds_record_filename = "ksk-as-ds.txt"
cmd_parse = argparse.ArgumentParser(description="DNSSEC Trust Anchor Tool")
cmd_parse.add_argument("--local", dest="local", type=str,\
help="Name of local file to use instead of getting the trust anchor from the URL")
cmd_parse.add_argument("--ksks-from-trust-anchor", dest="ksks_from_trust_anchor", action='store_true',\
help="Use the KSKs from the trust anchor instead of from DNS.")
cmd_parse.add_argument("--keep", dest="keep", action='store_true',\
help="Keep the temporary files (the XML and validating signature")
opts = cmd_parse.parse_args()
# Make sure there is an "openssl" command in their shell path
which_return = subprocess.call("which openssl", shell=True, stdout=subprocess.PIPE)
if which_return != 0:
die("Could not find the 'openssl' command on this system.")
### Step 1. Fetch the trust anchor file from IANA using HTTPS
if opts.local:
if not os.path.exists(opts.local):
die("Could not find file {}.".format(opts.local))
try:
trust_anchor_xml = open(opts.local, mode="rt").read()
except:
die("Could not read from file {}.".format(opts.local))
else:
# Get the trust anchor file from its URL, write it to disk
try:
trust_anchor_url = urlopen(URL_ROOT_ANCHORS)
except Exception as this_exception:
die("Was not able to open URL {}. The returned text was '{}'.".format(\
URL_ROOT_ANCHORS, this_exception))
trust_anchor_xml = trust_anchor_url.read()
trust_anchor_url.close()
write_out_file(trust_anchor_filename, trust_anchor_xml)
### Step 2. Fetch the S/MIME signature for the trust anchor file from
### IANA using HTTPS. Get the signature file from its URL, write it to disk.
try:
signature_url = urlopen(URL_ROOT_ANCHORS_SIGNATURE)
except Exception as this_exception:
die("Was not able to open URL {}. returned text was '{}'.".format(\
URL_ROOT_ANCHORS_SIGNATURE, this_exception))
signature_contents = signature_url.read()
signature_url.close()
write_out_file(signature_filename, signature_contents)
### Step 3. Validate the signature on the trust anchor file using a
### built-in IANA CA key. Skip this step if using a local file.
if opts.local:
print("Not validating the local trust anchor file.")
else:
write_out_file(icann_ca_filename, ICANN_ROOT_CA_CERT)
validate_detached_signature(trust_anchor_filename, signature_filename, icann_ca_filename)
### Step 4. Extract the trust anchor key digests from the trust anchor file
trust_anchors = extract_trust_anchors_from_xml(trust_anchor_xml)
### Step 5. Check the validity period for each digest
valid_trust_anchors = get_valid_trust_anchors(trust_anchors)
### Step 6. Verify that the trust anchors match the published KSKs
### file.
if opts.ksks_from_trust_anchor:
ksk_records = extract_ksks_from_trust_anchors(valid_trust_anchors)
else:
ksk_records = fetch_ksk()
for key in ksk_records:
print("Found KSK {flags} {proto} {alg} '{keystart}...{keyend}'.".format(\
flags=key['f'], proto=key['p'], alg=key['a'],
keystart=key['k'][0:15], keyend=key['k'][-15:]))
# Go trough all the KSKs, decoding them and comparing them to all the trust anchors
matched_ksks = get_matching_ksk(ksk_records, valid_trust_anchors)
### Step 7. Write out the trust anchors as a DNSKEY and DS records.
export_ksk(matched_ksks, ds_record_filename, dnskey_record_filename)
# Delete the temporary files unless requested not to
if opts.keep:
print("Kept the temporary files: {}".format(" ".join(temp_files)))
else:
print("Deleting the temporary files.")
for this_file in temp_files:
if os.path.exists(this_file):
try:
os.unlink(this_file)
except Exception as this_exception:
print("Could not delete {}: '{}'. Continuing".format(this_file, this_exception))
if __name__ == "__main__":
main()