Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for metrics in stream response handler #738

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,20 @@ default int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
return bodyBytesIn.length;
}

/**
* Called right before stream is complete, whether successful or unsuccessful.
* @param stream The HTTP stream to which the metrics apply
* @param metrics The [HttpStreamMetrics] containing metrics for the given stream
*/
default void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
/* Optional callback, nothing to do by default */
}
Comment on lines +82 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: can we move the metrics callback before the complete callback? So, that it's the same order as it gets invoked


/**
* Called from Native when the Response has completed.
*
*
* @param stream completed HttpStreamBase
* @param errorCode resultant errorCode for the response
*/
void onResponseComplete(HttpStreamBase stream, int errorCode);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.awssdk.crt.http;

/**
* Holds tracing metrics for an HTTP stream. Maps to `struct aws_http_stream_metrics` in **aws-c-http**'s
* **request_response.h**.
*/
public class HttpStreamMetrics {
private final long sendStartTimestampNs;
private final long sendEndTimestampNs;
private final long sendingDurationNs;
private final long receiveStartTimestampNs;
private final long receiveEndTimestampNs;
private final long receivingDurationNs;
private final int streamId;

HttpStreamMetrics(
long sendStartTimestampNs,
long sendEndTimestampNs,
long sendingDurationNs,
long receiveStartTimestampNs,
long receiveEndTimestampNs,
long receivingDurationNs,
int streamId
) {
this.sendStartTimestampNs = sendStartTimestampNs;
this.sendEndTimestampNs = sendEndTimestampNs;
this.sendingDurationNs = sendingDurationNs;
this.receiveStartTimestampNs = receiveStartTimestampNs;
this.receiveEndTimestampNs = receiveEndTimestampNs;
this.receivingDurationNs = receivingDurationNs;
this.streamId = streamId;
}

public long getSendStartTimestampNs() {
return sendStartTimestampNs;
}

public long getSendEndTimestampNs() {
return sendEndTimestampNs;
}

public long getSendingDurationNs() {
return sendingDurationNs;
}

public long getReceiveStartTimestampNs() {
return receiveStartTimestampNs;
}

public long getReceiveEndTimestampNs() {
return receiveEndTimestampNs;
}

public long getReceivingDurationNs() {
return receivingDurationNs;
}

public int getStreamId() {
return streamId;
}

@Override
public String toString() {
return "HttpStreamMetrics{" +
"sendStartTimestampNs=" + sendStartTimestampNs +
", sendEndTimestampNs=" + sendEndTimestampNs +
", sendingDurationNs=" + sendingDurationNs +
", receiveStartTimestampNs=" + receiveStartTimestampNs +
", receiveEndTimestampNs=" + receiveEndTimestampNs +
", receivingDurationNs=" + receivingDurationNs +
", streamId=" + streamId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ default int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
return bodyBytesIn.length;
}

/**
* Called right before stream is complete, whether successful or unsuccessful.
* @param stream The HTTP stream to which the metrics apply
* @param metrics The [HttpStreamMetrics] containing metrics for the given stream
*/
default void onMetrics(HttpStream stream, HttpStreamMetrics metrics) {
/* Optional callback, nothing to do by default */
}

/**
* Called from Native when the Response has completed.
* @param stream completed stream
* @param errorCode resultant errorCode for the response
*/
void onResponseComplete(HttpStream stream, int errorCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ int onResponseBody(HttpStreamBase stream, ByteBuffer bodyBytesIn) {
}
}

void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
if (this.responseBaseHandler != null) {
responseBaseHandler.onMetrics(stream, metrics);
} else {
responseHandler.onMetrics((HttpStream) stream, metrics);
}
}

void onResponseComplete(HttpStreamBase stream, int errorCode) {
if (this.responseBaseHandler != null) {
responseBaseHandler.onResponseComplete(stream, errorCode);
Expand Down
51 changes: 51 additions & 0 deletions src/native/http_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,56 @@ void aws_java_http_stream_on_stream_destroy_fn(void *user_data) {
/********** JNI ENV RELEASE **********/
}

void aws_java_http_stream_on_stream_metrics_fn(
struct aws_http_stream *stream,
const struct aws_http_stream_metrics *metrics,
void *user_data) {
struct http_stream_binding *binding = (struct http_stream_binding *)user_data;

/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}

/* Convert metrics to Java HttpStreamMetrics obj */
jobject jni_metrics = (*env)->NewObject(
env,
http_stream_metrics_properties.http_stream_metrics_class,
http_stream_metrics_properties.constructor_id,
(jlong)metrics->send_start_timestamp_ns,
(jlong)metrics->send_end_timestamp_ns,
(jlong)metrics->sending_duration_ns,
(jlong)metrics->receive_start_timestamp_ns,
(jlong)metrics->receive_end_timestamp_ns,
(jlong)metrics->receiving_duration_ns,

/* Stream IDs are 31-bit unsigned integers, which fits into Java's regular (signed) 32-bit int */
(jint)metrics->stream_id);

(*env)->CallVoidMethod(
env,
binding->java_http_response_stream_handler,
http_stream_response_handler_properties.onMetrics,
binding->java_http_stream_base,
jni_metrics);

/* Delete local reference to metrics object */
(*env)->DeleteLocalRef(env, jni_metrics);

if (aws_jni_check_and_clear_exception(env)) {
/* Close the Connection if the Java Callback throws an Exception */
aws_http_connection_close(aws_http_stream_get_connection(stream));

AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Received Exception from onMetrics", (void *)stream);
aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to manually delete the local reference created when it's called from a C thread, (*env)->DeleteLocalRef(env, jni_metrics);, otherwise, it will result in a leak. We had the similar leak recently, referring to #734

aws_jni_release_thread_env(binding->jvm, env);
/********** JNI ENV RELEASE **********/
}

jobjectArray aws_java_http_headers_from_native(JNIEnv *env, struct aws_http_headers *headers) {
(void)headers;
jobjectArray ret;
Expand Down Expand Up @@ -383,6 +433,7 @@ static jobject s_make_request_general(
.on_response_body = aws_java_http_stream_on_incoming_body_fn,
.on_complete = aws_java_http_stream_on_stream_complete_fn,
.on_destroy = aws_java_http_stream_on_stream_destroy_fn,
.on_metrics = aws_java_http_stream_on_stream_metrics_fn,
.user_data = stream_binding,
};

Expand Down
19 changes: 19 additions & 0 deletions src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ static void s_cache_http_stream_response_handler_native_adapter(JNIEnv *env) {
http_stream_response_handler_properties.onResponseComplete =
(*env)->GetMethodID(env, cls, "onResponseComplete", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;I)V");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onResponseComplete);

http_stream_response_handler_properties.onMetrics = (*env)->GetMethodID(
env,
cls,
"onMetrics",
"(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;Lsoftware/amazon/awssdk/crt/http/HttpStreamMetrics;)V");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onMetrics);
}

struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties;
Expand All @@ -535,6 +542,17 @@ static void s_cache_http_stream_write_chunk_completion_properties(JNIEnv *env) {
AWS_FATAL_ASSERT(http_stream_write_chunk_completion_properties.callback);
}

struct java_http_stream_metrics_properties http_stream_metrics_properties;

static void s_cache_http_stream_metrics_properties(JNIEnv *env) {
jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStreamMetrics");
AWS_FATAL_ASSERT(cls);
http_stream_metrics_properties.http_stream_metrics_class = (*env)->NewGlobalRef(env, cls);

http_stream_metrics_properties.constructor_id = (*env)->GetMethodID(env, cls, "<init>", "(JJJJJJI)V");
AWS_FATAL_ASSERT(http_stream_metrics_properties.constructor_id);
}

struct java_event_stream_server_listener_properties event_stream_server_listener_properties;

static void s_cache_event_stream_server_listener_properties(JNIEnv *env) {
Expand Down Expand Up @@ -2316,6 +2334,7 @@ static void s_cache_java_class_ids(void *user_data) {
s_cache_http2_stream(env);
s_cache_http_stream_response_handler_native_adapter(env);
s_cache_http_stream_write_chunk_completion_properties(env);
s_cache_http_stream_metrics_properties(env);
s_cache_event_stream_server_listener_properties(env);
s_cache_event_stream_server_listener_handler_properties(env);
s_cache_event_stream_server_connection_handler_properties(env);
Expand Down
8 changes: 8 additions & 0 deletions src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ struct java_http_stream_response_handler_native_adapter_properties {
jmethodID onResponseHeadersDone;
jmethodID onResponseBody;
jmethodID onResponseComplete;
jmethodID onMetrics;
};
extern struct java_http_stream_response_handler_native_adapter_properties http_stream_response_handler_properties;

Expand All @@ -245,6 +246,13 @@ struct java_http_stream_write_chunk_completion_properties {
};
extern struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties;

/* HtppStreamMetrics */
struct java_http_stream_metrics_properties {
jclass http_stream_metrics_class;
jmethodID constructor_id;
};
extern struct java_http_stream_metrics_properties http_stream_metrics_properties;

/* EventStreamServerListener */
struct java_event_stream_server_listener_properties {
jmethodID onShutdownComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@
import org.junit.Assert;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.Http2ClientConnection;
import software.amazon.awssdk.crt.http.Http2Request;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.Http2Stream;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.crt.http.HttpStreamMetrics;
import software.amazon.awssdk.crt.http.HttpVersion;

import java.net.URI;
import java.nio.ByteBuffer;
Expand All @@ -31,6 +28,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HttpRequestResponseFixture extends HttpClientTestFixture {

Expand Down Expand Up @@ -94,6 +92,7 @@ public TestHttpResponse getResponse(URI uri, HttpRequestBase request, byte[] chu
boolean actuallyConnected = false;

final CompletableFuture<Void> reqCompleted = new CompletableFuture<>();
final AtomicReference<HttpStreamMetrics> metricsRef = new AtomicReference<>(null);

final TestHttpResponse response = new TestHttpResponse();

Expand Down Expand Up @@ -126,6 +125,11 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
return amountRead;
}

@Override
public void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
Assert.assertTrue(metricsRef.compareAndSet(null, metrics));
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
response.onCompleteErrorCode = errorCode;
Expand All @@ -149,6 +153,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
}

Assert.assertTrue(actuallyConnected);
Assert.assertNotNull(metricsRef.get());

shutdownComplete.get(60, TimeUnit.SECONDS);

Expand Down
Loading