Provide full resources in archived items finished webhooks (#1308)
Fixes #1306

- Include the full `resources` list, with `expireAt` serialized as a string, in `crawlFinished` and `uploadFinished` webhook notifications rather than the `downloadUrls` field, which is retained for collection webhooks (see the payload sketch after the file summary below).
- Set the default presigned duration to one minute short of one week and enforce the maximum supported by S3.
- Add `storage_presign_duration_minutes`, commented out, to the helm chart's values.yaml.
- Update tests.

---------
Co-authored-by: Ilya Kreymer <[email protected]>
tw4l authored Oct 24, 2023
1 parent 2e5952a commit d58747d
Showing 6 changed files with 48 additions and 33 deletions.
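As a reading aid, here is a hedged sketch of the notification body a `crawlFinished` webhook carries after this change. The values are made up, the `event` key is an assumption about the full body, and only fields visible in the diffs below are shown:

```python
# Hypothetical "crawlFinished" webhook POST body after this change.
# Shape follows BaseArchivedItemBody and CrawlFileOut as modified
# in backend/btrixcloud/models.py below; values are illustrative.
body = {
    "event": "crawlFinished",
    "itemId": "example-crawl-id",
    "resources": [
        {
            "path": "https://bucket.example.com/example.wacz?X-Amz-Signature=...",  # presigned URL
            "hash": "examplehash",
            "size": 123456,
            "crawlId": "example-crawl-id",
            # new: expiry of the presigned URL, as an ISO 8601 string
            "expireAt": "2023-10-31T12:00:00",
        }
    ],
    # "downloadUrls" is no longer set for crawlFinished/uploadFinished;
    # it is retained only for collection webhooks
}
```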
47 changes: 25 additions & 22 deletions backend/btrixcloud/basecrawls.py
@@ -1,6 +1,5 @@
""" base crawl type """

import asyncio
import uuid
import os
from datetime import timedelta
@@ -44,6 +43,12 @@
 ALL_CRAWL_STATES = (*RUNNING_AND_STARTING_STATES, *NON_RUNNING_STATES)
 
 
+# Presign duration must be less than 604800 seconds (one week),
+# so set this one minute short of a week.
+PRESIGN_MINUTES_MAX = 10079
+PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX
+
+
 # ============================================================================
 # pylint: disable=too-many-instance-attributes
 class BaseCrawlOps:
@@ -62,8 +67,12 @@ def __init__(
         self.colls = colls
         self.storage_ops = storage_ops
 
+        presign_duration_minutes = int(
+            os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT
+        )
+
         self.presign_duration_seconds = (
-            int(os.environ.get("PRESIGN_DURATION_MINUTES", 60)) * 60
+            min(presign_duration_minutes, PRESIGN_MINUTES_MAX) * 60
         )
 
     async def get_crawl_raw(
@@ -362,7 +371,6 @@ async def _resolve_signed_urls(
 
         delta = timedelta(seconds=self.presign_duration_seconds)
 
-        updates = []
         out_files = []
 
         for file_ in files:
@@ -374,17 +382,20 @@
             presigned_url = await self.storage_ops.get_presigned_url(
                 org, file_, self.presign_duration_seconds
             )
-            updates.append(
-                (
-                    {"files.filename": file_.filename},
-                    {
-                        "$set": {
-                            "files.$.presignedUrl": presigned_url,
-                            "files.$.expireAt": exp,
-                        }
-                    },
-                )
+            await self.crawls.find_one_and_update(
+                {"files.filename": file_.filename},
+                {
+                    "$set": {
+                        "files.$.presignedUrl": presigned_url,
+                        "files.$.expireAt": exp,
+                    }
+                },
             )
+            file_.expireAt = exp
+
+            expire_at_str = ""
+            if file_.expireAt:
+                expire_at_str = file_.expireAt.isoformat()
 
             out_files.append(
                 CrawlFileOut(
@@ -393,20 +404,12 @@
                     hash=file_.hash,
                     size=file_.size,
                     crawlId=crawl_id,
+                    expireAt=expire_at_str,
                 )
             )
 
-        if updates:
-            asyncio.create_task(self._update_presigned(updates))
-
-        # print("presigned", out_files)
-
         return out_files
 
-    async def _update_presigned(self, updates):
-        for update in updates:
-            await self.crawls.find_one_and_update(*update)
-
     @contextlib.asynccontextmanager
     async def get_redis(self, crawl_id):
         """get redis url for crawl id"""
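The constants above cap the presign duration one minute under S3's one-week limit. A minimal standalone sketch of the clamping and expiry computation, for illustration only: the free function `presign_duration_seconds()` and the direct `datetime.utcnow()` call are assumptions of this sketch, not the project's exact helpers.

```python
import os
from datetime import datetime, timedelta

# Presign duration must be less than 604800 seconds (one week),
# so the ceiling is one minute short of a week
PRESIGN_MINUTES_MAX = 10079
PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX


def presign_duration_seconds() -> int:
    # `or` also covers the empty string the configmap now renders
    # when storage_presign_duration_minutes is unset in the chart
    minutes = int(
        os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT
    )
    # clamp larger values down to the supported maximum
    return min(minutes, PRESIGN_MINUTES_MAX) * 60


# expireAt is derived from the same duration and sent as an ISO string
expire_at = datetime.utcnow() + timedelta(seconds=presign_duration_seconds())
print(expire_at.isoformat())
```

Note the design choice mirrored from the diff: `os.environ.get(...) or DEFAULT` rather than a default argument, since the configmap no longer applies `| default 60` and an unset chart value arrives as an empty string.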
3 changes: 3 additions & 0 deletions backend/btrixcloud/models.py
@@ -406,7 +406,9 @@ class CrawlFileOut(BaseModel):
     path: str
     hash: str
     size: int
+
     crawlId: Optional[str]
+    expireAt: Optional[str]
 
 
 # ============================================================================
@@ -1053,6 +1055,7 @@ class BaseArchivedItemBody(WebhookNotificationBody):
     """Webhook notification POST body for when archived item is started or finished"""
 
     itemId: str
+    resources: Optional[List[CrawlFileOut]] = None
 
 
 # ============================================================================
7 changes: 1 addition & 6 deletions backend/btrixcloud/webhooks.py
@@ -189,12 +189,7 @@ async def _create_item_finished_notification(
             print(f"Crawl {crawl_id} not found, skipping event webhook", flush=True)
             return
 
-        download_urls = []
-        for resource in crawl.resources:
-            download_url = f"{org.origin}{resource.path}"
-            download_urls.append(download_url)
-
-        body.downloadUrls = download_urls
+        body.resources = crawl.resources
 
         notification = WebhookNotification(
             id=uuid.uuid4(),
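Since `crawlFinished` and `uploadFinished` bodies now carry `resources` instead of `downloadUrls`, consumers read presigned URLs from each resource entry. A minimal receiver sketch, assuming a plain `http.server` endpoint; the port and any body fields beyond those shown in these diffs are hypothetical:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length))

        if body.get("event") in ("crawlFinished", "uploadFinished"):
            # full resource objects, not bare download URLs
            for resource in body.get("resources") or []:
                # fetch resource["path"] before resource["expireAt"] passes
                print(resource["path"], resource["size"], resource["expireAt"])
        elif body.get("event") in ("addedToCollection", "removedFromCollection"):
            # collection webhooks still use downloadUrls
            for url in body.get("downloadUrls") or []:
                print(url)

        self.send_response(200)
        self.end_headers()


if __name__ == "__main__":
    HTTPServer(("", 8000), WebhookHandler).serve_forever()
```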
16 changes: 12 additions & 4 deletions backend/test/test_webhooks.py
@@ -81,14 +81,17 @@ def test_get_webhook_event(admin_auth_headers, default_org_id):
         assert event
 
         if event in ("crawlFinished", "uploadFinished"):
-            assert len(body["downloadUrls"]) >= 1
+            assert len(body["resources"]) >= 1
+            assert len(body.get("downloadUrls", [])) == 0
             assert body["itemId"]
 
         elif event in ("crawlStarted"):
-            assert len(body["downloadUrls"]) == 0
+            assert len(body.get("resources", [])) == 0
+            assert len(body.get("downloadUrls", [])) == 0
             assert body["itemId"]
 
         elif event in ("addedToCollection", "removedFromCollection"):
+            assert len(body.get("resources", [])) == 0
             assert len(body["downloadUrls"]) == 1
             assert body["collectionId"]
             assert len(body["itemIds"]) >= 1
@@ -246,28 +249,33 @@ def test_webhooks_sent(
             assert post["itemId"]
             assert post["scheduled"] in (True, False)
             assert post.get("downloadUrls") is None
+            assert post.get("resources") is None
 
         elif event == "crawlFinished":
             crawl_finished_count += 1
             assert post["itemId"]
             assert post["state"]
-            assert post["downloadUrls"]
+            assert post["resources"]
+            assert post.get("downloadUrls") is None
 
         elif event == "uploadFinished":
             upload_finished_count += 1
             assert post["itemId"]
             assert post["state"]
-            assert post["downloadUrls"]
+            assert post["resources"]
+            assert post.get("downloadUrls") is None
 
         elif event == "addedToCollection":
             added_to_collection_count += 1
             assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
+            assert post.get("resources") is None
             assert post["itemIds"]
             assert post["collectionId"]
 
         elif event == "removedFromCollection":
             removed_from_collection_count += 1
             assert post["downloadUrls"] and len(post["downloadUrls"]) == 1
+            assert post.get("resources") is None
             assert post["itemIds"]
             assert post["collectionId"]
 
2 changes: 1 addition & 1 deletion chart/templates/configmap.yaml
@@ -36,7 +36,7 @@ data:
 
   RERUN_FROM_MIGRATION: "{{ .Values.rerun_from_migration }}"
 
-  PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes | default 60 }}"
+  PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes }}"
 
   FAST_RETRY_SECS: "{{ .Values.operator_fast_resync_secs | default 3 }}"
 
6 changes: 6 additions & 0 deletions chart/values.yaml
@@ -276,6 +276,12 @@ storages:
 # shared_storage_profile:
 
 
+# optional: duration in minutes for WACZ download links to be valid
+# used by webhooks and replay
+# max value = 10079 (one week minus one minute)
+# storage_presign_duration_minutes: 10079
+
+
 # Email Options
 # =========================================
 email: