Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nfd-worker: use single http port for metrics and healthz #1929

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions cmd/nfd-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ func main() {
klog.InfoS("version not set! Set -ldflags \"-X sigs.k8s.io/node-feature-discovery/pkg/version.version=`git describe --tags --dirty --always --match 'v*'`\" during build or run.")
}

// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()

// Get new NfdWorker instance
instance, err := worker.NewNfdWorker(worker.WithArgs(args))
if err != nil {
Expand Down Expand Up @@ -111,10 +108,8 @@ func initFlags(flagset *flag.FlagSet) (*worker.Args, *worker.ConfigOverrideArgs)
"Kubeconfig to use")
flagset.BoolVar(&args.Oneshot, "oneshot", false,
"Do not publish feature labels")
flagset.IntVar(&args.MetricsPort, "metrics", 8081,
"Port on which to expose metrics.")
flagset.IntVar(&args.GrpcHealthPort, "grpc-health", 8082,
"Port on which to expose the grpc health endpoint.")
flagset.IntVar(&args.Port, "port", 8080,
"Port on which to metrics and healthz endpoints are served")
flagset.StringVar(&args.Options, "options", "",
"Specify config options from command line. Config options are specified "+
"in the same format as in the config file (i.e. json or yaml). These options")
Expand Down
14 changes: 8 additions & 6 deletions deployment/base/worker-daemonset/worker-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ spec:
image: gcr.io/k8s-staging-nfd/node-feature-discovery:master
imagePullPolicy: Always
livenessProbe:
grpc:
port: 8082
httpGet:
path: /healthz
port: http
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 8082
httpGet:
path: /healthz
port: http
initialDelaySeconds: 5
periodSeconds: 10
failureThreshold: 10
Expand All @@ -40,5 +42,5 @@ spec:
cpu: 5m
memory: 64Mi
ports:
- name: metrics
containerPort: 8081
- name: http
containerPort: 8080
19 changes: 9 additions & 10 deletions deployment/helm/node-feature-discovery/templates/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
livenessProbe:
grpc:
port: {{ .Values.worker.healthPort | default "8082" }}
httpGet:
path: /healthz
port: http
{{- with .Values.worker.livenessProbe.initialDelaySeconds }}
initialDelaySeconds: {{ . }}
{{- end }}
Expand All @@ -62,8 +63,9 @@ spec:
timeoutSeconds: {{ . }}
{{- end }}
readinessProbe:
grpc:
port: {{ .Values.worker.healthPort | default "8082" }}
httpGet:
path: /healthz
port: http
{{- with .Values.worker.readinessProbe.initialDelaySeconds }}
initialDelaySeconds: {{ . }}
{{- end }}
Expand Down Expand Up @@ -104,16 +106,13 @@ spec:
{{- range $key, $value := .Values.featureGates }}
- "-feature-gates={{ $key }}={{ $value }}"
{{- end }}
- "-metrics={{ .Values.worker.metricsPort | default "8081"}}"
- "-grpc-health={{ .Values.worker.healthPort | default "8082" }}"
- "-port={{ .Values.worker.port | default "8080"}}"
{{- with .Values.gc.extraArgs }}
{{- toYaml . | nindent 8 }}
{{- end }}
ports:
- containerPort: {{ .Values.worker.metricsPort | default "8081"}}
name: metrics
- containerPort: {{ .Values.worker.healthPort | default "8082" }}
name: health
- containerPort: {{ .Values.worker.port | default "8080"}}
name: http
volumeMounts:
- name: host-boot
mountPath: "/host-boot"
Expand Down
7 changes: 1 addition & 6 deletions deployment/helm/node-feature-discovery/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,7 @@ worker:
# matchName: {op: In, value: ["SWAP", "X86", "ARM"]}
### <NFD-WORKER-CONF-END-DO-NOT-REMOVE>

metricsPort: 8081
healthPort: 8082
port: 8080
daemonsetAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
Expand All @@ -431,15 +430,11 @@ worker:
# runAsUser: 1000

livenessProbe:
grpc:
port: 8082
initialDelaySeconds: 10
# failureThreshold: 3
# periodSeconds: 10
# timeoutSeconds: 1
readinessProbe:
grpc:
port: 8082
initialDelaySeconds: 5
failureThreshold: 10
# periodSeconds: 10
Expand Down
3 changes: 1 addition & 2 deletions docs/deployment/helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ API's you need to install the prometheus operator in your cluster.
| `worker.*` | dict | | NFD worker daemonset configuration |
| `worker.enable` | bool | true | Specifies whether nfd-worker should be deployed |
| `worker.hostNetwork` | bool | false | Specifies whether to enable or disable running the container in the host's network namespace |
| `worker.metricsPort` | int | 8081 | Port on which to expose metrics from components to prometheus operator. **DEPRECATED**: will be replaced by `worker.port` in NFD v0.18. |
| `worker.healthPort` | int | 8082 | Port on which to expose the grpc health endpoint, will be also used for the probes. **DEPRECATED**: will be replaced by `worker.port` in NFD v0.18. |
| `worker.port` | int | 8080 | Port on which to serve http for metrics and healthz endpoints. |
| `worker.config` | dict | | NFD worker [configuration](../reference/worker-configuration-reference) |
| `worker.podSecurityContext` | dict | {} | [PodSecurityContext](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod) holds pod-level security attributes and common container settings |
| `worker.securityContext` | dict | {} | Container [security settings](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-container) |
Expand Down
86 changes: 29 additions & 57 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ package nfdworker
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/exp/maps"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -92,14 +91,13 @@ type Labels map[string]string

// Args are the command line arguments of NfdWorker.
//
// NOTE(review): every field except MetricsPort, GrpcHealthPort and Port is
// listed twice in this view, which looks like the before and after versions
// of the struct rendered together - confirm which field set is current.
type Args struct {
// ConfigFile: presumably the path of the worker config file - TODO confirm.
ConfigFile string
// Klog: klog flag values keyed by name - presumably keyed by flag name; verify.
Klog map[string]*utils.KlogFlagVal
// Kubeconfig: presumably the kubeconfig to use - TODO confirm against the flag definition.
Kubeconfig string
// Oneshot is set by the "oneshot" flag ("Do not publish feature labels").
Oneshot bool
// Options carries config options given on the command line via the
// "options" flag, in the same format as the config file (json or yaml).
Options string
// MetricsPort is set by the "metrics" flag: port on which to expose metrics.
MetricsPort int
// GrpcHealthPort is set by the "grpc-health" flag: port the gRPC health
// server listens on.
GrpcHealthPort int
// NoOwnerRefs: NOTE(review): meaning not visible in this view - presumably
// disables owner references on created objects; confirm.
NoOwnerRefs bool
ConfigFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool
Options string
// Port is set by the "port" flag and used as the listen port of the HTTP
// server that serves the metrics and healthz endpoints.
Port int
NoOwnerRefs bool

// Overrides holds the ConfigOverrideArgs - presumably command line values
// that take precedence over the config file; TODO confirm.
Overrides ConfigOverrideArgs
}
Expand All @@ -117,7 +115,6 @@ type nfdWorker struct {
configFilePath string
config *NFDConfig
kubernetesNamespace string
healthServer *grpc.Server
k8sClient k8sclient.Interface
nfdClient nfdclient.Interface
stop chan struct{} // channel for signaling stop
Expand Down Expand Up @@ -205,6 +202,10 @@ func newDefaultConfig() *NFDConfig {
}
}

// Healthz is the HTTP handler for the health endpoint. It ignores the
// incoming request and always replies with status 200 OK and no body.
func (w *nfdWorker) Healthz(rw http.ResponseWriter, _ *http.Request) {
	rw.WriteHeader(http.StatusOK)
}

func (i *infiniteTicker) Reset(d time.Duration) {
switch {
case d > 0:
Expand All @@ -216,29 +217,6 @@ func (i *infiniteTicker) Reset(d time.Duration) {
}
}

// startGrpcHealthServer opens a TCP listener on the configured gRPC health
// port and serves the gRPC health service on it from a background goroutine.
//
// It returns an error only if the listener cannot be created. If serving
// later fails, the wrapped error is sent on errChan from the goroutine. The
// created server is stored in w.healthServer.
func (w *nfdWorker) startGrpcHealthServer(errChan chan<- error) error {
	port := w.args.GrpcHealthPort

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	server := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(server, health.NewServer())
	klog.InfoS("gRPC health server serving", "port", port)

	go func() {
		// Release the listener once serving has ended, whatever the outcome.
		defer func() { _ = listener.Close() }()

		serveErr := server.Serve(listener)
		if serveErr != nil {
			errChan <- fmt.Errorf("gRPC health server exited with an error: %w", serveErr)
		}
		klog.InfoS("gRPC health server stopped")
	}()

	w.healthServer = server
	return nil
}

// Run feature discovery.
func (w *nfdWorker) runFeatureDiscovery() error {
discoveryStart := time.Now()
Expand Down Expand Up @@ -320,15 +298,13 @@ func (w *nfdWorker) Run() error {
labelTrigger.Reset(w.config.Core.SleepInterval.Duration)
defer labelTrigger.Stop()

httpMux := http.NewServeMux()

// Register to metrics server
if w.args.MetricsPort > 0 {
m := utils.CreateMetricsServer(w.args.MetricsPort,
buildInfo,
featureDiscoveryDuration)
go m.Run()
registerVersion(version.Get())
defer m.Stop()
}
promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(buildInfo, featureDiscoveryDuration)
httpMux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}))
registerVersion(version.Get())

err = w.runFeatureDiscovery()
if err != nil {
Expand All @@ -340,20 +316,19 @@ func (w *nfdWorker) Run() error {
return nil
}

grpcErr := make(chan error)
// Register health endpoint (at this point we're "ready and live")
httpMux.HandleFunc("/healthz", w.Healthz)

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
if err := w.startGrpcHealthServer(grpcErr); err != nil {
return fmt.Errorf("failed to start gRPC health server: %w", err)
}
}
// Start HTTP server
httpServer := http.Server{Addr: fmt.Sprintf(":%d", w.args.Port), Handler: httpMux}
go func() {
klog.InfoS("http server starting", "port", httpServer.Addr)
klog.InfoS("http server stopped", "exitCode", httpServer.ListenAndServe())
}()
defer httpServer.Close()

for {
select {
case err := <-grpcErr:
return fmt.Errorf("error in serving gRPC: %w", err)

case <-labelTrigger.C:
err = w.runFeatureDiscovery()
if err != nil {
Expand All @@ -362,9 +337,6 @@ func (w *nfdWorker) Run() error {

case <-w.stop:
klog.InfoS("shutting down nfd-worker")
if w.healthServer != nil {
w.healthServer.GracefulStop()
}
return nil
}
}
Expand Down