From edffa122d7a8b0167706efef0122f3921d806a91 Mon Sep 17 00:00:00 2001 From: Sharath MK Date: Fri, 29 Sep 2023 14:46:40 +0100 Subject: [PATCH] New Plugin Node Labels Resources (#6) --- .github/workflows/integration.yaml | 1 + cmd/node-labels-resources/doc.go | 2 + cmd/node-labels-resources/main.go | 48 +++ .../cluster-role-binding.yaml | 13 + .../node-labels-resources/cluster-role.yaml | 13 + deploy/node-labels-resources/deployment.yaml | 42 +++ deploy/node-labels-resources/sa.yaml | 5 + deploy/node-labels-resources/service.yaml | 17 + go.mod | 24 +- go.sum | 18 +- pkg/node-labels-resources/doc.go | 2 + pkg/node-labels-resources/grpc_server.go | 353 ++++++++++++++++++ pkg/utils/args/resources.go | 51 +++ pkg/utils/clients/doc.go | 2 + pkg/utils/clients/kubernetes_core.go | 34 ++ 15 files changed, 617 insertions(+), 8 deletions(-) create mode 100644 cmd/node-labels-resources/doc.go create mode 100644 cmd/node-labels-resources/main.go create mode 100644 deploy/node-labels-resources/cluster-role-binding.yaml create mode 100644 deploy/node-labels-resources/cluster-role.yaml create mode 100644 deploy/node-labels-resources/deployment.yaml create mode 100644 deploy/node-labels-resources/sa.yaml create mode 100644 deploy/node-labels-resources/service.yaml create mode 100644 pkg/node-labels-resources/doc.go create mode 100644 pkg/node-labels-resources/grpc_server.go create mode 100644 pkg/utils/clients/doc.go create mode 100644 pkg/utils/clients/kubernetes_core.go diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 0793a59..56418c3 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -47,6 +47,7 @@ jobs: matrix: plugin: - fixed-resources + - node-labels-resources steps: - name: Set up QEMU diff --git a/cmd/node-labels-resources/doc.go b/cmd/node-labels-resources/doc.go new file mode 100644 index 0000000..9c5552c --- /dev/null +++ b/cmd/node-labels-resources/doc.go @@ -0,0 +1,2 @@ +// Package main contains the main function for the node-label-resource-plugin. +package main diff --git a/cmd/node-labels-resources/main.go b/cmd/node-labels-resources/main.go new file mode 100644 index 0000000..03139fc --- /dev/null +++ b/cmd/node-labels-resources/main.go @@ -0,0 +1,48 @@ +package main + +import ( + goflags "flag" + "os" + + "github.com/spf13/cobra" + "k8s.io/klog/v2" + + grpcserver "github.com/liqotech/liqo-resource-plugins/pkg/node-labels-resources" + monitorargs "github.com/liqotech/liqo-resource-plugins/pkg/utils/args" + clients "github.com/liqotech/liqo-resource-plugins/pkg/utils/clients" +) + +func main() { + fs := goflags.NewFlagSet("", goflags.PanicOnError) + klog.InitFlags(fs) + klog.SetOutput(os.Stdout) + var port int + nodeLabels := monitorargs.NodeLabelsMap{} + kubernetesClient, err := clients.CreateKubernetesCore() + if err != nil { + klog.Fatalf("error: unable to create kubernetes client: %s", err) + } + var rootCmd = &cobra.Command{ + Use: os.Args[0], + Short: "Liqo plugin which provides resources based on node selector to each remote cluster", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + return grpcserver.ListenAndServeGRPCServer(port, nodeLabels.NodeLabels, kubernetesClient) + }, + } + + rootCmd.Flags().Var(&nodeLabels, "node-label", "set a node label having format name=value. e.g.: --node-label=label1=v1 --node-label=label2=v2.") + rootCmd.PersistentFlags().IntVar(&port, "port", 6001, "set port where the server will listen on.") + rootCmd.Flags().AddGoFlagSet(fs) + + err = rootCmd.Root().MarkFlagRequired("node-label") + if err != nil { + klog.Fatalf("error: error during marking resource flag as required: %s", err) + } + err = rootCmd.Execute() + if err != nil { + klog.Flush() + os.Exit(1) + } + klog.Flush() +} diff --git a/deploy/node-labels-resources/cluster-role-binding.yaml b/deploy/node-labels-resources/cluster-role-binding.yaml new file mode 100644 index 0000000..db1bf76 --- /dev/null +++ b/deploy/node-labels-resources/cluster-role-binding.yaml @@ -0,0 +1,13 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + creationTimestamp: null + name: liqo-node-label-resource-plugin-role-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: liqo-node-label-resource-plugin-role +subjects: +- kind: ServiceAccount + name: liqo-node-labels-resource-plugin-sa + namespace: liqo \ No newline at end of file diff --git a/deploy/node-labels-resources/cluster-role.yaml b/deploy/node-labels-resources/cluster-role.yaml new file mode 100644 index 0000000..c361f4c --- /dev/null +++ b/deploy/node-labels-resources/cluster-role.yaml @@ -0,0 +1,13 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: liqo-node-label-resource-plugin-role +rules: +- apiGroups: + - "" + resources: + - nodes + - pods + verbs: + - list + - watch diff --git a/deploy/node-labels-resources/deployment.yaml b/deploy/node-labels-resources/deployment.yaml new file mode 100644 index 0000000..0dd5b26 --- /dev/null +++ b/deploy/node-labels-resources/deployment.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: liqo-node-labels-resources-plugin + labels: + app.kubernetes.io/name: liqo-node-labelsresource-plugins + app.kubernetes.io/instance: external-monitor-node-labels-resources + app.kubernetes.io/component: plugin + app.kubernetes.io/part-of: liqo + app.kubernetes.io/managed-by: manual +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: liqo-resource-plugins + app.kubernetes.io/instance: external-monitor-node-labels-resources + app.kubernetes.io/component: plugin + app.kubernetes.io/part-of: liqo + app.kubernetes.io/managed-by: manual + template: + metadata: + labels: + app.kubernetes.io/name: liqo-resource-plugins + app.kubernetes.io/instance: external-monitor-node-labels-resources + app.kubernetes.io/component: plugin + app.kubernetes.io/part-of: liqo + app.kubernetes.io/managed-by: manual + spec: + serviceAccount: liqo-node-labels-resource-plugin-sa + containers: + - name: external-monitor-container + image: + imagePullPolicy: Always + ports: + - name: grpc-api + containerPort: 6001 + args: + - --node-label=dedicated=liqo + - -v=4 + nodeSelector: {} + tolerations: [] + affinity: {} diff --git a/deploy/node-labels-resources/sa.yaml b/deploy/node-labels-resources/sa.yaml new file mode 100644 index 0000000..6494db9 --- /dev/null +++ b/deploy/node-labels-resources/sa.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + creationTimestamp: null + name: liqo-node-labels-resource-plugin-sa \ No newline at end of file diff --git a/deploy/node-labels-resources/service.yaml b/deploy/node-labels-resources/service.yaml new file mode 100644 index 0000000..38a82e9 --- /dev/null +++ b/deploy/node-labels-resources/service.yaml @@ -0,0 +1,17 @@ +apiVersion: v1 +kind: Service +metadata: + name: liqo-node-labels-resources-plugin +spec: + ports: + - name: grpc-api + port: 6001 + protocol: TCP + targetPort: grpc-api + selector: + app.kubernetes.io/name: liqo-resource-plugins + app.kubernetes.io/instance: external-monitor-node-labels-resources + app.kubernetes.io/component: plugin + app.kubernetes.io/part-of: liqo + app.kubernetes.io/managed-by: manual + type: ClusterIP \ No newline at end of file diff --git a/go.mod b/go.mod index 38fefcf..5bc0b4f 100644 --- a/go.mod +++ b/go.mod @@ -4,20 +4,30 @@ go 1.19 require ( github.com/liqotech/liqo v0.9.1 + github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.7.0 google.golang.org/grpc v1.58.0-dev + k8s.io/api v0.27.4 k8s.io/apimachinery v0.27.4 + k8s.io/client-go v0.27.4 + k8s.io/klog/v2 v2.100.1 + k8s.io/kubectl v0.27.4 + sigs.k8s.io/controller-runtime v0.15.1 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.10.1 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.1 // indirect github.com/go-openapi/swag v0.22.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -28,10 +38,14 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.4.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/virtual-kubelet/virtual-kubelet v1.10.0 // indirect golang.org/x/crypto v0.11.0 // indirect @@ -43,20 +57,18 @@ require ( golang.org/x/text v0.12.0 // indirect golang.org/x/time v0.3.0 // indirect golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220504211119-3d4a969bb56b // indirect + gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.27.4 // indirect - k8s.io/client-go v0.27.4 // indirect - k8s.io/klog/v2 v2.100.1 // indirect + k8s.io/apiextensions-apiserver v0.27.4 // indirect + k8s.io/component-base v0.27.4 // indirect k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect - k8s.io/kubectl v0.27.4 // indirect k8s.io/metrics v0.27.4 // indirect k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect - sigs.k8s.io/controller-runtime v0.15.1 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/go.sum b/go.sum index 573bc2b..0fce492 100644 --- a/go.sum +++ b/go.sum @@ -4,11 +4,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -27,10 +28,12 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= @@ -47,6 +50,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -95,7 +99,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -105,6 +109,7 @@ github.com/liqotech/liqo v0.9.1/go.mod h1:ZdqWl00HX8X0xYaLpxOpiOfm4CXrxhEEeK9dkA github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -121,10 +126,14 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= +github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -152,6 +161,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -202,6 +212,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -232,6 +243,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220504211119-3d4a969bb56b h1:9JncmKXcUwE918my+H6xmjBdhK2jM/UTUNXxhRG1BAk= golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220504211119-3d4a969bb56b/go.mod h1:yp4gl6zOlnDGOZeWeDfMwQcsdOIQnMdhuPx9mwwWBL4= gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= +gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= @@ -286,11 +298,13 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh k8s.io/api v0.27.4 h1:0pCo/AN9hONazBKlNUdhQymmnfLRbSZjd5H5H3f0bSs= k8s.io/api v0.27.4/go.mod h1:O3smaaX15NfxjzILfiln1D8Z3+gEYpjEpiNA/1EVK1Y= k8s.io/apiextensions-apiserver v0.27.4 h1:ie1yZG4nY/wvFMIR2hXBeSVq+HfNzib60FjnBYtPGSs= +k8s.io/apiextensions-apiserver v0.27.4/go.mod h1:KHZaDr5H9IbGEnSskEUp/DsdXe1hMQ7uzpQcYUFt2bM= k8s.io/apimachinery v0.27.4 h1:CdxflD4AF61yewuid0fLl6bM4a3q04jWel0IlP+aYjs= k8s.io/apimachinery v0.27.4/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E= k8s.io/client-go v0.27.4 h1:vj2YTtSJ6J4KxaC88P4pMPEQECWMY8gqPqsTgUKzvjk= k8s.io/client-go v0.27.4/go.mod h1:ragcly7lUlN0SRPk5/ZkGnDjPknzb37TICq07WhI6Xc= k8s.io/component-base v0.27.4 h1:Wqc0jMKEDGjKXdae8hBXeskRP//vu1m6ypC+gwErj4c= +k8s.io/component-base v0.27.4/go.mod h1:hoiEETnLc0ioLv6WPeDt8vD34DDeB35MfQnxCARq3kY= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= diff --git a/pkg/node-labels-resources/doc.go b/pkg/node-labels-resources/doc.go new file mode 100644 index 0000000..e4a9b5f --- /dev/null +++ b/pkg/node-labels-resources/doc.go @@ -0,0 +1,2 @@ +// Package nodeselectorresources contains the logic for the node-label-resource-plugin. +package nodeselectorresources diff --git a/pkg/node-labels-resources/grpc_server.go b/pkg/node-labels-resources/grpc_server.go new file mode 100644 index 0000000..19ae778 --- /dev/null +++ b/pkg/node-labels-resources/grpc_server.go @@ -0,0 +1,353 @@ +package nodeselectorresources + +import ( + "context" + "fmt" + "log" + "net" + "reflect" + "sync" + "time" + + "google.golang.org/grpc" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + resourcehelper "k8s.io/kubectl/pkg/util/resource" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/liqotech/liqo/pkg/consts" + resourcemonitors "github.com/liqotech/liqo/pkg/liqo-controller-manager/resource-request-controller/resource-monitors" + "github.com/liqotech/liqo/pkg/utils" +) + +// NodeDetails stores details of the Node. +type NodeDetails struct { + Schedulable bool + Allocatable corev1.ResourceList + Pods map[string]corev1.ResourceList +} + +type nodeLabelsMonitor struct { + Server *grpc.Server + resourcemonitors.ResourceReaderServer + subscribers sync.Map + nodeLabels map[string]string + k8sNodeClient v1.NodeInterface + k8sPodClient v1.PodInterface + allocatable corev1.ResourceList + nodeMutex sync.RWMutex + ctx context.Context + resourceLists map[string]NodeDetails +} + +// ListenAndServeGRPCServer creates the gRPC server and makes it listen on the given port. +func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset *kubernetes.Clientset) error { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + resyncPeriod := 10 * time.Hour + ctx := ctrl.SetupSignalHandler() + if err != nil { + return fmt.Errorf("failed to listen on port %d: %w", port, err) + } + + // this function is used to filter and ignore virtual nodes at informer level. + var noVirtualNodesFilter = func(options *metav1.ListOptions) { + labelSelector := labels.NewSelector() + for k, v := range nodeLabels { + req, err := labels.NewRequirement(k, selection.Equals, []string{v}) + utilruntime.Must(err) + labelSelector = labelSelector.Add(*req) + } + req, err := labels.NewRequirement(consts.TypeLabel, selection.NotEquals, []string{consts.TypeNode}) + utilruntime.Must(err) + labelSelector = labelSelector.Add(*req) + klog.V(1).Infof("Node selector resource monitor label selector: %s", labelSelector.String()) + options.LabelSelector = labelSelector.String() + } + + // this function is used to filter and ignore shadow pods at informer level. + var noShadowPodsFilter = func(options *metav1.ListOptions) { + req, err := labels.NewRequirement(consts.LocalPodLabelKey, selection.NotEquals, []string{consts.LocalPodLabelValue}) + utilruntime.Must(err) + options.LabelSelector = labels.NewSelector().Add(*req).String() + options.FieldSelector = fields.OneTermEqualSelector("status.phase", string(corev1.PodRunning)).String() + } + + nodeFactory := informers.NewSharedInformerFactoryWithOptions( + clientset, resyncPeriod, informers.WithTweakListOptions(noVirtualNodesFilter), + ) + nodeInformer := nodeFactory.Core().V1().Nodes().Informer() + podFactory := informers.NewSharedInformerFactoryWithOptions( + clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter), + ) + podInformer := podFactory.Core().V1().Pods().Informer() + s := nodeLabelsMonitor{ + Server: grpc.NewServer(), + nodeLabels: nodeLabels, + allocatable: corev1.ResourceList{}, + k8sNodeClient: clientset.CoreV1().Nodes(), + k8sPodClient: clientset.CoreV1().Pods(corev1.NamespaceAll), + ctx: ctx, + resourceLists: map[string]NodeDetails{}, + } + _, err = nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: s.onNodeAdd, + UpdateFunc: s.onNodeUpdate, + DeleteFunc: s.onNodeDelete, + }) + utilruntime.Must(err) + _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: s.onPodAdd, + // We do not care about update events, since resources are immutable. + DeleteFunc: s.onPodDelete, + }) + utilruntime.Must(err) + nodeFactory.Start(ctx.Done()) + nodeFactory.WaitForCacheSync(ctx.Done()) + podFactory.Start(ctx.Done()) + podFactory.WaitForCacheSync(ctx.Done()) + + resourcemonitors.RegisterResourceReaderServer(s.Server, &s) + if err := s.Server.Serve(lis); err != nil { + return fmt.Errorf("grpc server failed to serve: %w", err) + } + log.Printf("info: node selector resource monitor listening at %v", lis.Addr()) + return nil +} + +// react to a Node Creation/First informer run. +func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) { + node := obj.(*corev1.Node) + toAdd := &node.Status.Allocatable + klog.V(4).Infof("Adding Node %s", node.Name) + nlm.resourceLists[node.Name] = NodeDetails{ + Allocatable: *toAdd, + Pods: make(map[string]corev1.ResourceList), + Schedulable: utils.IsNodeReady(node) && !node.Spec.Unschedulable, + } + pods, err := nlm.k8sPodClient.List(nlm.ctx, metav1.ListOptions{FieldSelector: "spec.nodeName=" + node.Name}) + if err != nil { + klog.Errorf("Failed to list pods for node %s: %v", node.Name, err) + return + } + for i := range pods.Items { + nlm.onPodAdd(&pods.Items[i]) + } + nlm.writeClusterResources() +} + +// react to a Node Update. +func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) { + oldNode := oldObj.(*corev1.Node) + newNode := newObj.(*corev1.Node) + newNodeResources := newNode.Status.Allocatable + klog.V(4).Infof("Updating Node %s", oldNode.Name) + if utils.IsNodeReady(newNode) && !newNode.Spec.Unschedulable { + nodeDetail, ok := nlm.resourceLists[newNode.Name] + if !ok { + nlm.resourceLists[newNode.Name] = NodeDetails{ + Allocatable: newNodeResources, + Pods: make(map[string]corev1.ResourceList), + Schedulable: true, + } + } else { + nodeDetail.Schedulable = true + nlm.resourceLists[newNode.Name] = nodeDetail + } + } else if !utils.IsNodeReady(newNode) || newNode.Spec.Unschedulable { + nodeDetail, ok := nlm.resourceLists[newNode.Name] + if ok { + klog.V(4).Infof("Marking Node %s as Unscheduable", newNode.Name) + nodeDetail.Schedulable = false + nlm.resourceLists[newNode.Name] = nodeDetail + } + } + nlm.writeClusterResources() +} + +// react to a Node Delete. +func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) { + node := obj.(*corev1.Node) + toDelete := &node.Status.Allocatable + nodeResourceList, ok := nlm.resourceLists[node.Name] + if ok { + if !reflect.DeepEqual(nodeResourceList, *toDelete) { + klog.Warningf("Node %s resources changed while it was terminating, ignoring", node.Name) + } + klog.V(4).Infof("Deleting Node %s", node.Name) + delete(nlm.resourceLists, node.Name) + } + nlm.writeClusterResources() +} + +func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) { + // Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running. + podAdded, ok := obj.(*corev1.Pod) + if !ok { + klog.Error("OnPodAdd: Failed to cast to *corev1.Pod type") + return + } + podResources := extractPodResources(podAdded) + podNodeName := podAdded.Spec.NodeName + nodeDetail, ok := nlm.resourceLists[podNodeName] + if ok { + _, podOk := nodeDetail.Pods[podAdded.Name] + if !podOk { + nodeDetail.Pods[podAdded.Name] = podResources + nlm.resourceLists[podNodeName] = nodeDetail + } + } else { + klog.V(4).Infof("OnPodAdd: Failed to find node %s in resourceLists", podNodeName) + } + nlm.writeClusterResources() +} + +func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) { + // Thanks to the filters at the informer level, delete events are received only when + // pods previously running on a physical node are no longer running. + podDeleted, ok := obj.(*corev1.Pod) + if !ok { + klog.Errorf("OnPodDelete: Failed to cast to *corev1.Pod type") + return + } + podNodeName := podDeleted.Spec.NodeName + nodeDetail, ok := nlm.resourceLists[podNodeName] + if ok { + delete(nodeDetail.Pods, podDeleted.Name) + nlm.resourceLists[podNodeName] = nodeDetail + } else { + klog.V(4).Infof("OnPodDelete: Failed to find node %s in resourceLists", podNodeName) + } + nlm.writeClusterResources() +} + +func (nlm *nodeLabelsMonitor) writeClusterResources() { + podResourceUsage := corev1.ResourceList{} + nodeAllocatable := corev1.ResourceList{} + for _, nodeDetail := range nlm.resourceLists { + if !nodeDetail.Schedulable { + continue + } + addResources(nodeAllocatable, nodeDetail.Allocatable) + for _, podResource := range nodeDetail.Pods { + addResources(podResourceUsage, podResource) + } + } + subResources(nodeAllocatable, podResourceUsage) + nlm.nodeMutex.Lock() + nlm.allocatable = nodeAllocatable.DeepCopy() + klog.V(4).Infof("Cluster resources: %v", nlm.allocatable) + nlm.nodeMutex.Unlock() + err := nlm.NotifyChange(nlm.ctx, &resourcemonitors.ClusterIdentity{ClusterID: resourcemonitors.AllClusterIDs}) + if err != nil { + log.Printf("error: error during sending notification to liqo: %s", err) + } +} + +func extractPodResources(podToExtract *corev1.Pod) corev1.ResourceList { + resourcesToExtract, _ := resourcehelper.PodRequestsAndLimits(podToExtract) + return resourcesToExtract +} + +// ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin +// the clusterID is ignored and the same resources are returned for every clusterID received. Since this method could be called multiple +// times it has to be idempotent. +func (nlm *nodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.PoolResourceList, error) { + klog.V(4).Infof("info: reading resources for cluster %s", req.ClusterID) + nlm.nodeMutex.RLock() + defer nlm.nodeMutex.RUnlock() + allocatable := make(map[string]*resource.Quantity) + for resName, quantity := range nlm.allocatable { + val := quantity.DeepCopy() + allocatable[resName.String()] = &val + } + klog.V(4).Infof("Cluster resources: %v", allocatable) + + resourceList := []*resourcemonitors.ResourceList{{Resources: allocatable}} + return &resourcemonitors.PoolResourceList{ResourceLists: resourceList}, nil +} + +// Subscribe is quite standard in this implementation so the only thing that it does is to notify liqo immediately. +func (nlm *nodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.ResourceReader_SubscribeServer) error { + klog.V(1).Infof("info: liqo controller manager subscribed") + + // Store the stream. Using req as key since each request will have a different req object. + nlm.subscribers.Store(req, srv) + ctx := srv.Context() + + // This notification is useful since you can edit the resources declared in the deployment and apply it to the cluster when one or more + // foreign clusters are already peered so this broadcast notification will update the resources for those clusters. + err := nlm.NotifyChange(ctx, &resourcemonitors.ClusterIdentity{ClusterID: resourcemonitors.AllClusterIDs}) + if err != nil { + klog.V(1).Infof("error: error during sending notification to liqo: %s", err) + } + + <-ctx.Done() + nlm.subscribers.Delete(req) + klog.V(1).Infof("info: liqo controller manager disconnected") + return nil +} + +// NotifyChange uses the cached streams to notify liqo that some resources changed. This method receives a clusterID inside req +// which can be a real clusterID or resourcemonitors.AllClusterIDs which tells to liqo to refresh all the resources +// of all the peered clusters. +func (nlm *nodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error { + klog.V(1).Infof("info: sending notification to liqo controller manager for cluster %q", req.ClusterID) + var err error + + nlm.subscribers.Range(func(key, value interface{}) bool { + stream := value.(resourcemonitors.ResourceReader_SubscribeServer) + + err = stream.Send(req) + if err != nil { + err = fmt.Errorf("error: error during sending a notification %w", err) + } + return true + }) + if err != nil { + fmt.Printf("%s", err) + return err + } + + klog.V(1).Infof("info: notification sent to liqo controller manager for cluster %q", req.ClusterID) + return err +} + +// RemoveCluster is useful to clean cluster's information if it exists when a cluster is upeered. This method receives +// a clusterID which identifies the cluster that has been removed. We believe that this method is useful in custom +// implementation, for example where a database is involved in the implementation. +func (nlm *nodeLabelsMonitor) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) { + klog.V(1).Infof("info: removing cluster %s", req.ClusterID) + return &resourcemonitors.Empty{}, nil +} + +// addResources is a utility function to add resources. +func addResources(currentResources, toAdd corev1.ResourceList) { + for resourceName, quantity := range toAdd { + if value, exists := currentResources[resourceName]; exists { + value.Add(quantity) + currentResources[resourceName] = value + } else { + currentResources[resourceName] = quantity + } + } +} + +// subResources is an utility function to subtract resources. +func subResources(currentResources, toSub corev1.ResourceList) { + for resourceName, quantity := range toSub { + if value, exists := currentResources[resourceName]; exists { + value.Sub(quantity) + currentResources[resourceName] = value + } + } +} diff --git a/pkg/utils/args/resources.go b/pkg/utils/args/resources.go index b8e3dfc..97711f5 100644 --- a/pkg/utils/args/resources.go +++ b/pkg/utils/args/resources.go @@ -78,3 +78,54 @@ func parseQuantity(str string) (string, *resource.Quantity, error) { return res[0], &quantity, nil } + +// NodeLabelsMap contains labels. +type NodeLabelsMap struct { + StringValues liqoargs.StringList + NodeLabels map[string]string +} + +// Set function sets the label. +func (n *NodeLabelsMap) Set(str string) error { + if n.NodeLabels == nil { + n.NodeLabels = make(map[string]string) + } + + if err := n.StringValues.Set(str); err != nil { + return err + } + + for _, entry := range n.StringValues.StringList { + key, value, err := parseNodeLabel(entry) + if err != nil { + return err + } + n.NodeLabels[key] = value + } + + return nil +} + +// String returns the stringified map entries. +func (n *NodeLabelsMap) String() string { + return n.StringValues.String() +} + +// Type return the type name. +func (n *NodeLabelsMap) Type() string { + return "nodeLabelList" +} + +func parseNodeLabel(str string) (labelKey, labelValue string, err error) { + res := strings.Split(str, "=") + + if len(res) != 2 { + return "", "", fmt.Errorf("invalid node label format %s", str) + } + + if res[0] == "" || res[1] == "" { + return "", "", fmt.Errorf("invalid node label format %s", str) + } + + return res[0], res[1], nil +} diff --git a/pkg/utils/clients/doc.go b/pkg/utils/clients/doc.go new file mode 100644 index 0000000..549f1b6 --- /dev/null +++ b/pkg/utils/clients/doc.go @@ -0,0 +1,2 @@ +// Package client contains the kubernetes realted function. +package client diff --git a/pkg/utils/clients/kubernetes_core.go b/pkg/utils/clients/kubernetes_core.go new file mode 100644 index 0000000..3af8c7b --- /dev/null +++ b/pkg/utils/clients/kubernetes_core.go @@ -0,0 +1,34 @@ +package client + +import ( + "log" + "path/filepath" + + "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" +) + +// CreateKubernetesCore create Kuberentes client. +func CreateKubernetesCore() (*kubernetes.Clientset, error) { + restConfig, err := rest.InClusterConfig() + if err != nil { + log.Print("k8s client failed in-cluster mode now defaulting to in-local mode") + kubeconfig := filepath.Join( + homedir.HomeDir(), ".kube", "config", + ) + restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } + if err != nil { + return nil, errors.Wrap(err, "failed to initialize the RestConfig") + } + restConfig.QPS = float32(50) + restConfig.Burst = 50 + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize kubernetes client set") + } + return clientSet, nil +}