From 6fc3229f38c58bd360ebf587de03dd6ff1506d40 Mon Sep 17 00:00:00 2001 From: Daneyon Hansen Date: Wed, 8 Jan 2025 23:43:48 +0000 Subject: [PATCH 1/2] Add health gRPC server and refactor main() - Introduced a health gRPC server to handle liveness and readiness probes. - Refactored main() to manage server goroutines. - Added graceful shutdown for servers and controller manager. - Improved logging consistency. - Added validation of CLI flags. Signed-off-by: Daneyon Hansen --- pkg/ext-proc/health.go | 59 +++++++++++ pkg/ext-proc/main.go | 193 +++++++++++++++++++++++------------- pkg/manifests/ext_proc.yaml | 18 +++- 3 files changed, 198 insertions(+), 72 deletions(-) create mode 100644 pkg/ext-proc/health.go diff --git a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go new file mode 100644 index 00000000..9278b9d7 --- /dev/null +++ b/pkg/ext-proc/health.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "fmt" + + "google.golang.org/grpc/codes" + healthPb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + klog "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type healthServer struct { + client.Client +} + +func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + if err := s.checkResources(); err != nil { + klog.Infof("gRPC health check not serving: %s", in.String()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + klog.Infof("gRPC health check serving: %s", in.String()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watch is not implemented") +} + +// checkResources uses a client to list all InferenceModels in the 
configured namespace +// and gets the configured InferencePool by name and namespace. If any client calls fail, +// no InferenceModels exist, or the InferencePool does not exist, an error is returned. +func (s *healthServer) checkResources() error { + ctx := context.Background() + var infPool v1alpha1.InferencePool + if err := s.Client.Get( + ctx, + client.ObjectKey{Name: *poolName, Namespace: *poolNamespace}, + &infPool, + ); err != nil { + return fmt.Errorf("failed to get InferencePool %s/%s: %v", *poolNamespace, *poolName, err) + } + klog.Infof("Successfully retrieved InferencePool %s/%s", *poolNamespace, *poolName) + + var modelList v1alpha1.InferenceModelList + if err := s.Client.List(ctx, &modelList, client.InNamespace(*poolNamespace)); err != nil { + return fmt.Errorf("failed to list InferenceModels in namespace %s: %v", *poolNamespace, err) + } + + // Ensure at least 1 InferenceModel + if len(modelList.Items) == 0 { + return fmt.Errorf("no InferenceModels exist in namespace %s", *poolNamespace) + } + klog.Infof("Found %d InferenceModels in namespace %s", len(modelList.Items), *poolNamespace) + + return nil +} diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index e8a41667..6c3a94b5 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -1,20 +1,14 @@ package main import ( - "context" "flag" "fmt" "net" - "os" - "os/signal" - "syscall" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/grpc" - "google.golang.org/grpc/codes" healthPb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm" @@ -29,10 +23,14 @@ import ( ) var ( - port = flag.Int( - "port", + grpcPort = flag.Int( + "grpcPort", 9002, - "gRPC port") + "The 
gRPC port used for communicating with Envoy proxy") + grpcHealthPort = flag.Int( + "grpcHealthPort", + 9003, + "The port used for gRPC liveness and readiness probes") targetPodHeader = flag.String( "targetPodHeader", "target-pod", @@ -65,32 +63,22 @@ var ( scheme = runtime.NewScheme() ) -type healthServer struct{} - -func (s *healthServer) Check( - ctx context.Context, - in *healthPb.HealthCheckRequest, -) (*healthPb.HealthCheckResponse, error) { - klog.Infof("Handling grpc Check request + %s", in.String()) - return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil -} - -func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { - return status.Error(codes.Unimplemented, "Watch is not implemented") -} - func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1alpha1.AddToScheme(scheme)) } func main() { - klog.InitFlags(nil) flag.Parse() ctrl.SetLogger(klog.TODO()) + // Validate flags + if err := validateFlags(); err != nil { + klog.Fatalf("flag validation failed: %v", err) + } + // Print all flag values flags := "Flags: " flag.VisitAll(func(f *flag.Flag) { @@ -98,22 +86,16 @@ func main() { }) klog.Info(flags) - klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port)) - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) + // Create a new manager to manage controllers + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme}) if err != nil { - klog.Fatalf("failed to listen: %v", err) + klog.Fatalf("failed to start manager: %v", err) } + // Create the data store used to cache watched resources datastore := backend.NewK8sDataStore() - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - }) - if err != nil { - klog.Error(err, "unable to start manager") - os.Exit(1) - } - + // Create the controllers and register them with the manager if err := (&backend.InferencePoolReconciler{ Datastore: datastore, Scheme: 
mgr.GetScheme(), @@ -124,7 +106,7 @@ func main() { }, Record: mgr.GetEventRecorderFor("InferencePool"), }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up InferencePoolReconciler") + klog.Fatalf("Error setting up InferencePoolReconciler: %v", err) } if err := (&backend.InferenceModelReconciler{ @@ -137,7 +119,7 @@ func main() { }, Record: mgr.GetEventRecorderFor("InferenceModel"), }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up InferenceModelReconciler") + klog.Fatalf("Error setting up InferenceModelReconciler: %v", err) } if err := (&backend.EndpointSliceReconciler{ @@ -148,53 +130,122 @@ func main() { ServiceName: *serviceName, Zone: *zone, }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up EndpointSliceReconciler") + klog.Fatalf("Error setting up EndpointSliceReconciler: %v", err) + } + + // Channel to handle error signals for goroutines + errChan := make(chan error, 1) + + // Start each component in its own goroutine + startControllerManager(mgr, errChan) + healthSvr := startHealthServer(mgr, errChan, *grpcHealthPort) + extProcSvr := startExternalProcessorServer( + errChan, + datastore, + *grpcPort, + *refreshPodsInterval, + *refreshMetricsInterval, + *targetPodHeader, + ) + + // Wait for first error from any goroutine + err = <-errChan + if err != nil { + klog.Errorf("goroutine failed: %v", err) + } else { + klog.Infof("Manager exited gracefully") } - errChan := make(chan error) + // Gracefully shutdown components + if healthSvr != nil { + klog.Info("Health server shutting down...") + healthSvr.GracefulStop() + } + if extProcSvr != nil { + klog.Info("Ext-proc server shutting down...") + extProcSvr.GracefulStop() + } + + klog.Info("All components stopped gracefully") +} + +// startControllerManager runs the controller manager in a goroutine. +func startControllerManager(mgr ctrl.Manager, errChan chan<- error) { go func() { + // Blocking and will return when shutdown is complete. 
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - klog.Error(err, "Error running manager") - errChan <- err + errChan <- fmt.Errorf("controller manager failed to start: %w", err) } + // Manager exited gracefully + klog.Info("Controller manager shutting down...") + errChan <- nil }() +} - s := grpc.NewServer() +// startHealthServer starts the gRPC health probe server in a goroutine. +func startHealthServer(mgr ctrl.Manager, errChan chan<- error, port int) *grpc.Server { + healthSvr := grpc.NewServer() + healthPb.RegisterHealthServer(healthSvr, &healthServer{Client: mgr.GetClient()}) - pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) - if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil { - klog.Fatalf("failed to initialize: %v", err) - } - extProcPb.RegisterExternalProcessorServer( - s, - handlers.NewServer( - pp, - scheduling.NewScheduler(pp), - *targetPodHeader, - datastore)) - healthPb.RegisterHealthServer(s, &healthServer{}) - - klog.Infof("Starting gRPC server on port :%v", *port) - - // shutdown - var gracefulStop = make(chan os.Signal, 1) - signal.Notify(gracefulStop, syscall.SIGTERM) - signal.Notify(gracefulStop, syscall.SIGINT) go func() { - select { - case sig := <-gracefulStop: - klog.Infof("caught sig: %+v", sig) - os.Exit(0) - case err := <-errChan: - klog.Infof("caught error in controller: %+v", err) - os.Exit(0) + healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + errChan <- fmt.Errorf("health server failed to listen: %w", err) } + klog.Infof("Health server listening on port: %d", port) + // Blocking and will return when shutdown is complete. 
+ if serveErr := healthSvr.Serve(healthLis); serveErr != nil && serveErr != grpc.ErrServerStopped { + errChan <- fmt.Errorf("health server failed: %w", serveErr) + } }() + return healthSvr +} - err = s.Serve(lis) - if err != nil { - klog.Fatalf("Ext-proc failed with the err: %v", err) +// startExternalProcessorServer starts the Envoy external processor server in a goroutine. +func startExternalProcessorServer( + errChan chan<- error, + datastore *backend.K8sDatastore, + port int, + refreshPodsInterval, refreshMetricsInterval time.Duration, + targetPodHeader string, +) *grpc.Server { + extSvr := grpc.NewServer() + go func() { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + errChan <- fmt.Errorf("ext-proc server failed to listen: %w", err) + } + klog.Infof("Ext-proc server listening on port: %d", port) + + // Initialize backend provider + pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) + if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil { + errChan <- fmt.Errorf("failed to initialize backend provider: %w", err) + } + + // Register ext_proc handlers + extProcPb.RegisterExternalProcessorServer( + extSvr, + handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore), + ) + + // Blocking and will return when shutdown is complete. 
+ if serveErr := extSvr.Serve(lis); serveErr != nil && serveErr != grpc.ErrServerStopped { + errChan <- fmt.Errorf("ext-proc server failed: %w", serveErr) + } + }() + return extSvr +} + +func validateFlags() error { + if *poolName == "" { + return fmt.Errorf("required %q flag not set", "poolName") + } + + if *serviceName == "" { + return fmt.Errorf("required %q flag not set", "serviceName") } + return nil } diff --git a/pkg/manifests/ext_proc.yaml b/pkg/manifests/ext_proc.yaml index 7ef31825..a9141071 100644 --- a/pkg/manifests/ext_proc.yaml +++ b/pkg/manifests/ext_proc.yaml @@ -28,7 +28,6 @@ roleRef: kind: ClusterRole name: pod-read --- - apiVersion: apps/v1 kind: Deployment metadata: @@ -57,8 +56,25 @@ spec: - "3" - -serviceName - "vllm-llama2-7b-pool" + - -grpcPort + - "9002" + - -grpcHealthPort + - "9003" ports: - containerPort: 9002 + - containerPort: 9003 + livenessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 --- apiVersion: v1 kind: Service From a6e11e87febd29221a470233923505b9930442ea Mon Sep 17 00:00:00 2001 From: Daneyon Hansen Date: Fri, 10 Jan 2025 20:50:11 +0000 Subject: [PATCH 2/2] Refactors health server to use data store Signed-off-by: Daneyon Hansen --- pkg/ext-proc/backend/datastore.go | 11 +++- pkg/ext-proc/backend/datastore_test.go | 39 ++++++++++++ pkg/ext-proc/health.go | 37 +---------- pkg/ext-proc/main.go | 85 +++++++++++--------------- 4 files changed, 85 insertions(+), 87 deletions(-) diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index f1e6379d..70f000b8 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -52,8 +52,8 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) { func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) { ds.poolMu.RLock() defer 
ds.poolMu.RUnlock() - if ds.inferencePool == nil { - return nil, errors.New("InferencePool hasn't been initialized yet") + if !ds.HasSynced() { + return nil, errors.New("InferencePool is not initialized in data store") } return ds.inferencePool, nil } @@ -75,6 +75,13 @@ func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.I return } +// HasSynced returns true if InferencePool is set in the data store. +func (ds *K8sDatastore) HasSynced() bool { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + return ds.inferencePool != nil +} + func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string { var weights int32 diff --git a/pkg/ext-proc/backend/datastore_test.go b/pkg/ext-proc/backend/datastore_test.go index 57204eb0..d4ad48e1 100644 --- a/pkg/ext-proc/backend/datastore_test.go +++ b/pkg/ext-proc/backend/datastore_test.go @@ -4,8 +4,47 @@ import ( "testing" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func TestHasSynced(t *testing.T) { + tests := []struct { + name string + inferencePool *v1alpha1.InferencePool + hasSynced bool + }{ + { + name: "Ready when InferencePool exists in data store", + inferencePool: &v1alpha1.InferencePool{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + }, + hasSynced: true, + }, + { + name: "Not ready when InferencePool is nil in data store", + inferencePool: nil, + hasSynced: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + datastore := NewK8sDataStore() + // Set the inference pool + if tt.inferencePool != nil { + datastore.setInferencePool(tt.inferencePool) + } + // Check if the data store has been initialized + hasSynced := datastore.HasSynced() + if hasSynced != tt.hasSynced { + t.Errorf("IsInitialized() = %v, want %v", hasSynced, tt.hasSynced) + } + }) + } +} + func TestRandomWeightedDraw(t *testing.T) { tests := []struct { name string diff --git 
a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go index 9278b9d7..488851eb 100644 --- a/pkg/ext-proc/health.go +++ b/pkg/ext-proc/health.go @@ -2,22 +2,20 @@ package main import ( "context" - "fmt" "google.golang.org/grpc/codes" healthPb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" - "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" klog "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" ) type healthServer struct { - client.Client + datastore *backend.K8sDatastore } func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { - if err := s.checkResources(); err != nil { + if !s.datastore.HasSynced() { klog.Infof("gRPC health check not serving: %s", in.String()) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil } @@ -28,32 +26,3 @@ func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckReques func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { return status.Error(codes.Unimplemented, "Watch is not implemented") } - -// checkResources uses a client to list all InferenceModels in the configured namespace -// and gets the configured InferencePool by name and namespace. If any client calls fail, -// no InferenceModels exist, or the InferencePool does not exist, an error is returned. 
-func (s *healthServer) checkResources() error { - ctx := context.Background() - var infPool v1alpha1.InferencePool - if err := s.Client.Get( - ctx, - client.ObjectKey{Name: *poolName, Namespace: *poolNamespace}, - &infPool, - ); err != nil { - return fmt.Errorf("failed to get InferencePool %s/%s: %v", *poolNamespace, *poolName, err) - } - klog.Infof("Successfully retrieved InferencePool %s/%s", *poolNamespace, *poolName) - - var modelList v1alpha1.InferenceModelList - if err := s.Client.List(ctx, &modelList, client.InNamespace(*poolNamespace)); err != nil { - return fmt.Errorf("failed to list InferenceModels in namespace %s: %v", *poolNamespace, err) - } - - // Ensure at least 1 InferenceModel - if len(modelList.Items) == 0 { - return fmt.Errorf("no InferenceModels exist in namespace %s", *poolNamespace) - } - klog.Infof("Found %d InferenceModels in namespace %s", len(modelList.Items), *poolNamespace) - - return nil -} diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 6c3a94b5..e42d8e4f 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -76,7 +76,7 @@ func main() { // Validate flags if err := validateFlags(); err != nil { - klog.Fatalf("flag validation failed: %v", err) + klog.Fatalf("Failed to validate flags: %v", err) } // Print all flag values @@ -89,7 +89,7 @@ func main() { // Create a new manager to manage controllers mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme}) if err != nil { - klog.Fatalf("failed to start manager: %v", err) + klog.Fatalf("Failed to create controller manager: %v", err) } // Create the data store used to cache watched resources @@ -106,7 +106,7 @@ func main() { }, Record: mgr.GetEventRecorderFor("InferencePool"), }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Error setting up InferencePoolReconciler: %v", err) + klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err) } if err := (&backend.InferenceModelReconciler{ @@ -119,7 +119,7 @@ func main() { }, Record: 
mgr.GetEventRecorderFor("InferenceModel"), }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Error setting up InferenceModelReconciler: %v", err) + klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err) } if err := (&backend.EndpointSliceReconciler{ @@ -130,17 +130,12 @@ func main() { ServiceName: *serviceName, Zone: *zone, }).SetupWithManager(mgr); err != nil { - klog.Fatalf("Error setting up EndpointSliceReconciler: %v", err) + klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) } - // Channel to handle error signals for goroutines - errChan := make(chan error, 1) - - // Start each component in its own goroutine - startControllerManager(mgr, errChan) - healthSvr := startHealthServer(mgr, errChan, *grpcHealthPort) + // Start health and ext-proc servers in goroutines + healthSvr := startHealthServer(datastore, *grpcHealthPort) extProcSvr := startExternalProcessorServer( - errChan, datastore, *grpcPort, *refreshPodsInterval, @@ -148,94 +143,82 @@ func main() { *targetPodHeader, ) - // Wait for first error from any goroutine - err = <-errChan - if err != nil { - klog.Errorf("goroutine failed: %v", err) - } else { - klog.Infof("Manager exited gracefully") + // Start the controller manager. Blocking and will return when shutdown is complete. + klog.Infof("Starting controller manager") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + klog.Fatalf("Error starting controller manager: %v", err) } + klog.Info("Controller manager shutting down") - // Gracefully shutdown components + // Gracefully shutdown servers if healthSvr != nil { - klog.Info("Health server shutting down...") + klog.Info("Health server shutting down") healthSvr.GracefulStop() } if extProcSvr != nil { - klog.Info("Ext-proc server shutting down...") + klog.Info("Ext-proc server shutting down") extProcSvr.GracefulStop() } - klog.Info("All components stopped gracefully") -} - -// startControllerManager runs the controller manager in a goroutine. 
-func startControllerManager(mgr ctrl.Manager, errChan chan<- error) { - go func() { - // Blocking and will return when shutdown is complete. - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - errChan <- fmt.Errorf("controller manager failed to start: %w", err) - } - // Manager exited gracefully - klog.Info("Controller manager shutting down...") - errChan <- nil - }() + klog.Info("All components shutdown") } // startHealthServer starts the gRPC health probe server in a goroutine. -func startHealthServer(mgr ctrl.Manager, errChan chan<- error, port int) *grpc.Server { - healthSvr := grpc.NewServer() - healthPb.RegisterHealthServer(healthSvr, &healthServer{Client: mgr.GetClient()}) +func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server { + svr := grpc.NewServer() + healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds}) go func() { - healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - errChan <- fmt.Errorf("health server failed to listen: %w", err) + klog.Fatalf("Health server failed to listen: %v", err) } klog.Infof("Health server listening on port: %d", port) // Blocking and will return when shutdown is complete. - if serveErr := healthSvr.Serve(healthLis); serveErr != nil && serveErr != grpc.ErrServerStopped { - errChan <- fmt.Errorf("health server failed: %w", serveErr) + if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped { + klog.Fatalf("Health server failed: %v", err) } + klog.Info("Health server shutting down") }() - return healthSvr + return svr } // startExternalProcessorServer starts the Envoy external processor server in a goroutine. 
func startExternalProcessorServer( - errChan chan<- error, datastore *backend.K8sDatastore, port int, refreshPodsInterval, refreshMetricsInterval time.Duration, targetPodHeader string, ) *grpc.Server { - extSvr := grpc.NewServer() + svr := grpc.NewServer() + go func() { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - errChan <- fmt.Errorf("ext-proc server failed to listen: %w", err) + klog.Fatalf("Ext-proc server failed to listen: %v", err) } klog.Infof("Ext-proc server listening on port: %d", port) // Initialize backend provider pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil { - errChan <- fmt.Errorf("failed to initialize backend provider: %w", err) + klog.Fatalf("Failed to initialize backend provider: %v", err) } // Register ext_proc handlers extProcPb.RegisterExternalProcessorServer( - extSvr, + svr, handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore), ) // Blocking and will return when shutdown is complete. - if serveErr := extSvr.Serve(lis); serveErr != nil && serveErr != grpc.ErrServerStopped { - errChan <- fmt.Errorf("ext-proc server failed: %w", serveErr) + if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped { + klog.Fatalf("Ext-proc server failed: %v", err) } + klog.Info("Ext-proc server shutting down") }() - return extSvr + return svr } func validateFlags() error {