Clean up generate-attribution.py
jbruechert committed Jan 3, 2025
1 parent 8042933 commit 47dcb7a
Showing 1 changed file with 107 additions and 85 deletions.

src/generate-attribution.py
@@ -10,7 +10,7 @@
 import pycountry
 
 from pathlib import Path
-from metadata import Source, TransitlandSource, Region
+from metadata import TransitlandSource, Region, UrlSource, HttpSource
 from zipfile import ZipFile
 from typing import Optional
@@ -30,12 +30,98 @@ def filter_duplicates(elems):
     return out
 
 
+def http_source_attribution(source: HttpSource) -> Optional[dict]:
+    attribution: dict = {}
+
+    if source.license:
+        if source.license.spdx_identifier:
+            attribution["spdx_license_identifier"] = \
+                source.license.spdx_identifier
+        if source.license.url:
+            attribution["license_url"] = source.license.url
+
+    attribution["operators"] = []
+    attribution["source"] = source.url
+
+    feed_path = Path(f"out/{source_id}.gtfs.zip")
+    attribution["filename"] = feed_path.name
+
+    human_name: str = (
+        feed_path.name.replace(".gtfs.zip", "").split("_")[1].replace("-", " ")
+    )
+    human_name = " ".join(
+        map(lambda w: w[0].upper() + w[1:] if len(w) > 0 else w, human_name.split(" "))
+    )
+    attribution["human_name"] = human_name
+
+    if not feed_path.exists():
+        print(f"Info: {feed_path} does not exist, skipping…")
+        return None
+
+    with ZipFile(feed_path) as z:
+        with z.open("agency.txt", "r") as a:
+            with io.TextIOWrapper(a) as at:
+                agencyreader = csv.DictReader(at, delimiter=",", quotechar='"')
+                for row in agencyreader:
+                    attribution["operators"].append(row["agency_name"])
+        if "feed_info.txt" in z.namelist():
+            with z.open("feed_info.txt", "r") as i:
+                with io.TextIOWrapper(i) as it:
+                    inforeader = csv.DictReader(it, delimiter=",", quotechar='"')
+                    publisher = next(inforeader)
+                    attribution["publisher"] = {}
+                    attribution["publisher"]["name"] = publisher["feed_publisher_name"]
+                    attribution["publisher"]["url"] = publisher["feed_publisher_url"]
+        if "attributions.txt" in z.namelist():
+            with z.open("attributions.txt", "r") as a:
+                with io.TextIOWrapper(a) as at:
+                    attributionstxt = csv.DictReader(at, delimiter=",", quotechar='"')
+                    attribution["attributions"] = filter_duplicates(
+                        map(
+                            lambda contrib: {
+                                "name": contrib["organization_name"],
+                                "url": contrib.get("attribution_url"),
+                            },
+                            attributionstxt,
+                        )
+                    )
+
+    if (
+        "operators" in attribution
+        and len(attribution["operators"]) == 1
+        and len(attribution["operators"][0]) > 1
+    ):
+        attribution["human_name"] = attribution["operators"][0]
+
+    attribution["region_code"] = region_code
+    attribution["region_name"] = region_name
+
+    return attribution
+
+
+def rt_attribution(source: UrlSource) -> dict:
+    attribution = {}
+    if source.license:
+        if source.license.spdx_identifier:
+            attribution[
+                "rt_spdx_license_identifier"
+            ] = source.license.spdx_identifier
+        if source.license.url:
+            attribution["rt_license_url"] = \
+                source.license.url
+    attribution["rt_source"] = source.url
+
+    return attribution
+
+
 if __name__ == "__main__":
     feed_dir = Path("feeds/")
 
     transitland_atlas = transitland.Atlas.load(Path("transitland-atlas/"))
 
-    attributions = {}
+    attributions: dict[str, dict] = {}
 
     for feed in sorted(feed_dir.glob("*.json")):
         parsed = {}
@@ -60,96 +146,32 @@ def filter_duplicates(elems):
         for source in region.sources:
             source_id = f"{region_code_lower}_{source.name}"
 
-            if type(source) == TransitlandSource:
+            if isinstance(source, TransitlandSource):
                 source = transitland_atlas.source_by_id(source)
                 if not source:
                     continue
 
             if source.skip:
                 continue
 
if source.spec == "gtfs-rt":
if not source_id in attributions:
attributions[source_id] = {}
if source.license:
if source.license.spdx_identifier:
attributions[source_id][
"rt_spdx_license_identifier"
] = source.license.spdx_identifier
if source.license.url:
attributions[source_id]["rt_license_url"] = source.license.url
attributions[source_id]["rt_source"] = source.url
continue

attribution: dict = {}

if source.license:
if source.license.spdx_identifier:
attribution["spdx_license_identifier"] = source.license.spdx_identifier
if source.license.url:
attribution["license_url"] = source.license.url

attribution["operators"] = []
attribution["source"] = source.url

feed_path = Path(f"out/{source_id}.gtfs.zip")
attribution["filename"] = feed_path.name

human_name: str = (
feed_path.name.replace(".gtfs.zip", "").split("_")[1].replace("-", " ")
)
human_name = " ".join(
map(lambda w: w[0].upper() + w[1:] if len(w) > 0 else w, human_name.split(" "))
)
attribution["human_name"] = human_name

if not feed_path.exists():
print(f"Info: {feed_path} does not exist, skipping…")
continue

with ZipFile(feed_path) as z:
with z.open("agency.txt", "r") as a:
with io.TextIOWrapper(a) as at:
agencyreader = csv.DictReader(at, delimiter=",", quotechar='"')
for row in agencyreader:
attribution["operators"].append(row["agency_name"])
if "feed_info.txt" in z.namelist():
with z.open("feed_info.txt", "r") as i:
with io.TextIOWrapper(i) as it:
inforeader = csv.DictReader(it, delimiter=",", quotechar='"')
publisher = next(inforeader)
attribution["publisher"] = {}
attribution["publisher"]["name"] = publisher["feed_publisher_name"]
attribution["publisher"]["url"] = publisher["feed_publisher_url"]
if "attributions.txt" in z.namelist():
with z.open("attributions.txt", "r") as a:
with io.TextIOWrapper(a) as at:
attributionstxt = csv.DictReader(at, delimiter=",", quotechar='"')
attribution["attributions"] = filter_duplicates(
map(
lambda contrib: {
"name": contrib["organization_name"],
"url": contrib.get("attribution_url"),
},
attributionstxt,
)
)

if (
"operators" in attribution
and len(attribution["operators"]) == 1
and len(attribution["operators"][0]) > 1
):
attribution["human_name"] = attribution["operators"][0]

attribution["region_code"] = region_code
attribution["region_name"] = region_name

if source_id not in attributions:
attributions[source_id] = attribution
else:
print("Warning: Found duplicate source name:", source_id)
attributions[source_id] |= attribution
match source:
case UrlSource() if source.spec == "gtfs-rt":
attribution = rt_attribution(source)

if source_id not in attributions:
attributions[source_id] = attribution
else:
attributions[source_id] |= attribution
case HttpSource():
http_attribution = http_source_attribution(source)
if not http_attribution:
continue

if source_id not in attributions:
attributions[source_id] = http_attribution
else:
print("Warning: Found duplicate source name:", source_id)
attributions[source_id] |= http_attribution

with open("out/license.json", "w") as outfile:
json.dump(