Skip to content

Commit

Permalink
feat: Log traffic generator progress periodically
Browse files Browse the repository at this point in the history
Bug: N/A
Change-Id: Id65bb187c3c4131764d3f3e17bdde244aff85d35
GitOrigin-RevId: f0258f1282e8090eec8ab385b52439d9d0ca5430
  • Loading branch information
Privacy Sandbox Team authored and copybara-github committed Dec 23, 2024
1 parent 63fe850 commit ff502fe
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/roma/byob/benchmark/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ cc_binary(
"//src/roma/byob/config",
"//src/roma/byob/interface:roma_service",
"//src/roma/byob/sample_udf:sample_byob_sdk_cc_proto",
"//src/util:periodic_closure",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
"@com_google_absl//absl/log:flags",
"@com_google_absl//absl/log:globals",
"@com_google_absl//absl/log:initialize",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
Expand Down
33 changes: 30 additions & 3 deletions src/roma/byob/benchmark/traffic_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "absl/log/globals.h"
#include "absl/log/initialize.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/notification.h"
#include "absl/time/time.h"
Expand All @@ -33,6 +34,7 @@
#include "src/roma/byob/sample_udf/sample_udf_interface.pb.h"
#include "src/roma/config/function_binding_object_v2.h"
#include "src/util/execution_token.h"
#include "src/util/periodic_closure.h"
#include "src/util/status_macro/status_macros.h"

ABSL_FLAG(int, num_workers, 84, "Number of pre-created workers");
Expand All @@ -57,6 +59,7 @@ using Mode = ::privacy_sandbox::server_common::byob::Mode;
using ::privacy_sandbox::roma_byob::example::FUNCTION_HELLO_WORLD;
using ::privacy_sandbox::roma_byob::example::FUNCTION_PRIME_SIEVE;
using ::privacy_sandbox::roma_byob::example::SampleResponse;
using ::privacy_sandbox::server_common::PeriodicClosure;

} // namespace

Expand Down Expand Up @@ -86,20 +89,41 @@ int main(int argc, char** argv) {
::privacy_sandbox::roma_byob::example::SampleRequest request;
request.set_function(FUNCTION_HELLO_WORLD);

const auto rpc_func = [&roma_service, code_token = *code_token, &request](
const std::int64_t expected_completions = num_queries * burst_size;
std::atomic<std::int64_t> completions = 0;

std::unique_ptr<PeriodicClosure> periodic = PeriodicClosure::Create();
if (absl::Status s = periodic->StartDelayed(
absl::Seconds(1),
[&completions, expected_completions]() {
static int64_t previous = 0;
const int64_t curr_val = completions;
if (previous != expected_completions) {
LOG(INFO) << "completions: " << curr_val
<< ", increment: " << curr_val - previous;
}
previous = curr_val;
});
!s.ok()) {
LOG(FATAL) << s;
}
const auto rpc_func = [&roma_service, code_token = *code_token, &request,
&completions](
privacy_sandbox::server_common::Stopwatch stopwatch,
absl::StatusOr<absl::Duration>* duration) {
absl::StatusOr<google::scp::roma::ExecutionToken> exec_token =
roma_service->ProcessRequest<SampleResponse>(
code_token, request, google::scp::roma::DefaultMetadata(),
[stopwatch = std::move(stopwatch),
duration](absl::StatusOr<SampleResponse> response) {
[stopwatch = std::move(stopwatch), duration,
&completions](absl::StatusOr<SampleResponse> response) {
*duration = stopwatch.GetElapsedTime();
completions++;
CHECK_OK(response);
});
// CHECK_OK(exec_token) << "FAIL";
if (!exec_token.ok()) {
*duration = exec_token.status();
completions++;
}
};
using ::privacy_sandbox::server_common::byob::BurstGenerator;
Expand All @@ -109,7 +133,10 @@ int main(int argc, char** argv) {
const BurstGenerator::Stats stats = burst_gen.Run();
// RomaService must be cleaned up before stats are reported, to ensure the
// service's work is completed
LOG(INFO) << "Shutting down Roma";
privacy_sandbox::server_common::Stopwatch stopwatch;
roma_service.reset();
LOG(INFO) << "Roma shutdown duration: " << stopwatch.GetElapsedTime();
LOG(INFO) << stats.ToString() << std::endl;
return 0;
}

0 comments on commit ff502fe

Please sign in to comment.