-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathosmxml.py
137 lines (131 loc) · 4.89 KB
/
osmxml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
"""OSM XML format serializer"""
import setup
import logging
import osm
from compat import etree
log = logging.getLogger(setup.app_name + "." + __name__)
def write_elem(outfile, e):
try:
outfile.write(etree.tostring(e, pretty_print=True))
except TypeError: # pragma: no cover
outfile.write(etree.tostring(e))
def serialize(outfile, data):
"""Output XML for an OSM data set"""
outfile.write("<?xml version='1.0' encoding='UTF-8'?>\n")
attrs = ''.join([" {}='{}'".format(k, v) for (k,v) in data.attrs.items()])
outfile.write("<osm{}>\n".format(attrs))
if data.note is not None:
e = etree.Element('note')
e.text = data.note
write_elem(outfile, e)
if data.meta is not None:
e = etree.Element('meta')
for (k, v) in data.meta.items():
e.set(k, v)
write_elem(outfile, e)
if data.tags:
e = etree.Element('changeset')
for (key, value) in data.tags.items():
e.append(etree.Element('tag', dict(k=key, v=value)))
write_elem(outfile, e)
for node in data.nodes:
e = etree.Element('node', node.attrs)
for key, value in node.tags.items():
e.append(etree.Element('tag', dict(k=key, v=value)))
write_elem(outfile, e)
for way in data.ways:
e = etree.Element('way', way.attrs)
for node in way.nodes:
e.append(etree.Element('nd', dict(ref=str(node.id))))
for key, value in way.tags.items():
e.append(etree.Element('tag', dict(k=key, v=value)))
write_elem(outfile, e)
for rel in data.relations:
e = etree.Element('relation', rel.attrs)
for m in rel.members:
e.append(etree.Element('member', m.attrs))
for key, value in rel.tags.items():
e.append(etree.Element('tag', dict(k=key, v=value)))
write_elem(outfile, e)
outfile.write("</osm>\n")
def deserialize(infile, data=None):
"""Generates or append to an OSM data set from OSM XML"""
if data is None:
data = osm.Osm()
context = etree.iterparse(infile, events=('end',))
childs = []
tags = {}
for event, elem in context:
if elem.tag == 'osm':
data.upload = elem.get('upload')
data.version = elem.get('version')
data.generator = elem.get('generator')
elif elem.tag == 'changeset':
data.tags = tags
tags = {}
elif elem.tag == 'note':
data.note = unicode(elem.text)
elif elem.tag == 'meta':
data.meta = dict(elem.attrib)
elif elem.tag == 'node':
n = data.Node(float(elem.get('lon')), float(elem.get('lat')),
attrs=dict(elem.attrib), tags=tags)
tags = {}
elif elem.tag == 'way':
w = data.Way(attrs=dict(elem.attrib), tags=tags)
w.nodes = childs
childs = []
tags = {}
elif elem.tag == 'nd':
childs.append(elem.get('ref'))
elif elem.tag == 'relation':
r = data.Relation(attrs=dict(elem.attrib), tags=tags)
r.members = childs
childs = []
tags = {}
elif elem.tag == 'member':
childs.append({
'ref': elem.get('ref'),
'type': elem.get('type'),
'role': elem.get('role')
})
elif elem.tag == 'tag':
tags[elem.get('k')] = elem.get('v')
elem.clear()
if hasattr(elem, 'xpath'):
for ancestor in elem.xpath('ancestor-or-self::*'):
while ancestor.getprevious() is not None:
del ancestor.getparent()[0]
del context
for way in data.ways:
missing = []
for i, ref in enumerate(way.nodes):
if isinstance(ref, basestring):
if 'n{}'.format(ref) in data.index:
n = data.get(ref)
way.nodes[i] = n
data.parents[n].add(way)
else:
missing.append(i)
if len(missing) > 0:
for i in sorted(missing, reverse=True):
way.nodes.pop(i)
if way.version is not None:
way.version = str(int(way.version) + 1)
for rel in data.relations:
missing = []
for i, m in enumerate(rel.members):
if isinstance(m, dict):
if m['type'][0].lower() + str(m['ref']) in data.index:
el = data.get(m['ref'], m['type'])
rel.members[i] = osm.Relation.Member(el, m['role'])
data.parents[el].add(rel)
else:
missing.append(i)
if len(missing) > 0:
for i in sorted(missing, reverse=True):
rel.members.pop(i)
if rel.version is not None:
rel.version = str(int(rel.version) + 1)
return data