[fix][fn] Record Pulsar Function processing time properly for asynchronous functions #23811
base: master
This change doesn't seem to be related to the PR.
Sorry, uploading these file changes was a mistake. I've force-pushed in a later commit to remove those changes.
CompletableFuture<InstanceObserver> future = CompletableFuture.supplyAsync(() -> {
    JavaExecutionResult result;
    InstanceObserver instanceObserver = new InstanceObserver();
    instanceObserver.setStartTime(System.nanoTime());
    // process the message
    Thread.currentThread().setContextClassLoader(functionClassLoader);
    result = javaInstance.handleMessage(
            currentRecord,
            currentRecord.getValue(),
            asyncResultConsumer,
            asyncErrorHandler);
    Thread.currentThread().setContextClassLoader(instanceClassLoader);
    instanceObserver.setJavaExecutionResult(result);
    return instanceObserver;
}).whenComplete((res, ex) -> {
    stats.processTimeEnd(res.getStartTime());
});

JavaExecutionResult result = future.join().getJavaExecutionResult();
This doesn't make sense and is completely unnecessary.
Instead, you should simply pass the starting time in nanoseconds as an argument and store it in a new field, startTimeNanos, on JavaExecutionResult. The result can then be recorded in the handleResult method. The processTimeStart method should be removed completely from the ComponentStatsManager class, and the processTimeEnd method should accept the starting time as a parameter, taken from the startTimeNanos field of the JavaExecutionResult instance.
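A minimal sketch of the design suggested above, assuming heavily simplified stand-ins (these are not the real Pulsar classes): the start time is fixed when the result object is created, there is no processTimeStart(), and processTimeEnd(long) receives the start time taken from the result once the async future completes. One consequence of this design is that concurrent in-flight invocations cannot overwrite each other's start time, because there is no shared start-time field in the stats manager.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ProcessTimeSketch {

    // Hypothetical, simplified stand-in for JavaExecutionResult:
    // the start time travels with the result instead of living in the stats manager.
    static class JavaExecutionResult {
        private final long startTimeNanos;
        private Object result;
        private Throwable userException;

        JavaExecutionResult(long startTimeNanos) {
            this.startTimeNanos = startTimeNanos;
        }

        long getStartTimeNanos() {
            return startTimeNanos;
        }

        void setResult(Object result) { this.result = result; }

        void setUserException(Throwable userException) { this.userException = userException; }
    }

    // Hypothetical, simplified stats manager: no processTimeStart() and no shared
    // start-time field; processTimeEnd() receives the start time as a parameter.
    static class StatsManager {
        private volatile double lastProcessTimeMs = -1;

        void processTimeEnd(long startTimeNanos) {
            long elapsedNanos = System.nanoTime() - startTimeNanos;
            lastProcessTimeMs = elapsedNanos / 1_000_000.0;
        }

        double getLastProcessTimeMs() { return lastProcessTimeMs; }
    }

    // Called once the (possibly asynchronous) result is available,
    // playing the role of handleResult in JavaInstanceRunnable.
    static void handleResult(JavaExecutionResult result, StatsManager stats) {
        stats.processTimeEnd(result.getStartTimeNanos());
    }

    public static void main(String[] args) {
        StatsManager stats = new StatsManager();
        JavaExecutionResult executionResult = new JavaExecutionResult(System.nanoTime());

        // An "async function" whose future completes roughly 200 ms later.
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "done",
                CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));

        // Record the time only when the future completes, not when it is returned.
        future.whenComplete((value, error) -> {
            executionResult.setResult(value);
            executionResult.setUserException(error);
            handleResult(executionResult, stats);
        }).join();

        System.out.println("process time ms: " + stats.getLastProcessTimeMs());
    }
}
```

Because the elapsed time is computed in the completion callback, the recorded value covers the whole asynchronous execution rather than only the time it took to hand back the future.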
Thank you, I have completed these tasks.
58d57af to 3511c82
It looks like many unintended changes were introduced due to code formatting. I added
Do I need to create a new PR to remove the unnecessary line changes caused by formatting? @ @lhotari
Good start, going in the right direction!
...instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java (outdated, resolved)
...nctions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaExecutionResult.java (outdated, resolved)
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java (outdated, resolved)
...ctions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java (outdated, resolved)
Formatting changes make the diff slightly harder to read. However, the "hide whitespace" option in the GitHub UI addresses that.
…onResult from AsyncFuncRequest in processAsyncResultsInInputOrder
I didn't configure the correct code formatting in IDEA. Therefore, I will close this PR and open a new one to fix the issue. See #23820.
@walkinggo There shouldn't be a need to create new PRs in cases like this. You could have simply reformatted the code. All commits in the PR will get squashed into a single commit when the PR gets merged.
@lhotari Alright, I've used another branch to solve the issue. I'll now overwrite the current branch with the other one and close the new PR.
5775db1 to 0344726
...-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java (outdated, resolved)
...-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java (outdated, resolved)
long beforeTime = System.nanoTime() - 500_000_000L;
assertTrue(Math.abs(beforeTime - result.getStartTime()) <= 20_000_000);
What is the logic behind this assertion?
The logic here is to run a task that takes 500ms and then compare the recorded start time with the completion time minus 500ms. If the difference is at most 20ms, we can consider the recorded time accurate.
It seems pointless to test that at all in this unit test class. It would essentially test that result.getStartTime() was set before the execution started and that the execution took more than 500ms. Tests should be about testing expected behavior.
In this PR, we are more interested in whether the processing time gets properly recorded. That would have to be tested at a different level, since JavaInstanceTest is at the level of the JavaInstance class, while the processing time handling is in JavaInstanceRunnable. I think that tests for this PR should target JavaInstanceRunnable and possibly also one of the end-to-end tests (for example pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java).
btw. One tip about assertions. For newly added tests, I'd recommend using AssertJ fluent assertions for more complex assertions. We have AssertJ already available as a test dependency in Pulsar. It's not that I'm saying that AssertJ must be used, it's just that if there are more complex assertions, it's a better option than TestNG's assertions. AssertJ assertions are easier to read and when there are failures, the error messages are really great.
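To illustrate the kind of behavior-level check described above, here is a minimal sketch that asserts on the recorded processing time rather than on an absolute start timestamp. RecordingStats and run are hypothetical stand-ins, not Pulsar's JavaInstanceRunnable or stats classes: the runner starts the clock before invoking an async function and stops it only when the returned future completes. Only a lower bound (the artificial delay) is asserted, which avoids flaky upper-bound timing checks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class AsyncProcessTimeBehaviorSketch {

    // Hypothetical recorder standing in for the stats manager; it only
    // collects the elapsed times it is asked to record.
    static class RecordingStats {
        final List<Long> recordedNanos = new ArrayList<>();

        synchronized void processTimeEnd(long startTimeNanos) {
            recordedNanos.add(System.nanoTime() - startTimeNanos);
        }
    }

    // Hypothetical runner: starts the clock before invoking the async function
    // and stops it only when the returned future completes.
    static CompletableFuture<String> run(Function<String, CompletableFuture<String>> fn,
                                         String input, RecordingStats stats) {
        long startTimeNanos = System.nanoTime();
        return fn.apply(input).whenComplete((value, error) -> stats.processTimeEnd(startTimeNanos));
    }

    public static void main(String[] args) {
        RecordingStats stats = new RecordingStats();
        long delayMs = 300;

        // Async function: returns a future immediately, completes about delayMs later.
        Function<String, CompletableFuture<String>> slowAsync = in ->
                CompletableFuture.supplyAsync(() -> in + "!",
                        CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));

        run(slowAsync, "hello", stats).join();

        // Behavior under test: the recorded time covers the async completion,
        // so it can be no shorter than the artificial delay.
        long recordedMs = TimeUnit.NANOSECONDS.toMillis(stats.recordedNanos.get(0));
        if (recordedMs < delayMs) {
            throw new AssertionError("expected >= " + delayMs + " ms but was " + recordedMs);
        }
        System.out.println("recorded ms: " + recordedMs);
    }
}
```

If the time were recorded when the function returned its future (the behavior this PR addresses), the recorded value would be close to zero and the lower-bound check would fail.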
How should I run PulsarFunctionE2ETest.java, located in pulsar-broker/src/test/java/org/apache/pulsar/io? I tried running it locally in IDEA but encountered an error. Is there any related documentation?
Most likely you will first have to compile Pulsar on the command line with Maven. Either build the core modules:
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true
or build everything, skipping some checks to speed it up:
mvn -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true
Is this test program only runnable on Linux or macOS? I ran it on Windows using the command you provided, but encountered an error indicating that I don't have permission to delete certain files and that the path characters are incorrect.
@walkinggo I'd recommend using Windows Subsystem for Linux (WSL) on Windows with Ubuntu for developing Pulsar. We don't support Windows for Pulsar broker at all. It might work, but the support is only for Linux and macOS. For the Pulsar client, we also support Windows.
IntelliJ has also special support for WSL on Windows: https://www.jetbrains.com/help/idea/how-to-use-wsl-development-environment-in-product.html
Thank you for your patience. I have successfully run the program on Ubuntu. @lhotari
@@ -309,4 +317,25 @@ public void testAsyncFunctionMaxPendingVoidResult() throws Exception {
        log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
        instance.close();
    }

    @Test
    public void testAsyncFunctionTime() {
This doesn't seem to be testing an async function, since the function under test is a plain Function<String, String>.
Sorry, I should have used the CompletableFuture class for testing. Is that acceptable?
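The distinction in this exchange is about the function's return type: a synchronous function returns its value directly, while an asynchronous one returns a CompletableFuture that completes later. The sketch below uses a hypothetical SimpleFunction interface as a simplified stand-in (Pulsar's own Function<I, O> interface also takes a Context argument, omitted here), and a hypothetical invokeAndAwait helper that treats a CompletableFuture return value as an async result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncVsSyncFunctionSketch {

    // Hypothetical, simplified mirror of a function interface with a single
    // process method (the Context parameter is omitted for brevity).
    interface SimpleFunction<I, O> {
        O process(I input) throws Exception;
    }

    // Synchronous: the result exists as soon as process() returns.
    static final SimpleFunction<String, String> SYNC = input -> input.toUpperCase();

    // Asynchronous: process() returns a future at once; the value arrives later.
    static final SimpleFunction<String, CompletableFuture<String>> ASYNC = input ->
            CompletableFuture.supplyAsync(input::toUpperCase,
                    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));

    // Treats a CompletableFuture return value as an async result and waits on it;
    // any other value is treated as an already-available sync result.
    static Object invokeAndAwait(SimpleFunction<String, ?> fn, String input) throws Exception {
        Object output = fn.process(input);
        if (output instanceof CompletableFuture) {
            return ((CompletableFuture<?>) output).get();
        }
        return output;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invokeAndAwait(SYNC, "sync"));   // prints SYNC
        System.out.println(invokeAndAwait(ASYNC, "async")); // prints ASYNC
    }
}
```

A test meant to exercise async timing would use a function shaped like ASYNC, whose work finishes only after process() has already returned.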
This test can be removed completely. More context in #23811 (comment). Tests for this PR should go in different classes.
Alright, I will remove this test case. Should I add additional test cases in the other test files you mentioned?
yes
Should I modify the related code logic based on the following error from my local CI check? @lhotari [ERROR] Medium: org.apache.pulsar.functions.instance.JavaInstance$AsyncFuncRequest.getResult() may expose internal representation by returning JavaInstance$AsyncFuncRequest.result [org.apache.pulsar.functions.instance.JavaInstance$AsyncFuncRequest] At JavaInstance.java:[line 50] EI_EXPOSE_REP
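EI_EXPOSE_REP is the SpotBugs warning that a getter returns a reference to a mutable field, which lets callers change the object's internal state. The sketch below uses hypothetical classes with a list field (not the actual AsyncFuncRequest) to show the pattern and one conventional remedy, returning a read-only view. In this PR, however, the JavaExecutionResult held by AsyncFuncRequest is deliberately shared and filled in by its consumer, so a defensive copy would defeat that purpose; in such cases the warning is usually suppressed instead (for example with a SpotBugs exclusion filter or an @SuppressFBWarnings annotation), which is a call for the maintainers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExposeRepSketch {

    // Hypothetical holder. Returning the mutable list field directly from a getter
    // is the pattern SpotBugs reports as EI_EXPOSE_REP: callers can mutate state.
    static class Exposing {
        private final List<String> items = new ArrayList<>();

        List<String> getItems() {
            return items; // internal representation escapes to the caller
        }
    }

    // One conventional remedy: hand out a read-only view (or a copy) instead.
    static class Guarded {
        private final List<String> items = new ArrayList<>();

        void add(String item) { items.add(item); }

        List<String> getItems() {
            return Collections.unmodifiableList(items);
        }
    }

    public static void main(String[] args) {
        Exposing exposing = new Exposing();
        exposing.getItems().add("mutated from outside");
        System.out.println(exposing.getItems().size()); // prints 1

        Guarded guarded = new Guarded();
        guarded.add("a");
        try {
            guarded.getItems().add("b");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view rejected the write");
        }
    }
}
```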
Fixes #23705
Motivation
See #23705
Accurate timing metrics are essential for monitoring and optimizing asynchronous processing. Apache Pulsar Functions, as a distributed compute platform, relies heavily on asynchronous operations to process and transform data streams. However, the current implementation does not accurately capture and report the execution time of asynchronous functions.
Modifications
This Pull Request introduces several modifications to the Apache Pulsar Functions project aimed at improving the accuracy and reliability of asynchronous function execution time statistics. The primary goal is to enhance monitoring and analysis capabilities for function performance. Here’s a breakdown of the key changes:
Removal of processTimeStart Method: The processTimeStart method in ComponentStatsManager has been removed. This method was previously used to record the start time of asynchronous function execution. The start time is now recorded in the JavaExecutionResult object, providing a more centralized and consistent approach.
Modification of JavaInstanceRunnable: The run method in JavaInstanceRunnable has been updated to reflect the removal of processTimeStart. The stats.processTimeStart() call has been deleted, and the stats.processTimeEnd() method now accepts the start time as a parameter to calculate the total execution time.
Update to FunctionStatsManager: The processTimeStart member variable has been removed from FunctionStatsManager, and its processTimeEnd method now accepts the start time as a parameter. This allows for accurate calculation of the function’s execution time.
Changes to SinkStatsManager and SourceStatsManager: The processTimeEnd methods in SinkStatsManager and SourceStatsManager have been updated to accept the start time as a parameter. However, since these classes do not record processing time, the method bodies remain empty.
Addition of startTime in JavaExecutionResult: The JavaExecutionResult class now includes a startTime member variable to store the start time of asynchronous function execution. This allows for accurate calculation of execution time within the handleResult method of JavaInstanceRunnable.
Modification of AsyncFuncRequest: The AsyncFuncRequest class in JavaInstance now includes a result member variable of type JavaExecutionResult. This change ensures that the processAsyncResultsInInputOrder method uses the existing JavaExecutionResult object instead of creating a new one, maintaining consistency and avoiding duplication.
Use of Same ExecutionResult in Non-asyncPreserveInputOrder Mode: In scenarios where asyncPreserveInputOrderForOutputMessages is disabled, the same executionResult object is now used to avoid unnecessary object creation and potential issues with result assignment.
Fix for Result and Exception Handling: Two patches address potential bugs related to result and exception handling in JavaInstance. The processAsyncResultsInInputOrder method now ensures that the result and userException fields of JavaExecutionResult are properly set, improving the reliability of the execution result.
Addition of Test Case: A new test case, testAsyncFunctionTime, has been added to verify the accuracy of asynchronous function execution time recording and calculation. This test ensures that the start time recorded in JavaExecutionResult is within an acceptable range of the actual start time.
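The two result-handling changes listed above (reusing the request's JavaExecutionResult in processAsyncResultsInInputOrder, and reusing the same executionResult when asyncPreserveInputOrderForOutputMessages is disabled) can be sketched as follows. All names here are simplified, hypothetical stand-ins, not Pulsar's JavaInstance code: in ordered mode only completed requests at the head of the queue are drained, and in unordered mode each future reports as soon as it completes; in both cases the result object created at dispatch time, with its start time, is filled in rather than replaced.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

public class AsyncResultReuseSketch {

    // Hypothetical, simplified result object; the start time is fixed at dispatch.
    static class ExecutionResult {
        final long startTimeNanos;
        volatile Object result;
        volatile Throwable userException;

        ExecutionResult(long startTimeNanos) { this.startTimeNanos = startTimeNanos; }
    }

    // Hypothetical request pairing the pending future with the result object
    // that was created when the message was dispatched.
    static class AsyncFuncRequest {
        final CompletableFuture<Object> processResult;
        final ExecutionResult result;

        AsyncFuncRequest(CompletableFuture<Object> processResult, ExecutionResult result) {
            this.processResult = processResult;
            this.result = result;
        }
    }

    // Ordered mode: drain only completed requests at the head of the queue,
    // filling in each request's existing ExecutionResult.
    static List<ExecutionResult> drainInOrder(Queue<AsyncFuncRequest> pending) {
        List<ExecutionResult> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().processResult.isDone()) {
            AsyncFuncRequest request = pending.poll();
            ExecutionResult executionResult = request.result;
            try {
                executionResult.result = request.processResult.join(); // already done, no blocking
            } catch (CompletionException e) {
                executionResult.userException = e.getCause();
            } catch (CancellationException e) {
                executionResult.userException = e;
            }
            ready.add(executionResult);
        }
        return ready;
    }

    // Unordered mode: report as soon as the future completes, reusing the
    // ExecutionResult created at dispatch instead of allocating a new one.
    static CompletableFuture<Object> onCompletion(CompletableFuture<Object> processResult,
                                                  ExecutionResult executionResult,
                                                  Consumer<ExecutionResult> handler) {
        return processResult.whenComplete((value, error) -> {
            if (error != null) {
                executionResult.userException = error;
            } else {
                executionResult.result = value;
            }
            handler.accept(executionResult);
        });
    }

    public static void main(String[] args) {
        Queue<AsyncFuncRequest> pending = new ArrayDeque<>();
        CompletableFuture<Object> first = new CompletableFuture<>();
        CompletableFuture<Object> second = new CompletableFuture<>();
        pending.add(new AsyncFuncRequest(first, new ExecutionResult(System.nanoTime())));
        pending.add(new AsyncFuncRequest(second, new ExecutionResult(System.nanoTime())));

        second.complete("second");                        // completes out of order
        System.out.println(drainInOrder(pending).size()); // prints 0: head not done yet
        first.complete("first");
        System.out.println(drainInOrder(pending).size()); // prints 2
    }
}
```

Reusing the dispatch-time object is what keeps the start time attached to the result, so the processing time can be computed once the result is handled.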