Skip to content

Commit

Permalink
Correct behaviors of kinesis operations without stream targets
Browse files Browse the repository at this point in the history
  • Loading branch information
TimPansino committed Feb 4, 2025
1 parent 31781f1 commit 766baff
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 32 deletions.
32 changes: 10 additions & 22 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ def extract_sqs(*args, **kwargs):

def extract_kinesis(*args, **kwargs):
# The stream name can be passed as the StreamName or as part of the StreamARN.
stream_value = kwargs.get("StreamName", "Unknown")
if stream_value == "Unknown":
stream_value = kwargs.get("StreamName", None)
if stream_value is not None:
arn = kwargs.get("StreamARN", None)
if arn:
if arn is not None:
stream_value = arn.split("/", 1)[-1]
return stream_value

Expand Down Expand Up @@ -1080,7 +1080,7 @@ def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs):
_library = library
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)
_destination_name = destination_name(*args, **kwargs) or "Unknown"

trace = MessageTrace(
_library,
Expand Down Expand Up @@ -1175,9 +1175,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "delete_resource_policy"): aws_function_trace(
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "delete_resource_policy"): aws_function_trace("delete_resource_policy", library="Kinesis"),
("kinesis", "delete_stream"): aws_function_trace(
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1187,9 +1185,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "describe_limits"): aws_function_trace(
"describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_limits"): aws_function_trace("describe_limits", library="Kinesis"),
("kinesis", "describe_stream"): aws_function_trace(
"describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1211,9 +1207,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "get_resource_policy"): aws_function_trace(
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_resource_policy"): aws_function_trace("get_resource_policy", library="Kinesis"),
("kinesis", "get_shard_iterator"): aws_function_trace(
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1229,18 +1223,14 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "list_stream_consumers"): aws_function_trace(
"list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace(
"list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace("list_streams", library="Kinesis"),
("kinesis", "list_tags_for_stream"): aws_function_trace(
"list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "merge_shards"): aws_function_trace(
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace(
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace("put_resource_policy", library="Kinesis"),
("kinesis", "register_stream_consumer"): aws_function_trace(
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1256,9 +1246,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "stop_stream_encryption"): aws_function_trace(
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace(
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace("subscribe_to_shard", library="Kinesis"),
("kinesis", "update_shard_count"): aws_function_trace(
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand Down
28 changes: 18 additions & 10 deletions tests/external_botocore/test_boto3_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,48 @@
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
(f"External/{URL}/botocore/POST", 2),
(f"External/{URL}/botocore/POST", 3),
]
if BOTOCORE_VERSION < (1, 29, 0):
_kinesis_scoped_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
(f"External/{URL}/botocore/POST", 4),
(f"External/{URL}/botocore/POST", 5),
]

_kinesis_rollup_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
("External/all", 4),
("External/allOther", 4),
(f"External/{URL}/all", 2),
(f"External/{URL}/botocore/POST", 2),
("External/all", 5),
("External/allOther", 5),
(f"External/{URL}/all", 3),
(f"External/{URL}/botocore/POST", 3),
]
if BOTOCORE_VERSION < (1, 29, 0):
_kinesis_rollup_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
("External/all", 4),
("External/allOther", 4),
(f"External/{URL}/all", 4),
(f"External/{URL}/botocore/POST", 4),
("External/all", 5),
("External/allOther", 5),
(f"External/{URL}/all", 5),
(f"External/{URL}/botocore/POST", 5),
]

_kinesis_scoped_metrics_error = [
Expand Down Expand Up @@ -146,6 +150,10 @@ def test_kinesis():
resp = client.create_stream(StreamName=TEST_STREAM, ShardCount=123, StreamModeDetails={"StreamMode": "on-demand"})
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# List streams
resp = client.list_streams(Limit=123)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Stream ARN is needed for rest of methods.
resp = client.describe_stream(
StreamName=TEST_STREAM,
Expand Down

0 comments on commit 766baff

Please sign in to comment.