Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/handle error deleting old resource #138

Merged
merged 10 commits into from
Dec 4, 2024
130 changes: 82 additions & 48 deletions ckanext/switzerland/harvester/base_sbb_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.harvest.model import HarvestObject
from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory
from ckanext.switzerland.helpers import resource_filename

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object):
)
return False

def _fetch_stage(self, harvest_object): # noqa
def _fetch_stage(self, harvest_object): # noqa: C901
stefina marked this conversation as resolved.
Show resolved Hide resolved
"""
Fetching of resources. Runs once for each gathered resource.

Expand Down Expand Up @@ -603,7 +602,7 @@ def import_stage(self, harvest_object):
)
return False

def _import_stage(self, harvest_object): # noqa
def _import_stage(self, harvest_object): # noqa: C901
"""
Importing the fetched files into CKAN storage.
Runs once for each fetched resource.
Expand Down Expand Up @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa
if old_resource_id:
log.info("Deleting old resource: %s", old_resource_id)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": old_resource_id, "force": True}
)
self._fully_delete_resource(context, old_resource_meta)
except NotFound:
pass # Sometimes importing the data into the datastore fails

get_action("resource_delete")(context, {"id": old_resource_id})
self._save_object_error(
f"Error deleting old resource {old_resource_id} for "
f"filename {file_name}. This could be due to a failed "
f"connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

Session.commit()

Expand Down Expand Up @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package):

return ordered_resources, unmatched_resources

def finalize(self, harvest_object, harvest_object_data):
context = {"model": model, "session": Session, "user": self._get_user_name()}
def finalize(self, harvest_object, harvest_object_data): # noqa: C901
# TODO: Simplify this method.
stefina marked this conversation as resolved.
Show resolved Hide resolved
user_name = self._get_user_name()
context = {"model": model, "session": Session, "user": user_name}
stage = "Import"

log.info("Running finalizing tasks:")
Expand Down Expand Up @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data):
log.exception(message)
self._save_object_error(message, harvest_object, "Import")

return True
return False

ordered_resources, unmatched_resources = self._get_ordered_resources(package)
try:
ordered_resources, unmatched_resources = self._get_ordered_resources(
package
)
except NotFound:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

# ----------------------------------------------------------------------------
# delete old resources
Expand All @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data):

for resource in ordered_resources[max_resources:]:
try:
self._delete_version(
context, package, resource_filename(resource["url"])
)
# We need a new context each time: otherwise, if there is an
# exception deleting the resource, there will be auth data left in
# the context that won't get deleted. Then all subsequent calls to
# resource_delete will seem unauthorized and fail.
delete_context = {
"model": model,
"session": Session,
"user": user_name,
}
self._fully_delete_resource(delete_context, resource)
except Exception as e:
self._save_object_error(
"Error deleting resource {} in finalizing tasks: {}".format(
Expand Down Expand Up @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data):
},
)

# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
try:
# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
except ValidationError:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

Session.query(HarvestObject).filter(
HarvestObject.package_id == package["id"]
Expand All @@ -1108,27 +1144,25 @@ def finalize(self, harvest_object, harvest_object_data):

search.rebuild(package["id"])

def _delete_version(self, context, package, filename):
"""Fully delete the resource with the given filename"""
for resource in package["resources"]:
if resource_filename(resource["url"]) == filename:
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], filename
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)
def _fully_delete_resource(self, context, resource):
"""Fully delete a resource and its file."""
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], resource["url"]
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails
# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails

# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
2 changes: 1 addition & 1 deletion ckanext/switzerland/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ def index_language_specific_values(search_data, validated_dict):
def get_request_language():
try:
return tk.request.environ["CKAN_LANG"]
except (RuntimeError, TypeError):
except (KeyError, RuntimeError, TypeError):
return tk.config.get("ckan.locale_default", "en")


Expand Down
Loading