From 7be29dd2eee77142c2d3dee2ed93cf12d5d1c138 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 24 Jan 2025 13:20:14 -0500 Subject: [PATCH] Add an experimental pipeline option for custom logging libs (#33743) * Add an experimental pipeline option for custom logging libs. * Move sleeping inside Fatalf. * Add a logging message before sleeping. --- sdks/go/container/tools/logging.go | 5 +++++ sdks/java/container/boot.go | 25 ++++++++++++++++++------- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/sdks/go/container/tools/logging.go b/sdks/go/container/tools/logging.go index ced13b744d45..7bd68299814d 100644 --- a/sdks/go/container/tools/logging.go +++ b/sdks/go/container/tools/logging.go @@ -119,5 +119,10 @@ func (l *Logger) Errorf(ctx context.Context, format string, args ...any) { // Fatalf logs the message with Critical severity, and then calls os.Exit(1). func (l *Logger) Fatalf(ctx context.Context, format string, args ...any) { l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, fmt.Sprintf(format, args...)) + // Allow additional time for other background processes (e.g., log agent) to + // complete before exiting. This ensures crucial information is captured + // before the worker process terminates. + l.Log(ctx, fnpb.LogEntry_Severity_CRITICAL, "Completing background processes before exiting...") + time.Sleep(15 * time.Second) os.Exit(1) } diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 7fd87b3524e2..ab04243fc277 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -135,14 +135,25 @@ func main() { const jarsDir = "/opt/apache/beam/jars" const javaHarnessJar = "beam-sdks-java-harness.jar" - cp := []string{ - filepath.Join(jarsDir, "slf4j-api.jar"), - filepath.Join(jarsDir, "slf4j-jdk14.jar"), - filepath.Join(jarsDir, "jcl-over-slf4j.jar"), - filepath.Join(jarsDir, "log4j-over-slf4j.jar"), - filepath.Join(jarsDir, "log4j-to-slf4j.jar"), - filepath.Join(jarsDir, javaHarnessJar), + defaultLoggingJars := []string{ + "slf4j-api.jar", + "slf4j-jdk14.jar", + "jcl-over-slf4j.jar", + "log4j-over-slf4j.jar", + "log4j-to-slf4j.jar", } + cp := []string{} + if strings.Contains(options, "use_custom_logging_libraries") { + // In this case, the logging libraries will be provided from the staged + // artifacts. + logger.Warnf(ctx, "Skipping default slf4j dependencies in classpath") + } else { + logger.Printf(ctx, "Using default slf4j dependencies in classpath") + for _, jar := range defaultLoggingJars { + cp = append(cp, filepath.Join(jarsDir, jar)) + } + } + cp = append(cp, filepath.Join(jarsDir, javaHarnessJar)) var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar") for _, a := range artifacts {