From 442d67a3c44cf44cdd6cf127307c6fee6089d5e6 Mon Sep 17 00:00:00 2001 From: Estefania Castro Vizoso Date: Sun, 1 Nov 2020 12:18:51 +0100 Subject: [PATCH] kafka operator installation, working on #32 --- k8s/k8ssetup/k8ssetup.go | 70 +++++++- k8s/k8ssetup/k8ssetup_test.go | 292 +++++++++++++++++++++++++++++++- k8s/pets-infrastructure.go | 3 + k8s/pets-infrastructure_test.go | 22 ++- 4 files changed, 372 insertions(+), 15 deletions(-) diff --git a/k8s/k8ssetup/k8ssetup.go b/k8s/k8ssetup/k8ssetup.go index 8ae4924..69c50fd 100644 --- a/k8s/k8ssetup/k8ssetup.go +++ b/k8s/k8ssetup/k8ssetup.go @@ -18,6 +18,7 @@ type K8sSetUp interface { Initialize() error InstallPostgresqlOperator() error DatabaseCreation(fileName string) error + InstallKafkaOperator() error } type k8sSetUpImpl struct { @@ -50,6 +51,23 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error { return nil } +func (k k8sSetUpImpl) InstallKafkaOperator() error { + log.Println("Installing Kafka operator ...") + + if installed := k.isKafkaOperatorInstalled(); !installed { + log.Println("Kafka operator not installed ...") + if err := k.doKafkaOperatorInstallation(); err != nil { + return fmt.Errorf("error installing Kafka operator: %v", err) + } + k.waiKafkaOperatorRunning() + + } else { + log.Println("Kafka operator is installed ...") + } + + return nil +} + func (k k8sSetUpImpl) waitPsqlOperatorRunning() { cnt := true for cnt { @@ -59,17 +77,26 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() { log.Print("Psql operator is running") } -func (k k8sSetUpImpl) isPsqlOperatorRunning() (running bool, err error) { - log.Printf("Checking if postgres operator is already running ...") +func (k k8sSetUpImpl) waiKafkaOperatorRunning() { + cnt := true + for cnt { + ready, err := k.isKafkaOperatorRunning() + cnt = !(err == nil && ready) + } + log.Print("Kafka operator is running") +} + +func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, err error) { + log.Printf("Checking if %s operator is already running ...", name) var podNames, output string - if podNames, err = k.kubectl("get", "pod", "-o", "name"); err == nil { - for _, name := range strings.Split(podNames, "\n") { - if strings.Contains(name, "postgres-operator") { - if output, err = k.kubectl("get", name, "-o", "jsonpath='{.status.phase}'"); err != nil { + if podNames, err = k.kubectl("get", "pod", "-o", "name", "-n", namespace); err == nil { + for _, podName := range strings.Split(podNames, "\n") { + if strings.Contains(podName, name) { + if output, err = k.kubectl("get", podName, "-o", "jsonpath='{.status.containerStatuses[0].ready}'", "-n", namespace); err != nil { return false, err } - running = output == "'Running'" + running = output == "'true'" break } } @@ -77,6 +104,14 @@ func (k k8sSetUpImpl) isPsqlOperatorRunning() (running bool, err error) { return } +func (k k8sSetUpImpl) isPsqlOperatorRunning() (bool, error) { + return k.isOperatorRunning("postgres-operator", "default") +} + +func (k k8sSetUpImpl) isKafkaOperatorRunning() (bool, error) { + return k.isOperatorRunning("strimzi-cluster-operator", "kafka") +} + func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool { log.Println("Checking if postgresql operator is already installed ...") if _, err := k.kubectl("describe", "service/postgres-operator"); err != nil { @@ -86,6 +121,15 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool { return true } +func (k k8sSetUpImpl) isKafkaOperatorInstalled() bool { + log.Println("Checking if kafka operator is already installed ...") + if _, err := k.kubectl("describe", "deployment.apps/strimzi-cluster-operator", "-n", "kafka"); err != nil { + return false + } + + return true +} + func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error { log.Println("Installing postgreSQL operator ...") dir, err := ioutil.TempDir("", "pets-go-infra") @@ -120,6 +164,18 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error { return nil } +func (k *k8sSetUpImpl) doKafkaOperatorInstallation() error { + log.Println("Installing kafka operator ...") + if _, err := k.kubectl("create", "namespace", "kafka"); err != nil { + return fmt.Errorf("error in kubectl create: %v", err) + } + if _, err := k.kubectl("apply", "-f", "https://strimzi.io/install/latest?namespace=kafka", "-n", "kafka"); err != nil { + return fmt.Errorf("error in kubectl apply: %v", err) + } + + return nil +} + func (k k8sSetUpImpl) kubectl(params ...string) (output string, err error) { return k.executeCommand(k.kubectlPath, params...) } diff --git a/k8s/k8ssetup/k8ssetup_test.go b/k8s/k8ssetup/k8ssetup_test.go index d75d4c9..ab5dd1e 100644 --- a/k8s/k8ssetup/k8ssetup_test.go +++ b/k8s/k8ssetup/k8ssetup_test.go @@ -132,6 +132,42 @@ func Test_doPsqlOperatorInstallation(t *testing.T) { }) } +func Test_doKafkaOperatorInstallation(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("install kafka operator runs ok", func(t *testing.T) { + k8sImpl.kubectlPath = getFilePath(okCommand) + var expect error = nil + got := k8sImpl.doKafkaOperatorInstallation() + if got != expect { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("install kafka operator fails when create namespace fails", func(t *testing.T) { + k8sImpl.kubectlPath = getFilePath(koCommand) + expect := "error in kubectl create" + got := k8sImpl.doKafkaOperatorInstallation() + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("install kafka operator fails when create operator fails", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "apply" && params[1] == "-f" { + return "error", errors.New("some error") + } + return "", nil + } + expect := "error in kubectl apply" + got := k8sImpl.doKafkaOperatorInstallation() + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) +} + func Test_InstallPostgresqlOperator(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -144,7 +180,7 @@ func Test_InstallPostgresqlOperator(t *testing.T) { if params[1] == "pod" { return "pod/postgres-operator-59bb89464c-dx2rs", nil } else if params[1] == "pod/postgres-operator-59bb89464c-dx2rs" { - return "'Running'", nil + return "'true'", nil } } return "", nil @@ -179,6 +215,53 @@ func Test_InstallPostgresqlOperator(t *testing.T) { }) } +func Test_InstallKafkOperator(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("install kafka operator runs ok", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "describe" { + return "error", errors.New("some error") + } + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + return "'true'", nil + } + } + return "", nil + } + var expect error = nil + got := k8sImpl.InstallKafkaOperator() + if got != expect { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("install kafka operator ok if its installed", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "", nil + } + var expect error = nil + got := k8sImpl.InstallKafkaOperator() + if got != expect { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("install kafka operator runs ko when installation fails", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "error", errors.New("some error") + } + expect := "error installing Kafka operator" + got := k8sImpl.InstallKafkaOperator() + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) +} + func Test_waitPsqlOperatorRunning(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -191,9 +274,9 @@ func Test_waitPsqlOperatorRunning(t *testing.T) { } else if params[1] == "pod/postgres-operator-59bb89464c-dx2rs" { count++ if count == 3 { - return "'Running'", nil + return "'true'", nil } - return "'Waiting'", nil + return "'false'", nil } } return "", nil @@ -217,7 +300,7 @@ func Test_waitPsqlOperatorRunning(t *testing.T) { } else if params[1] == "pod/postgres-operator-59bb89464c-dx2rs" { count++ if count == 2 { - return "'Running'", nil + return "'true'", nil } return "", errInvalid } @@ -234,6 +317,61 @@ func Test_waitPsqlOperatorRunning(t *testing.T) { }) } +func Test_waitKafkaOperatorRunning(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("must run until kafka operator is ready", func(t *testing.T) { + count := 0 + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + count++ + if count == 3 { + return "'true'", nil + } + return "false", nil + } + } + return "", nil + } + + k8sImpl.waiKafkaOperatorRunning() + expect := 3 + got := count + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must run until there is no error and ready", func(t *testing.T) { + var errInvalid = errors.New("invalid") + count := 0 + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + count++ + if count == 2 { + return "'true'", nil + } + return "", errInvalid + } + } + return "", nil + } + + k8sImpl.waiKafkaOperatorRunning() + expect := 2 + got := count + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) +} + func Test_isPsqlOperatorRunning(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -243,7 +381,7 @@ func Test_isPsqlOperatorRunning(t *testing.T) { if params[1] == "pod" { return "pod/postgres-operator-59bb89464c-dx2rs", nil } else if params[1] == "pod/postgres-operator-59bb89464c-dx2rs" { - return "'Running'", nil + return "'true'", nil } } return "", nil @@ -355,3 +493,147 @@ func Test_isPsqlOperatorRunning(t *testing.T) { } }) } + +func Test_isKafkaOperatorRunning(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("must return true if kafka operator is running", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + return "'true'", nil + } + } + return "", nil + } + + expect := true + var expectErr error = nil + got, gotErr := k8sImpl.isKafkaOperatorRunning() + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if kafka operator does not exist", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/test-1234", nil + } + } + return "", nil + } + + expect := false + var expectErr error = nil + got, gotErr := k8sImpl.isKafkaOperatorRunning() + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if kafka operator is in progress", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + return "false", nil + } + } + return "", nil + } + + expect := false + var expectErr error = nil + got, gotErr := k8sImpl.isKafkaOperatorRunning() + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if get pods fails", func(t *testing.T) { + var errInvalid = errors.New("invalid") + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "", errInvalid + } + } + return "", nil + } + + expect := false + expectErr := errInvalid + got, gotErr := k8sImpl.isKafkaOperatorRunning() + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if kafka operator ready check fails", func(t *testing.T) { + var errInvalid = errors.New("invalid") + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil + } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { + return "", errInvalid + } + } + return "", nil + } + + expect := false + expectErr := errInvalid + got, gotErr := k8sImpl.isKafkaOperatorRunning() + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) +} + +func Test_isKafkaOperatorInstalled(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("kafka operator is installed", func(t *testing.T) { + expect := true + k8sImpl.kubectlPath = getFilePath(okCommand) + got := k8sImpl.isKafkaOperatorInstalled() + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("kafka operator is not installed", func(t *testing.T) { + expect := false + k8sImpl.kubectlPath = getFilePath(koCommand) + got := k8sImpl.isKafkaOperatorInstalled() + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) +} diff --git a/k8s/pets-infrastructure.go b/k8s/pets-infrastructure.go index cc33b34..c06874a 100644 --- a/k8s/pets-infrastructure.go +++ b/k8s/pets-infrastructure.go @@ -16,6 +16,9 @@ func run(stp k8ssetup.K8sSetUp) error { if err := stp.DatabaseCreation("pets-db.yml"); err != nil { return fmt.Errorf("error installing database, %v", err) } + if err := stp.InstallKafkaOperator(); err != nil { + return fmt.Errorf("error installing Kafka operator, %v", err) + } return nil } diff --git a/k8s/pets-infrastructure_test.go b/k8s/pets-infrastructure_test.go index 002bc57..8204cea 100644 --- a/k8s/pets-infrastructure_test.go +++ b/k8s/pets-infrastructure_test.go @@ -10,12 +10,14 @@ type k8sSetUpFake struct { failOnInitialize bool failOnInstallPostgresqlOperator bool failOnDatabaseCreation bool + failOnInstallKafkaOperator bool } var ( errorInit = errors.New("error on initialize") - errorInstallOperator = errors.New("error on installing postgresql operator") + errorInstallPsqlOperator = errors.New("error on installing postgresql operator") errorDBCreation = errors.New("error on database creation") + errorInstallKafkaOperator = errors.New("error on installing kafka operator") ) func (k k8sSetUpFake) Initialize() error { @@ -27,7 +29,7 @@ func (k k8sSetUpFake) Initialize() error { func (k k8sSetUpFake) InstallPostgresqlOperator() error { if k.failOnInstallPostgresqlOperator { - return errorInstallOperator + return errorInstallPsqlOperator } return nil } @@ -39,6 +41,13 @@ func (k k8sSetUpFake) DatabaseCreation(fileName string) error { return nil } +func (k k8sSetUpFake) InstallKafkaOperator() error { + if k.failOnInstallKafkaOperator { + return errorInstallKafkaOperator + } + return nil +} + func Test_run(t *testing.T) { type TestCase struct { name string @@ -64,7 +73,7 @@ func Test_run(t *testing.T) { stp: k8sSetUpFake{ failOnInstallPostgresqlOperator: true, }, - expect: fmt.Errorf("error installing PostgreSQL operator, %v", errorInstallOperator), + expect: fmt.Errorf("error installing PostgreSQL operator, %v", errorInstallPsqlOperator), }, { name: "should run error when creation database fails", @@ -73,6 +82,13 @@ func Test_run(t *testing.T) { }, expect: fmt.Errorf("error installing database, %v", errorDBCreation), }, + { + name: "should run error when install kafka operator fails", + stp: k8sSetUpFake{ + failOnInstallKafkaOperator: true, + }, + expect: fmt.Errorf("error installing Kafka operator, %v", errorInstallKafkaOperator), + }, } for _, tt := range cases {