Skip to content

Commit

Permalink
feat: granule links via event subscription (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
ceholden authored Dec 17, 2024
1 parent 7a62b00 commit ddcdf3a
Show file tree
Hide file tree
Showing 29 changed files with 2,674 additions and 853 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ $ npm install # This installs any node packages that are within package.json (CD
$ make install # This calls `pipenv install --dev` on the repo root and any of the directories that contain a Makefile with `install`
```

_**Note** you might have an issue installing `psycopg2` - I found [this](https://github.com/pypa/pipenv/issues/3991#issuecomment-564645309) helpful_

A file named `.env` is expected in the root of the repository, the expected values are:

```bash
Expand Down
2 changes: 1 addition & 1 deletion alembic_migration/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ sqlalchemy = "==1.4.0"
db = {editable = true, path = "./../layers/db"}
pytest = "==7.4.3"
moto = "==5.0.17"
psycopg2 = "==2.9.10"
psycopg2-binary = "==2.9.10"
pytest-docker = "==2.0.1"
assertpy = "==1.1"
pytest-cov = "==4.1.0"
Expand Down
334 changes: 196 additions & 138 deletions alembic_migration/Pipfile.lock

Large diffs are not rendered by default.

77 changes: 66 additions & 11 deletions cdk/downloader_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
Duration,
RemovalPolicy,
Stack,
aws_apigatewayv2,
aws_apigatewayv2_integrations,
aws_cloudwatch,
aws_ec2,
aws_events,
aws_events_targets,
aws_iam,
aws_lambda,
aws_s3,
)
from aws_cdk import aws_lambda_python_alpha as aws_lambda_python
from aws_cdk import aws_logs, aws_rds, aws_secretsmanager, aws_sqs, aws_ssm
from aws_cdk import aws_logs, aws_rds, aws_s3, aws_secretsmanager, aws_sqs, aws_ssm
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct
Expand Down Expand Up @@ -255,7 +256,7 @@ def __init__(
self,
id=f"{identifier}-link-fetcher",
entry="lambdas/link_fetcher",
index="handler.py",
index="app/search_handler.py",
handler="handler",
layers=[
db_layer,
Expand Down Expand Up @@ -286,6 +287,58 @@ def __init__(
threshold=1,
)

link_subscription = aws_lambda_python.PythonFunction(
self,
id=f"{identifier}-link-subscription",
entry="lambdas/link_fetcher",
index="app/subscription_handler.py",
handler="handler",
layers=[
db_layer,
],
memory_size=200,
timeout=Duration.minutes(15),
runtime=aws_lambda.Runtime.PYTHON_3_11,
environment=link_fetcher_environment_vars,
)

aws_logs.LogGroup(
self,
id=f"{identifier}-link-subscription-log-group",
log_group_name=f"/aws/lambda/{link_subscription.function_name}",
removal_policy=RemovalPolicy.DESTROY
if removal_policy_destroy
else RemovalPolicy.RETAIN,
retention=aws_logs.RetentionDays.ONE_DAY
if removal_policy_destroy
else aws_logs.RetentionDays.TWO_WEEKS,
)

aws_cloudwatch.Alarm(
self,
id=f"{identifier}-link-subscription-errors-alarm",
metric=link_fetcher.metric_errors(),
evaluation_periods=3,
threshold=1,
)

forwarder_api = aws_apigatewayv2.HttpApi(
self,
"EsaPushSubscriptionHandlerApi",
api_name="EsaPushSubscriptionHandlerApi",
default_integration=aws_apigatewayv2_integrations.HttpLambdaIntegration(
"EsaPushSubscriptionHandlerApi-Integration",
handler=link_subscription,
),
)

aws_ssm.StringParameter(
self,
id=f"{identifier}-link-subscription-endpoint-url",
string_value=forwarder_api.url,
parameter_name=f"/hls-s2-downloader-serverless/{identifier}/link_subscription_endpoint_url",
)

downloader_environment_vars = {
"STAGE": identifier,
"DB_CONNECTION_SECRET_ARN": downloader_rds_secret.secret_arn,
Expand Down Expand Up @@ -338,7 +391,7 @@ def __init__(
self,
id=f"{identifier}-downloader-role-arn",
string_value=self.downloader.role.role_arn,
parameter_name=(f"/integration_tests/{identifier}/downloader_role_arn"),
parameter_name=f"/integration_tests/{identifier}/downloader_role_arn",
)

self.downloader.role.add_managed_policy(lambda_insights_policy)
Expand All @@ -351,15 +404,9 @@ def __init__(
downloader_bucket.grant_write(self.downloader)

downloader_rds_secret.grant_read(link_fetcher)
downloader_rds_secret.grant_read(link_subscription)
downloader_rds_secret.grant_read(self.downloader)

scihub_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-scihub-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/scihub-credentials",
)
scihub_credentials.grant_read(self.downloader)

copernicus_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-copernicus-credentials",
Expand All @@ -368,9 +415,17 @@ def __init__(
copernicus_credentials.grant_read(self.downloader)
copernicus_credentials.grant_read(self.token_rotator)

esa_subscription_credentials = aws_secretsmanager.Secret.from_secret_name_v2(
self,
id=f"{identifier}-esa-subscription-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials",
)
esa_subscription_credentials.grant_read(link_subscription)

token_parameter.grant_read(self.downloader)

to_download_queue.grant_send_messages(link_fetcher)
to_download_queue.grant_send_messages(link_subscription)
to_download_queue.grant_consume_messages(self.downloader)

# We must resort to using CfnEventSourceMapping to set the maximum concurrency
Expand Down
18 changes: 9 additions & 9 deletions cdk/integration_stack.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
from typing import Optional

from aws_cdk import Duration, RemovalPolicy, Stack, aws_apigateway, aws_lambda
from aws_cdk import Duration, RemovalPolicy, Stack, aws_apigateway, aws_iam, aws_lambda
from aws_cdk import aws_lambda_python_alpha as aws_lambda_python
from aws_cdk import aws_iam, aws_logs, aws_s3, aws_secretsmanager, aws_ssm
from aws_cdk import aws_logs, aws_s3, aws_secretsmanager, aws_ssm
from constructs import Construct


Expand All @@ -27,16 +27,16 @@ def __init__(
)
)

# TODO remove this, along with other references to it, but leaving for
# now, just in case removing it would break the downloader lambda
aws_secretsmanager.Secret(
self,
id=f"{identifier}-integration-scihub-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/scihub-credentials",
description="Dummy values for the Mock SciHub API credentials",
id=f"{identifier}-integration-esa-subscription-credentials",
secret_name=f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials",
description="Dummy values for the ESA 'push' subscription authentication",
generate_secret_string=aws_secretsmanager.SecretStringGenerator(
secret_string_template=json.dumps({"username": "test-user"}),
generate_string_key="password",
secret_string_template=json.dumps(
{"notification_username": "test-user"}
),
generate_string_key="notification_password",
),
)

Expand Down
Binary file added images/hls-s2-downloader-link-subscription.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 9 additions & 0 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def db_session(monkeypatch, ssm_parameter: Callable[[str], str]) -> Iterable[Ses
session.commit()


@pytest.fixture
def link_subscription_endpoint_url(ssm_client: SSMClient, identifier: str):
    """Return the ESA push-subscription endpoint URL published to SSM by the CDK stack."""
    name = f"/hls-s2-downloader-serverless/{identifier}/link_subscription_endpoint_url"
    response = ssm_client.get_parameter(Name=name)
    url = response["Parameter"].get("Value")
    # Guard against a parameter that exists but has no value.
    assert url is not None, f"No such SSM parameter: {name}"
    return url


@pytest.fixture
def step_function_arn(ssm_parameter: Callable[[str], str]):
    """Look up the link-fetcher Step Function ARN via the ``ssm_parameter`` fixture."""
    arn = ssm_parameter("link_fetcher_step_function_arn")
    return arn
Expand Down
149 changes: 149 additions & 0 deletions integration_tests/test_link_push_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import datetime as dt
import json
from pathlib import Path
from typing import Callable
from uuid import uuid4

import boto3
import polling2
import pytest
import requests
from db.models.granule import Granule
from mypy_boto3_sqs import SQSClient
from sqlalchemy.orm import Session


def check_sqs_message_count(sqs_client, queue_url, count):
    """Return True when the queue's approximate visible message count equals ``count``.

    Used as a ``polling2`` predicate; SQS reports the count as a string
    attribute, hence the ``int`` conversion before comparing.
    """
    attributes = sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
    )["Attributes"]
    return int(attributes["ApproximateNumberOfMessages"]) == count


def _format_dt(datetime: dt.datetime) -> str:
"""Format datetime into string used by ESA's payload"""
return datetime.isoformat().replace("+00:00", "Z")


@pytest.fixture
def recent_event_s2_created() -> dict:
    """Create a recent Sentinel-2 "Created" event from ESA's push subscription.

    This message contains two types of fields,

    * Message metadata (event type, subscription ID, ack ID, notification date, etc)
    * Message "body" - `(.value)`
    """
    # Reusing example from ESA as a template
    template = (
        Path(__file__).parents[1]
        / "lambdas"
        / "link_fetcher"
        / "tests"
        / "data"
        / "push-granule-created-s2-n1.json"
    )
    payload = json.loads(template.read_text())
    value = payload["value"]

    # Update relevant parts of message payload to be "recent"
    # where recent is <30 days from today as we're not currently
    # reprocessing historical scenes that ESA has reprocessed
    now = dt.datetime.now(tz=dt.timezone.utc)

    payload["NotificationDate"] = _format_dt(now)
    value["OriginDate"] = _format_dt(now - dt.timedelta(seconds=7))
    value["PublicationDate"] = _format_dt(now - dt.timedelta(seconds=37))
    value["ModificationDate"] = _format_dt(now - dt.timedelta(seconds=1))
    value["ContentDate"] = {
        "Start": _format_dt(now - dt.timedelta(hours=3, seconds=3)),
        "End": _format_dt(now - dt.timedelta(hours=3)),
    }
    # We're not using fields in `payload["value"]["Attributes"]` but there's duplicate
    # datetime information in there following OData conventions

    # Randomize ID of message to ensure each fixture's return is unique according
    # to our DB (which uses granule ID as primary key)
    value["Id"] = str(uuid4())

    return payload


@pytest.fixture
def link_subscription_credentials(
    identifier: str, ssm_parameter: Callable[[str], str]
) -> tuple[str, str]:
    """Return user/pass credentials for subscription endpoint"""
    client = boto3.client("secretsmanager")
    secret_id = (
        f"hls-s2-downloader-serverless/{identifier}/esa-subscription-credentials"
    )
    raw_secret = client.get_secret_value(SecretId=secret_id)["SecretString"]
    secret = json.loads(raw_secret)

    return secret["notification_username"], secret["notification_password"]


@pytest.mark.parametrize("notification_count", [1, 2])
def test_link_push_subscription_handles_event(
    recent_event_s2_created: dict,
    link_subscription_endpoint_url: str,
    link_subscription_credentials: tuple[str, str],
    db_session: Session,
    sqs_client: SQSClient,
    queue_url: str,
    notification_count: int,
):
    """Test that we handle a new granule created notification

    We have occasionally observed duplicate granule IDs being
    sent to our API endpoint and we want to only process one,
    so this test includes a parametrized "notification_count"
    to replicate this reality.
    """
    for _ in range(notification_count):
        resp = requests.post(
            f"{link_subscription_endpoint_url}events",
            auth=link_subscription_credentials,
            json=recent_event_s2_created,
        )

        # ensure correct response (204) for EVERY notification posted — asserting
        # only after the loop would let an intermediate failure pass unnoticed
        assert resp.status_code == 204

    # ensure we have SQS message — exactly 1 even when duplicates were posted
    polling2.poll(
        check_sqs_message_count,
        args=(sqs_client, queue_url, 1),
        step=5,
        timeout=120,
    )

    # ensure we have 1 granule for this ID
    granules = (
        db_session.query(Granule)
        .filter(Granule.id == recent_event_s2_created["value"]["Id"])
        .all()
    )
    assert len(granules) == 1


def test_link_push_subscription_user_auth_rejects_incorrect(
    link_subscription_endpoint_url: str,
):
    """Test that we reject incorrect authentication"""
    response = requests.post(
        f"{link_subscription_endpoint_url}events",
        auth=("foo", "bar"),
        json={},
    )

    # ensure correct response (401 Unauthorized)
    assert response.status_code == 401
2 changes: 1 addition & 1 deletion lambdas/downloader/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ db = {editable = true, path = "./../../layers/db"}
pytest-docker = "==2.0.1"
alembic = "==1.12.1"
moto = "==5.0.17"
psycopg2 = "==2.9.10"
psycopg2-binary = "==2.9.10"
assertpy = "==1.1"
responses = "==0.23.1"
freezegun = "==1.0.0"
Expand Down
Loading

0 comments on commit ddcdf3a

Please sign in to comment.