diff --git a/ckanext/switzerland/harvester/base_sbb_harvester.py b/ckanext/switzerland/harvester/base_sbb_harvester.py index 4d377a69..4ad226e0 100644 --- a/ckanext/switzerland/harvester/base_sbb_harvester.py +++ b/ckanext/switzerland/harvester/base_sbb_harvester.py @@ -38,7 +38,6 @@ from ckanext.harvest.harvesters.base import HarvesterBase from ckanext.harvest.model import HarvestObject from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory -from ckanext.switzerland.helpers import resource_filename log = logging.getLogger(__name__) @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object): ) return False - def _fetch_stage(self, harvest_object): # noqa + def _fetch_stage(self, harvest_object): # noqa: C901 """ Fetching of resources. Runs once for each gathered resource. @@ -603,7 +602,7 @@ def import_stage(self, harvest_object): ) return False - def _import_stage(self, harvest_object): # noqa + def _import_stage(self, harvest_object): # noqa: C901 """ Importing the fetched files into CKAN storage. Runs once for each fetched resource. @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa if old_resource_id: log.info("Deleting old resource: %s", old_resource_id) - # delete the datastore table try: - get_action("datastore_delete")( - context, {"resource_id": old_resource_id, "force": True} - ) + self._fully_delete_resource(context, old_resource_meta) except NotFound: - pass # Sometimes importing the data into the datastore fails - - get_action("resource_delete")(context, {"id": old_resource_id}) + self._save_object_error( + f"Error deleting old resource {old_resource_id} for " + f"filename {file_name}. This could be due to a failed " + f"connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) Session.commit() @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package): return ordered_resources, unmatched_resources - def finalize(self, harvest_object, harvest_object_data): - context = {"model": model, "session": Session, "user": self._get_user_name()} + def finalize(self, harvest_object, harvest_object_data): # noqa: C901 + # TODO: Simplify this method. + user_name = self._get_user_name() + context = {"model": model, "session": Session, "user": user_name} stage = "Import" log.info("Running finalizing tasks:") @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data): log.exception(message) self._save_object_error(message, harvest_object, "Import") - return True + return False - ordered_resources, unmatched_resources = self._get_ordered_resources(package) + try: + ordered_resources, unmatched_resources = self._get_ordered_resources( + package + ) + except NotFound: + self._save_object_error( + f"Error reordering resources for dataset " + f"{harvest_object_data['dataset']}. " + f"This could be due to a failed connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) + + return False # ---------------------------------------------------------------------------- # delete old resources @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data): for resource in ordered_resources[max_resources:]: try: - self._delete_version( - context, package, resource_filename(resource["url"]) - ) + # We need a new context each time: otherwise, if there is an + # exception deleting the resource, there will be auth data left in + # the context that won't get deleted. Then all subsequent calls to + # resource_delete will seem unauthorized and fail. + delete_context = { + "model": model, + "session": Session, + "user": user_name, + } + self._fully_delete_resource(delete_context, resource) except Exception as e: self._save_object_error( "Error deleting resource {} in finalizing tasks: {}".format( @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data): }, ) - # reorder resources - # not matched resources come first in the list, then the ordered - get_action("package_resource_reorder")( - context, - { - "id": package["id"], - "order": [r["id"] for r in unmatched_resources + ordered_resources], - }, - ) + try: + # reorder resources + # not matched resources come first in the list, then the ordered + get_action("package_resource_reorder")( + context, + { + "id": package["id"], + "order": [r["id"] for r in unmatched_resources + ordered_resources], + }, + ) + except ValidationError: + self._save_object_error( + f"Error reordering resources for dataset " + f"{harvest_object_data['dataset']}. " + f"This could be due to a failed connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) + + return False Session.query(HarvestObject).filter( HarvestObject.package_id == package["id"] @@ -1108,27 +1144,25 @@ def finalize(self, harvest_object, harvest_object_data): search.rebuild(package["id"]) - def _delete_version(self, context, package, filename): - """Fully delete the resource with the given filename""" - for resource in package["resources"]: - if resource_filename(resource["url"]) == filename: - log.debug( - "Deleting resource {} with filename {}".format( - resource["id"], filename - ) - ) - # delete the file from the filestore - path = uploader.ResourceUpload(resource).get_path(resource["id"]) - if os.path.exists(path): - os.remove(path) + def _fully_delete_resource(self, context, resource): + """Fully delete a resource and its file.""" + log.debug( + "Deleting resource {} with filename {}".format( + resource["id"], resource["url"] + ) + ) + # delete the file from the filestore + path = uploader.ResourceUpload(resource).get_path(resource["id"]) + if os.path.exists(path): + os.remove(path) - # delete the datastore table - try: - get_action("datastore_delete")( - context, {"resource_id": resource["id"], "force": True} - ) - except NotFound: - pass # Sometimes importing the data into the datastore fails + # delete the datastore table + try: + get_action("datastore_delete")( + context, {"resource_id": resource["id"], "force": True} + ) + except NotFound: + pass # Sometimes importing the data into the datastore fails - # delete the resource itself - get_action("resource_delete")(context, {"id": resource["id"]}) + # delete the resource itself + get_action("resource_delete")(context, {"id": resource["id"]}) diff --git a/ckanext/switzerland/helpers.py b/ckanext/switzerland/helpers.py index c4de8578..42b4563d 100644 --- a/ckanext/switzerland/helpers.py +++ b/ckanext/switzerland/helpers.py @@ -471,7 +471,7 @@ def index_language_specific_values(search_data, validated_dict): def get_request_language(): try: return tk.request.environ["CKAN_LANG"] - except (RuntimeError, TypeError): + except (KeyError, RuntimeError, TypeError): return tk.config.get("ckan.locale_default", "en")