-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery-athena.py
54 lines (48 loc) · 1.68 KB
/
query-athena.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import boto3
from botocore.exceptions import ClientError
# Parameters
ROLE_ARN = "arn:aws:iam::<ACCT-ID-B>:role/<ROLE-A>"
DATABASE = '<DB>'
QUERY = 'SELECT * FROM "<DB>"."<TABLE>" limit 10;'
S3_OUTPUT = 's3://<BUCKET-NAME>/output/'
def assume_role(arn):
try:
# Create an STS client using the default credentials provided to the pod
sts_client = boto3.client('sts')
assumed_role = sts_client.assume_role(
RoleArn=arn,
RoleSessionName="AssumedRoleSession"
)
return assumed_role['Credentials']
except ClientError as e:
print(f"Failed to assume role: {e}")
return None
def query_athena(query, database, s3_output, credentials):
try:
# Create a new session using the assumed role's credentials
session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'],
)
athena_client = session.client('athena')
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': database
},
ResultConfiguration={
'OutputLocation': s3_output
}
)
return response
except ClientError as e:
print(f"Failed to query Athena: {e}")
return None
def main():
credentials = assume_role(ROLE_ARN)
if credentials:
response = query_athena(QUERY, DATABASE, S3_OUTPUT, credentials)
print("Query execution ID:", response['QueryExecutionId'])
if __name__ == "__main__":
main()