Skip to content

Commit

Permalink
Merge pull request #138 from liip/feat/handle-error-deleting-old-reso…
Browse files Browse the repository at this point in the history
…urce

Feat/handle error deleting old resource
  • Loading branch information
bellisk authored Dec 4, 2024
2 parents 406c244 + 41f66a3 commit e3e2b0d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 49 deletions.
130 changes: 82 additions & 48 deletions ckanext/switzerland/harvester/base_sbb_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.harvest.model import HarvestObject
from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory
from ckanext.switzerland.helpers import resource_filename

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object):
)
return False

def _fetch_stage(self, harvest_object): # noqa
def _fetch_stage(self, harvest_object): # noqa: C901
"""
Fetching of resources. Runs once for each gathered resource.
Expand Down Expand Up @@ -603,7 +602,7 @@ def import_stage(self, harvest_object):
)
return False

def _import_stage(self, harvest_object): # noqa
def _import_stage(self, harvest_object): # noqa: C901
"""
Importing the fetched files into CKAN storage.
Runs once for each fetched resource.
Expand Down Expand Up @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa
if old_resource_id:
log.info("Deleting old resource: %s", old_resource_id)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": old_resource_id, "force": True}
)
self._fully_delete_resource(context, old_resource_meta)
except NotFound:
pass # Sometimes importing the data into the datastore fails

get_action("resource_delete")(context, {"id": old_resource_id})
self._save_object_error(
f"Error deleting old resource {old_resource_id} for "
f"filename {file_name}. This could be due to a failed "
f"connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

Session.commit()

Expand Down Expand Up @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package):

return ordered_resources, unmatched_resources

def finalize(self, harvest_object, harvest_object_data):
context = {"model": model, "session": Session, "user": self._get_user_name()}
def finalize(self, harvest_object, harvest_object_data): # noqa: C901
# TODO: Simplify this method.
user_name = self._get_user_name()
context = {"model": model, "session": Session, "user": user_name}
stage = "Import"

log.info("Running finalizing tasks:")
Expand Down Expand Up @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data):
log.exception(message)
self._save_object_error(message, harvest_object, "Import")

return True
return False

ordered_resources, unmatched_resources = self._get_ordered_resources(package)
try:
ordered_resources, unmatched_resources = self._get_ordered_resources(
package
)
except NotFound:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

# ----------------------------------------------------------------------------
# delete old resources
Expand All @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data):

for resource in ordered_resources[max_resources:]:
try:
self._delete_version(
context, package, resource_filename(resource["url"])
)
# We need a new context each time: otherwise, if there is an
# exception deleting the resource, there will be auth data left in
# the context that won't get deleted. Then all subsequent calls to
# resource_delete will seem unauthorized and fail.
delete_context = {
"model": model,
"session": Session,
"user": user_name,
}
self._fully_delete_resource(delete_context, resource)
except Exception as e:
self._save_object_error(
"Error deleting resource {} in finalizing tasks: {}".format(
Expand Down Expand Up @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data):
},
)

# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
try:
# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
except ValidationError:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

Session.query(HarvestObject).filter(
HarvestObject.package_id == package["id"]
Expand All @@ -1108,27 +1144,25 @@ def finalize(self, harvest_object, harvest_object_data):

search.rebuild(package["id"])

def _delete_version(self, context, package, filename):
"""Fully delete the resource with the given filename"""
for resource in package["resources"]:
if resource_filename(resource["url"]) == filename:
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], filename
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)
def _fully_delete_resource(self, context, resource):
"""Fully delete a resource and its file."""
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], resource["url"]
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails
# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails

# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
2 changes: 1 addition & 1 deletion ckanext/switzerland/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ def index_language_specific_values(search_data, validated_dict):
def get_request_language():
try:
return tk.request.environ["CKAN_LANG"]
except (RuntimeError, TypeError):
except (KeyError, RuntimeError, TypeError):
return tk.config.get("ckan.locale_default", "en")


Expand Down

0 comments on commit e3e2b0d

Please sign in to comment.