Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aws entity support for Kinesis #1283

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 203 additions & 10 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ def extract_sqs(*args, **kwargs):
return queue_value.rsplit("/", 1)[-1]


def extract_kinesis(*args, **kwargs):
# The stream name can be passed as the StreamName or as part of the StreamARN.
stream_value = kwargs.get("StreamName", "Unknown")
if stream_value == "Unknown":
arn = kwargs.get("StreamARN", None)
if arn:
stream_value = arn.split("/", 1)[-1]
return stream_value


def extract_sqs_agent_attrs(*args, **kwargs):
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
agent_attrs = {}
Expand All @@ -75,6 +85,19 @@ def extract_sqs_agent_attrs(*args, **kwargs):
return agent_attrs


def extract_kinesis_agent_attrs(*args, **kwargs):
# Try to capture AWS Kinesis info as agent attributes. Log any exception to debug.
agent_attrs = {}
try:
stream_arn = kwargs.get("StreamARN", None)
if stream_arn:
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
agent_attrs["cloud.resource_id"] = stream_arn
except Exception as e:
_logger.debug("Failed to capture AWS Kinesis info.", exc_info=True)
return agent_attrs


def extract(argument_names, default=None):
def extractor_list(*args, **kwargs):
for argument_name in argument_names:
Expand Down Expand Up @@ -954,17 +977,61 @@ def _nr_dynamodb_datastore_trace_wrapper_(wrapped, instance, args, kwargs):
return _nr_dynamodb_datastore_trace_wrapper_


def sqs_message_trace(
def aws_function_trace(
operation,
destination_name,
params={},
terminal=False,
async_wrapper=None,
extract_agent_attrs=None,
library=None,
):
@function_wrapper
def _nr_aws_function_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
if not parent:
return wrapped(*args, **kwargs)
else:
parent = None

_destination_name = destination_name(*args, **kwargs)

trace = FunctionTrace(
name=_destination_name,
group=f"{library}/{operation}",
params=params,
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(*args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

with trace:
return wrapped(*args, **kwargs)

return _nr_aws_function_trace_wrapper_


def aws_message_trace(
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
library=None,
):
@function_wrapper
def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
Expand All @@ -973,7 +1040,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
else:
parent = None

_library = "SQS"
_library = library
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)
Expand All @@ -999,7 +1066,7 @@ def _nr_sqs_message_trace_wrapper_(wrapped, instance, args, kwargs):
with trace:
return wrapped(*args, **kwargs)

return _nr_sqs_message_trace_wrapper_
return _nr_aws_message_trace_wrapper_


def wrap_emit_api_params(wrapped, instance, args, kwargs):
Expand Down Expand Up @@ -1059,14 +1126,140 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("dynamodb", "delete_table"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
("dynamodb", "query"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "query"),
("dynamodb", "scan"): dynamodb_datastore_trace("DynamoDB", extract("TableName"), "scan"),
("sqs", "send_message"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("kinesis", "add_tags_to_stream"): aws_function_trace(
"add_tags_to_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "can_paginate"): aws_function_trace(
"can_paginate", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "close"): aws_function_trace(
"close", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "create_stream"): aws_function_trace(
"create_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "decrease_stream_retention_period"): aws_function_trace(
"decrease_stream_retention_period",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "delete_resource_policy"): aws_function_trace(
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "delete_stream"): aws_function_trace(
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "deregister_stream_consumer"): aws_function_trace(
"deregister_stream_consumer",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "describe_limits"): aws_function_trace(
"describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream"): aws_function_trace(
"describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream_consumer"): aws_function_trace(
"describe_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_stream_summary"): aws_function_trace(
"describe_stream_summary", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "disable_enhanced_monitoring"): aws_function_trace(
"disable_enhanced_monitoring",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "enable_enhanced_monitoring"): aws_function_trace(
"enable_enhanced_monitoring",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "generate_presigned_url"): aws_function_trace(
"generate_presigned_url", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_paginator"): aws_function_trace(
"get_paginator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_resource_policy"): aws_function_trace(
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_shard_iterator"): aws_function_trace(
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_waiter"): aws_function_trace(
"get_waiter", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "increase_stream_retention_period"): aws_function_trace(
"increase_stream_retention_period",
extract_kinesis,
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "list_shards"): aws_function_trace(
"list_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_stream_consumers"): aws_function_trace(
"list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace(
"list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_tags_for_stream"): aws_function_trace(
"list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "merge_shards"): aws_function_trace(
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace(
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "register_stream_consumer"): aws_function_trace(
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "remove_tags_from_stream"): aws_function_trace(
"remove_tags_from_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "split_shard"): aws_function_trace(
"split_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "start_stream_encryption"): aws_function_trace(
"start_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "stop_stream_encryption"): aws_function_trace(
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace(
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "update_shard_count"): aws_function_trace(
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "update_stream_mode"): aws_function_trace(
"update_stream_mode", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library=""
),
("kinesis", "put_record"): aws_message_trace(
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_records"): aws_message_trace(
"Produce", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_records"): aws_message_trace(
"Consume", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("sqs", "send_message"): aws_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("sqs", "send_message_batch"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("sqs", "send_message_batch"): aws_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("sqs", "receive_message"): sqs_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs
("sqs", "receive_message"): aws_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
Expand Down
Loading
Loading