diff --git a/src/generate-attribution.py b/src/generate-attribution.py index a2de52eb..2e2fb1f8 100755 --- a/src/generate-attribution.py +++ b/src/generate-attribution.py @@ -10,7 +10,7 @@ import pycountry from pathlib import Path -from metadata import Source, TransitlandSource, Region +from metadata import TransitlandSource, Region, UrlSource, HttpSource from zipfile import ZipFile from typing import Optional @@ -30,12 +30,98 @@ def filter_duplicates(elems): return out +def http_source_attribution(source: HttpSource) -> Optional[dict]: + attribution: dict = {} + + if source.license: + if source.license.spdx_identifier: + attribution["spdx_license_identifier"] = \ + source.license.spdx_identifier + if source.license.url: + attribution["license_url"] = source.license.url + + attribution["operators"] = [] + attribution["source"] = source.url + + feed_path = Path(f"out/{source_id}.gtfs.zip") + attribution["filename"] = feed_path.name + + human_name: str = ( + feed_path.name.replace(".gtfs.zip", "").split("_")[1].replace("-", " ") + ) + human_name = " ".join( + map(lambda w: w[0].upper() + w[1:] if len(w) > 0 else w, human_name.split(" ")) + ) + attribution["human_name"] = human_name + + if not feed_path.exists(): + print(f"Info: {feed_path} does not exist, skipping…") + return None + + with ZipFile(feed_path) as z: + with z.open("agency.txt", "r") as a: + with io.TextIOWrapper(a) as at: + agencyreader = csv.DictReader(at, delimiter=",", quotechar='"') + for row in agencyreader: + attribution["operators"].append(row["agency_name"]) + if "feed_info.txt" in z.namelist(): + with z.open("feed_info.txt", "r") as i: + with io.TextIOWrapper(i) as it: + inforeader = csv.DictReader(it, delimiter=",", quotechar='"') + publisher = next(inforeader) + attribution["publisher"] = {} + attribution["publisher"]["name"] = publisher["feed_publisher_name"] + attribution["publisher"]["url"] = publisher["feed_publisher_url"] + if "attributions.txt" in z.namelist(): + with 
z.open("attributions.txt", "r") as a: + with io.TextIOWrapper(a) as at: + attributionstxt = csv.DictReader(at, delimiter=",", quotechar='"') + attribution["attributions"] = filter_duplicates( + map( + lambda contrib: { + "name": contrib["organization_name"], + "url": contrib.get("attribution_url"), + }, + attributionstxt, + ) + ) + + if ( + "operators" in attribution + and len(attribution["operators"]) == 1 + and len(attribution["operators"][0]) > 1 + ): + attribution["human_name"] = attribution["operators"][0] + + attribution["region_code"] = region_code + attribution["region_name"] = region_name + + return attribution + + +def rt_attribution(source: UrlSource) -> dict: + attribution = {} + if source.license: + if source.license.spdx_identifier: + attribution[ + "rt_spdx_license_identifier" + ] = source.license.spdx_identifier + if source.license.url: + attribution["rt_license_url"] = \ + source.license.url + attribution["rt_source"] = source.url + + return attribution + + + + if __name__ == "__main__": feed_dir = Path("feeds/") transitland_atlas = transitland.Atlas.load(Path("transitland-atlas/")) - attributions = {} + attributions: dict[str, dict] = {} for feed in sorted(feed_dir.glob("*.json")): parsed = {} @@ -60,7 +146,7 @@ def filter_duplicates(elems): for source in region.sources: source_id = f"{region_code_lower}_{source.name}" - if type(source) == TransitlandSource: + if isinstance(source, TransitlandSource): source = transitland_atlas.source_by_id(source) if not source: continue @@ -68,88 +154,24 @@ def filter_duplicates(elems): if source.skip: continue - if source.spec == "gtfs-rt": - if not source_id in attributions: - attributions[source_id] = {} - if source.license: - if source.license.spdx_identifier: - attributions[source_id][ - "rt_spdx_license_identifier" - ] = source.license.spdx_identifier - if source.license.url: - attributions[source_id]["rt_license_url"] = source.license.url - attributions[source_id]["rt_source"] = source.url - continue - - 
attribution: dict = {} - - if source.license: - if source.license.spdx_identifier: - attribution["spdx_license_identifier"] = source.license.spdx_identifier - if source.license.url: - attribution["license_url"] = source.license.url - - attribution["operators"] = [] - attribution["source"] = source.url - - feed_path = Path(f"out/{source_id}.gtfs.zip") - attribution["filename"] = feed_path.name - - human_name: str = ( - feed_path.name.replace(".gtfs.zip", "").split("_")[1].replace("-", " ") - ) - human_name = " ".join( - map(lambda w: w[0].upper() + w[1:] if len(w) > 0 else w, human_name.split(" ")) - ) - attribution["human_name"] = human_name - - if not feed_path.exists(): - print(f"Info: {feed_path} does not exist, skipping…") - continue - - with ZipFile(feed_path) as z: - with z.open("agency.txt", "r") as a: - with io.TextIOWrapper(a) as at: - agencyreader = csv.DictReader(at, delimiter=",", quotechar='"') - for row in agencyreader: - attribution["operators"].append(row["agency_name"]) - if "feed_info.txt" in z.namelist(): - with z.open("feed_info.txt", "r") as i: - with io.TextIOWrapper(i) as it: - inforeader = csv.DictReader(it, delimiter=",", quotechar='"') - publisher = next(inforeader) - attribution["publisher"] = {} - attribution["publisher"]["name"] = publisher["feed_publisher_name"] - attribution["publisher"]["url"] = publisher["feed_publisher_url"] - if "attributions.txt" in z.namelist(): - with z.open("attributions.txt", "r") as a: - with io.TextIOWrapper(a) as at: - attributionstxt = csv.DictReader(at, delimiter=",", quotechar='"') - attribution["attributions"] = filter_duplicates( - map( - lambda contrib: { - "name": contrib["organization_name"], - "url": contrib.get("attribution_url"), - }, - attributionstxt, - ) - ) - - if ( - "operators" in attribution - and len(attribution["operators"]) == 1 - and len(attribution["operators"][0]) > 1 - ): - attribution["human_name"] = attribution["operators"][0] - - attribution["region_code"] = region_code - 
attribution["region_name"] = region_name - - if source_id not in attributions: - attributions[source_id] = attribution - else: - print("Warning: Found duplicate source name:", source_id) - attributions[source_id] |= attribution + match source: + case UrlSource() if source.spec == "gtfs-rt": + attribution = rt_attribution(source) + + if source_id not in attributions: + attributions[source_id] = attribution + else: + attributions[source_id] |= attribution + case HttpSource(): + http_attribution = http_source_attribution(source) + if not http_attribution: + continue + + if source_id not in attributions: + attributions[source_id] = http_attribution + else: + print("Warning: Found duplicate source name:", source_id) + attributions[source_id] |= http_attribution with open("out/license.json", "w") as outfile: json.dump(