cds-740 fix kafka-event bug (#48)
* re-added kinesis event type and adjusted kafka event handling

* adjusted all tests to ensure that they call the deserializer, removed internal references to underlying types.

* change log

* adjust comments

* Add default to subsystem param

* Updated version

* align cargo.toml version with template version

---------

Co-authored-by: royfur <[email protected]>
Co-authored-by: Concourse <[email protected]>
3 people authored Feb 1, 2024
1 parent 9a4d28b commit 162c5e5
Showing 5 changed files with 79 additions and 108 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## v0.0.13 Beta / 2024-02-01

### 🧰 Bug fixes 🧰
- Fix bug causing non-kafka events to show up as kafka events

## v0.0.12 Beta / 2024-01-31

### 🧰 Bug fixes 🧰
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "coralogix-aws-shipper"
version = "0.0.12"
version = "0.0.13"
edition = "2021"

[dependencies]
22 changes: 19 additions & 3 deletions src/combined_event.rs
@@ -38,16 +38,32 @@ impl<'de> Deserialize<'de> for CombinedEvent {
            return Ok(CombinedEvent::CloudWatchLogs(event));
        }

        if let Ok(event) = KafkaEvent::deserialize(&raw_value) {
            tracing::debug!("kafka event detected");
            return Ok(CombinedEvent::Kafka(event));
        if let Ok(event) = KinesisEvent::deserialize(&raw_value) {
            tracing::debug!("kinesis event detected");
            return Ok(CombinedEvent::Kinesis(event));
        }

        if let Ok(event) = SqsEvent::deserialize(&raw_value) {
            tracing::debug!("sqs event detected");
            return Ok(CombinedEvent::Sqs(event));
        }

        // IMPORTANT: kafka must be evaluated last as it uses an arbitrary map to evaluate records.
        // Since all other fields are optional, this map could potentially match any arbitrary JSON
        // and result in empty values.
        if let Ok(event) = KafkaEvent::deserialize(&raw_value) {
            tracing::debug!("kafka event detected");

            // kafka events triggering a lambda function should always have at least one record;
            // if not, it is likely an unsupported or bad event
            if event.records.is_empty() {
                return Err(de::Error::custom(format!(
                    "unsupported or bad event type: {raw_value}"
                )));
            }
            return Ok(CombinedEvent::Kafka(event));
        }

        Err(de::Error::custom(format!(
            "unsupported event type: {raw_value}"
        )))
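
The ordering note in the comment above captures the root cause of the bug, and it generalizes to any try-each-type-in-turn deserializer: a candidate whose fields are all optional (or defaulted) will deserialize from almost any JSON object, so it has to be tried last and guarded against empty matches. Below is a minimal, self-contained sketch of that pattern; `SqsLike` and `KafkaLike` are hypothetical stand-ins, not the actual `aws_lambda_events` types the shipper uses.

```rust
use std::collections::HashMap;

use serde::Deserialize;
use serde_json::{json, Value};

// Strict candidate: `Records` is required, so arbitrary JSON will not match it.
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct SqsLike {
    #[serde(rename = "Records")]
    records: Vec<Value>,
}

// Permissive candidate: every field is optional/defaulted, so even `{}` deserializes.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
#[allow(dead_code)]
struct KafkaLike {
    records: HashMap<String, Vec<Value>>,
    #[serde(rename = "eventSource")]
    event_source: Option<String>,
}

fn classify(raw: &Value) -> Result<&'static str, String> {
    // Strict candidates first...
    if SqsLike::deserialize(raw).is_ok() {
        return Ok("sqs");
    }
    // ...and the permissive candidate last, guarded against empty matches.
    if let Ok(event) = KafkaLike::deserialize(raw) {
        if event.records.is_empty() {
            return Err(format!("unsupported or bad event type: {raw}"));
        }
        return Ok("kafka");
    }
    Err(format!("unsupported event type: {raw}"))
}

fn main() {
    let sqs = json!({ "Records": [{ "body": "hello" }] });
    let kafka = json!({ "records": { "my-topic-0": [{ "value": "aGVsbG8=" }] } });
    let junk = json!({ "foo": "bar" });

    assert_eq!(classify(&sqs), Ok("sqs"));
    assert_eq!(classify(&kafka), Ok("kafka"));
    // Without the ordering and the empty-records guard, this would be misread as a kafka event.
    assert!(classify(&junk).is_err());
}
```

With the strict candidates checked first and the permissive one guarded, junk payloads fail loudly instead of being silently classified as empty Kafka events, which is the behaviour the guard in combined_event.rs enforces.
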
41 changes: 2 additions & 39 deletions template.yaml
@@ -26,7 +26,7 @@ Metadata:
- kinesis
- cloudfront
HomePageUrl: https://coralogix.com
SemanticVersion: 0.0.12
SemanticVersion: 0.0.13
SourceCodeUrl: https://github.com/coralogix/coralogix-aws-shipper

AWS::CloudFormation::Interface:
@@ -41,7 +41,6 @@ Metadata:
- SubsystemName
- ApiKey
- StoreAPIKeyInSecretsManager

- Label:
default: S3/CloudTrail/VpcFlow/S3Csv configuration
Parameters:
@@ -52,7 +51,6 @@ Metadata:
- SNSTopicArn
- SQSTopicArn
- CSVDelimiter

- Label:
default: Kafka & MSK configuration
Parameters:
@@ -61,7 +59,6 @@ Metadata:
- KafkaSubnets
- KafkaSecurityGroups
- MSKClusterArn

- Label:
default: Cloudwatch configuration
Parameters:
@@ -85,16 +82,13 @@ Metadata:
- BlockingPattern
- SamplingRate
- AddMetadata


- Label:
default: Lambda configuration
Parameters:
- FunctionMemorySize
- FunctionTimeout
- LogLevel
- LambdaLogRetention

- Label:
default: VPC configuration Optional
Parameters:
@@ -139,6 +133,7 @@ Parameters:
SubsystemName:
Type: String
Description: The subsystem name of your application (https://coralogix.com/docs/application-and-subsystem-names/)
Default: ''

NewlinePattern:
Type: String
@@ -396,7 +391,6 @@ Conditions:
UseKinesisStreamARN: !And
- !Not [ !Equals [ !Ref KinesisStreamArn, '' ] ]
- !Equals [ !Ref CloudWatchLogGroupName, '' ]

UseSNSTopicARN:
Fn::Or:
- !And
@@ -406,56 +400,45 @@ Conditions:
- !And
- !Not [ !Equals [ !Ref SNSIntegrationTopicArn, '' ] ]
- !Equals [ !Ref CloudWatchLogGroupName, '' ]

UseVpcConfig: !And
- !Not [ !Equals [ !Ref LambdaSubnetID, '' ] ]
- !Not [ !Equals [ !Ref LambdaSecurityGroupID, '' ] ]

UseCloudwatchLogs: !And
- !Not [ !Equals [ !Ref CloudWatchLogGroupName, '' ] ]
- !Not [ !Condition UseSNSTopicARN ]

UseDefault: !And
- !Not [ !Condition UseCloudwatchLogs ]
- !Not [ !Condition UseSNSTopicARN ]
- !Not [ !Condition UseSQSTopicARN ]
- !Not [ !Condition UseKinesisStreamARN ]
- !Not [ !Condition UseMSK ]
- !Not [ !Condition UseKafka ]

UseDefaultWithNotification: !And
- !Condition UseDefault
- !Condition IsNotificationEnabled

UseCloudwatchLogsWithNotification: !And
- !Condition UseCloudwatchLogs
- !Condition IsNotificationEnabled

UseKinesisTopicARNWithNotification: !And
- !Condition UseKinesisStreamARN
- !Condition IsNotificationEnabled
UseSQSTopicARNWithNotification: !And
- !Condition UseSQSTopicARN
- !Condition IsNotificationEnabled

UseSNSTopicARNWithNotification: !And
- !Condition UseSNSTopicARN
- !Condition IsNotificationEnabled

UseMSK: !And
- !Not [ !Equals [ !Ref MSKClusterArn, '' ] ]
- !Not [ !Equals [ !Ref KafkaTopic, '' ] ]
- !Equals [ !Ref IntegrationType, 'MSK' ]

UseKafka: !And
- !Not [ !Equals [ !Ref KafkaTopic, '' ] ]
- !Equals [ !Ref MSKClusterArn, '' ]
- !Equals [ !Ref IntegrationType, 'Kafka' ]

UseMSKWithNotification: !And
- !Condition UseMSK
- !Condition IsNotificationEnabled

UseKatkaTopicARNWithNotification: !And
- !Condition IsKafkaIntegration
- !Condition IsNotificationEnabled
@@ -653,7 +636,6 @@ Resources:
- 'secretsmanager:GetSecretValue'
Resource: '*'


LambdaLogGroupDefault:
Condition: UseDefault
DeletionPolicy: Retain
@@ -757,7 +739,6 @@ Resources:
- !Ref SNSIntegrationTopicArn
- !Ref SNSTopicArn


LambdaLogGroupWithSNSTopic:
Condition: UseSNSTopicARN
DeletionPolicy: Retain
@@ -1090,7 +1071,6 @@ Resources:
OnFailure:
Type: SNS


LambdaTrigger:
Type: Custom::KafkaTrigger
Condition: UseKafka
@@ -1169,7 +1149,6 @@ Resources:
try:
lambda_arn = event['ResourceProperties']['LambdaArn']
lambda_client = boto3.client('lambda')
if event['RequestType'] in ['Create', 'Update']:
StringlogGroupName = event['ResourceProperties']['CloudwatchGroup']
logGroupName = StringlogGroupName.split(',')
@@ -1200,7 +1179,6 @@ Resources:
- |
#!/usr/bin/python
# -*- coding: utf-8 -*-
import json, time, boto3
from urllib import request, parse, error
@@ -1228,14 +1206,11 @@ Resources:
'NoEcho': no_echo,
'Data': response_data
}
json_response_body = json.dumps(response_body).encode('utf-8')
headers = {
'content-type': '',
'content-length': str(len(json_response_body))
}
try:
req = request.Request(self.response_url, data=json_response_body, headers=headers, method='PUT')
with request.urlopen(req) as response:
@@ -1253,7 +1228,6 @@ Resources:
mappings = client.list_event_source_mappings(
FunctionName=function_name,
)["EventSourceMappings"]
for mapping in mappings:
# disable mapping
if mapping["State"] == "Enabled":
@@ -1271,14 +1245,12 @@ Resources:
responseStatus = CFNResponse.SUCCESS
physicalResourceId = event.get("PhysicalResourceId")
function_name = event["ResourceProperties"]["Function"]
try:
print("Request Type:", event["RequestType"])
if event["RequestType"] in ["Create", "Update"]:
if event["RequestType"] == "Update":
print('Update event detected, deleting previous mapping(s)')
delete_event_source_mappings(function_name)
response = client.create_event_source_mapping(
FunctionName=event["ResourceProperties"]["Function"],
BatchSize=int(event["ResourceProperties"]["BatchSize"]),
@@ -1303,20 +1275,16 @@ Resources:
} for securityGroupId in event["ResourceProperties"]["SecurityGroupIds"]
])
)
physicalResourceId = response["UUID"]
print(f"EventSourceMapping successfully created: {physicalResourceId}")
elif event["RequestType"] == "Delete":
delete_event_source_mappings(function_name)
print("EventSourceMapping successfully deleted")
except Exception as exc:
print("Failed to process:", exc)
responseStatus = CFNResponse.FAILED
finally:
cfn.send(responseStatus, {}, physical_resource_id=physicalResourceId)
- |
#!/usr/bin/python
# -*- coding: utf-8 -*-
@@ -1325,11 +1293,9 @@ Resources:
import boto3
import cfnresponse
print("Loading function")
s3 = boto3.client('s3')
def lambda_handler(event, context):
print("Received event:", json.dumps(event, indent=2))
bucket = event['ResourceProperties']['Bucket']
@@ -1349,7 +1315,6 @@ Resources:
BucketNotificationConfiguration['LambdaFunctionConfigurations']
)
)
if event['RequestType'] in ['Create', 'Update']:
BucketNotificationConfiguration['LambdaFunctionConfigurations'].append({
'Id': event.get('PhysicalResourceId', context.aws_request_id),
@@ -1372,10 +1337,8 @@ Resources:
's3:ObjectCreated:*'
]
})
if len(BucketNotificationConfiguration['LambdaFunctionConfigurations']) == 0:
BucketNotificationConfiguration.pop('LambdaFunctionConfigurations')
print(f'notification configuration: {BucketNotificationConfiguration}')
s3.put_bucket_notification_configuration(
Bucket=bucket,