-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsigner.py
102 lines (77 loc) · 2.64 KB
/
signer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from base64 import b64encode
from configparser import ConfigParser
from hashlib import sha256
from http.cookies import SimpleCookie
from os import environ
from os import urandom
from textwrap import dedent
from urllib.parse import urlsplit
from urllib.parse import urlunsplit
import hmac
import json
from botocore.session import Session
from botocore.auth import S3SigV4QueryAuth
from botocore.awsrequest import AWSRequest
COOKIE_NAME = "secret"
HEADER_NAME = "cf-auth"
parser = ConfigParser()
parser.read(environ["LAMBDA_TASK_ROOT"] + "/config.ini")
config = parser["default"]
load_credentials = Session().get_component("credential_provider").load_credentials
def _replace_domain(url, domain):
split = urlsplit(url)
return urlunsplit(split._replace(netloc=domain))
def _authed_request(*, pepper, secret, region, bucket, path, domain):
credentials = load_credentials()
auth = S3SigV4QueryAuth(credentials, "s3", region)
mac = hmac.new(pepper.encode(), f"{secret}/{path}".encode(), sha256).digest()
headers = {HEADER_NAME: b64encode(mac).decode()}
url = f"https://{bucket}.s3.{region}.amazonaws.com/{path}"
# Synthesize a request and sign it.
synthetic_request = AWSRequest(method="GET", url=url, headers=headers)
auth.add_auth(synthetic_request)
return _replace_domain(synthetic_request.url, domain)
def _get_cookies(headers):
ret = dict()
try:
cookies = headers["cookie"][0]["value"]
except KeyError:
return ret
return dict((k, v.value) for k, v in SimpleCookie(cookies).items())
def _to_headers(d):
return {key: [dict(key=key, value=value)] for key, value in d.items()}
def _generate_token():
return b64encode(urandom(16)).decode().replace("=", "")
def lambda_handler(event, context):
request = event["Records"][0]["cf"]["request"]
cookies = _get_cookies(request["headers"])
headers = {"content-type": "text/html"}
token = cookies.get(COOKIE_NAME, None)
if token is None:
token = _generate_token()
headers["set-cookie"] = f"{COOKIE_NAME}={token}; Secure; HttpOnly; SameSite=Lax"
url = _authed_request(
pepper=config["pepper"],
secret=token,
region=config["s3_bucket_region"],
bucket=config["s3_bucket"],
path=config["s3_object"],
domain=config["cf_domain"],
)
headers["location"] = url
body = dedent(
f"""\
<!DOCTYPE html>
<html>
<body>
<a href="{url}">Click me!</a>
</body>
</html>
"""
)
return dict(
status=302,
statusDescription="Found",
headers=_to_headers(headers),
body=body,
)