From 3adcf92417bb46be53290e0255e4438fb4d88b9a Mon Sep 17 00:00:00 2001 From: Estefania Castro Vizoso Date: Sun, 15 Nov 2020 21:07:02 +0100 Subject: [PATCH] creating kafka operator, working on #32 --- k8s/go.mod | 1 + .../_test/{cluster.yml => psql-cluster.yml} | 0 k8s/k8ssetup/database.go | 20 +- k8s/k8ssetup/database_test.go | 19 +- k8s/k8ssetup/k8ssetup.go | 67 +-- k8s/k8ssetup/k8ssetup_test.go | 276 +-------- k8s/k8ssetup/kafka.go | 126 ++++ k8s/k8ssetup/kafka_test.go | 549 ++++++++++++++++++ k8s/pets-infrastructure.go | 7 +- k8s/pets-infrastructure_test.go | 38 +- 10 files changed, 754 insertions(+), 349 deletions(-) rename k8s/k8ssetup/_test/{cluster.yml => psql-cluster.yml} (100%) create mode 100644 k8s/k8ssetup/kafka.go create mode 100644 k8s/k8ssetup/kafka_test.go diff --git a/k8s/go.mod b/k8s/go.mod index c912730..6f97b51 100644 --- a/k8s/go.mod +++ b/k8s/go.mod @@ -6,5 +6,6 @@ require ( github.com/go-git/go-git/v5 v5.1.0 golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/tools v0.0.0-20200918232735-d647fc253266 // indirect + gopkg.in/yaml.v2 v2.2.4 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 ) diff --git a/k8s/k8ssetup/_test/cluster.yml b/k8s/k8ssetup/_test/psql-cluster.yml similarity index 100% rename from k8s/k8ssetup/_test/cluster.yml rename to k8s/k8ssetup/_test/psql-cluster.yml diff --git a/k8s/k8ssetup/database.go b/k8s/k8ssetup/database.go index 1d57074..666100a 100644 --- a/k8s/k8ssetup/database.go +++ b/k8s/k8ssetup/database.go @@ -6,9 +6,9 @@ import ( "io/ioutil" "log" "os" - "regexp" + "strings" - "gopkg.in/yaml.v3" + "gopkg.in/yaml.v2" ) // DatabaseYml is used to read a yml file in order to get the cluster name @@ -20,12 +20,7 @@ type DatabaseYml struct { } func (k k8sSetUpImpl) isDatabaseCreated(cluster string) (bool, error) { - log.Printf("Checking if database cluster %q is already created ...", cluster) - if _, err := k.kubectl("describe", "postgresql/"+cluster); err != nil { - return false, err - } - - return true, nil + return k.isResourceCreated("postgresql", cluster, "default") } func (k k8sSetUpImpl) createDatabase(fileName string) error { @@ -57,18 +52,11 @@ func (k k8sSetUpImpl) getClusterName(fileName string) (clusterName string, err e func (k k8sSetUpImpl) isDatabaseRunning(cluster string) (bool, error) { log.Printf("Checking if database cluster %q is already running ...", cluster) output, err := k.kubectl("get", "postgresql/"+cluster, "-o", "jsonpath={.status}") - status := "" if err != nil { return false, err } - var re = regexp.MustCompile(`(?m).*:(.*)]`) - match := re.FindStringSubmatch(output) - if len(match) > 1 { - status = match[1] - } - - return status == "Running", nil + return strings.Contains(output, "Running"), nil } func (k k8sSetUpImpl) waitDatabaseCreation(cluster string) { diff --git a/k8s/k8ssetup/database_test.go b/k8s/k8ssetup/database_test.go index 18e49fd..aa261b4 100644 --- a/k8s/k8ssetup/database_test.go +++ b/k8s/k8ssetup/database_test.go @@ -7,13 +7,14 @@ import ( "testing" ) + func Test_getClusterName(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) t.Run("must return the cluster name", func(t *testing.T) { expect := "cluster" var expectErr error = nil - got, gotErr := k8sImpl.getClusterName(getFilePath("cluster.yml")) + got, gotErr := k8sImpl.getClusterName(getFilePath("psql-cluster.yml")) if gotErr != expectErr { t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) @@ -102,7 +103,7 @@ func Test_isDatabaseCreated(t *testing.T) { func Test_createDatabase(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - t.Run("must return true if database is created", func(t *testing.T) { + t.Run("must return no error if database is created", func(t *testing.T) { k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { return "", nil } @@ -115,7 +116,7 @@ func Test_createDatabase(t *testing.T) { } }) - t.Run("must return false if database is not created", func(t *testing.T) { + t.Run("must return error if database is not created", func(t *testing.T) { var errInvalid = errors.New("invalid") k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { return "", errInvalid @@ -130,7 +131,6 @@ func Test_createDatabase(t *testing.T) { }) } -// map[PostgresClusterStatus:Running] func Test_isDatabaseRunning(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -138,7 +138,6 @@ func Test_isDatabaseRunning(t *testing.T) { k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { return "map[PostgresClusterStatus:Running]", nil } - expect := true var expectErr error = nil got, gotErr := k8sImpl.isDatabaseRunning("cluster") @@ -243,7 +242,7 @@ func Test_DatabaseCreation(t *testing.T) { } var expect error = nil - got := k8sImpl.DatabaseCreation("cluster.yml") + got := k8sImpl.DatabaseCreation("psql-cluster.yml") if got != expect { t.Fatalf("Got error %v, expect error %v", got, expect) } @@ -267,7 +266,7 @@ func Test_DatabaseCreation(t *testing.T) { } expect := "already exists" - got := k8sImpl.DatabaseCreation("cluster.yml") + got := k8sImpl.DatabaseCreation("psql-cluster.yml") if !strings.Contains(got.Error(), expect) { t.Fatalf("Got error %v, expect error %v", got, expect) } @@ -278,14 +277,14 @@ func Test_DatabaseCreation(t *testing.T) { if params[0] == "describe" && params[1] == "postgresql/cluster" { return "error", errors.New("error kubectl describe") } - if params[0] == "create" && params[1] == "-f" && params[2] == "cluster.yml" { + if params[0] == "create" && params[1] == "-f" && params[2] == "psql-cluster.yml" { return "error", errors.New("error kubectl create") } return "map[PostgresClusterStatus:Running]", nil } expect := "error creating database cluster" - got := k8sImpl.DatabaseCreation("cluster.yml") + got := k8sImpl.DatabaseCreation("psql-cluster.yml") if !strings.Contains(got.Error(), expect) { t.Fatalf("Got error %v, expect error %v", got, expect) } @@ -303,7 +302,7 @@ func Test_DatabaseCreation(t *testing.T) { } expect := "error creating job for cluster" - got := k8sImpl.DatabaseCreation("cluster.yml") + got := k8sImpl.DatabaseCreation("psql-cluster.yml") if !strings.Contains(got.Error(), expect) { t.Fatalf("Got error %v, expect error %v", got, expect) } diff --git a/k8s/k8ssetup/k8ssetup.go b/k8s/k8ssetup/k8ssetup.go index 69c50fd..3259f98 100644 --- a/k8s/k8ssetup/k8ssetup.go +++ b/k8s/k8ssetup/k8ssetup.go @@ -18,7 +18,8 @@ type K8sSetUp interface { Initialize() error InstallPostgresqlOperator() error DatabaseCreation(fileName string) error - InstallKafkaOperator() error + CheckKudoInstallation() error + KafkaClusterCreation(fileName string) error } type k8sSetUpImpl struct { @@ -51,20 +52,13 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error { return nil } -func (k k8sSetUpImpl) InstallKafkaOperator() error { - log.Println("Installing Kafka operator ...") +func (k k8sSetUpImpl) CheckKudoInstallation() error { + log.Println("Checking kudo installation ...") - if installed := k.isKafkaOperatorInstalled(); !installed { - log.Println("Kafka operator not installed ...") - if err := k.doKafkaOperatorInstallation(); err != nil { - return fmt.Errorf("error installing Kafka operator: %v", err) - } - k.waiKafkaOperatorRunning() - - } else { - log.Println("Kafka operator is installed ...") + if _, err := k.kubectl("kudo", "version"); err != nil { + return fmt.Errorf("kudo is not installed: %v", err) } - + log.Println("Kudo is installed ...") return nil } @@ -77,16 +71,7 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() { log.Print("Psql operator is running") } -func (k k8sSetUpImpl) waiKafkaOperatorRunning() { - cnt := true - for cnt { - ready, err := k.isKafkaOperatorRunning() - cnt = !(err == nil && ready) - } - log.Print("Kafka operator is running") -} - -func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, err error) { +func (k k8sSetUpImpl) isPodRunning(name, namespace string) (running bool, err error) { log.Printf("Checking if %s operator is already running ...", name) var podNames, output string @@ -105,11 +90,7 @@ func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, e } func (k k8sSetUpImpl) isPsqlOperatorRunning() (bool, error) { - return k.isOperatorRunning("postgres-operator", "default") -} - -func (k k8sSetUpImpl) isKafkaOperatorRunning() (bool, error) { - return k.isOperatorRunning("strimzi-cluster-operator", "kafka") + return k.isPodRunning("postgres-operator", "default") } func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool { @@ -121,15 +102,6 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool { return true } -func (k k8sSetUpImpl) isKafkaOperatorInstalled() bool { - log.Println("Checking if kafka operator is already installed ...") - if _, err := k.kubectl("describe", "deployment.apps/strimzi-cluster-operator", "-n", "kafka"); err != nil { - return false - } - - return true -} - func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error { log.Println("Installing postgreSQL operator ...") dir, err := ioutil.TempDir("", "pets-go-infra") @@ -164,18 +136,6 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error { return nil } -func (k *k8sSetUpImpl) doKafkaOperatorInstallation() error { - log.Println("Installing kafka operator ...") - if _, err := k.kubectl("create", "namespace", "kafka"); err != nil { - return fmt.Errorf("error in kubectl create: %v", err) - } - if _, err := k.kubectl("apply", "-f", "https://strimzi.io/install/latest?namespace=kafka", "-n", "kafka"); err != nil { - return fmt.Errorf("error in kubectl apply: %v", err) - } - - return nil -} - func (k k8sSetUpImpl) kubectl(params ...string) (output string, err error) { return k.executeCommand(k.kubectlPath, params...) } @@ -200,6 +160,15 @@ func (k k8sSetUpImpl) defaultExecuteCommand(cmdName string, params ...string) (o return } +func (k k8sSetUpImpl) isResourceCreated(rtype, name, namespace string) (bool, error) { + log.Printf("Checking if resource %q name %q is already created ...", rtype, name) + if _, err := k.kubectl("describe", rtype+"/"+name, "-n", namespace); err != nil { + return false, err + } + + return true, nil +} + // NewK8sSetUp returns a K8sSetUp interface func NewK8sSetUp() K8sSetUp { impl := &k8sSetUpImpl{ diff --git a/k8s/k8ssetup/k8ssetup_test.go b/k8s/k8ssetup/k8ssetup_test.go index ab5dd1e..bc562b1 100644 --- a/k8s/k8ssetup/k8ssetup_test.go +++ b/k8s/k8ssetup/k8ssetup_test.go @@ -132,42 +132,6 @@ func Test_doPsqlOperatorInstallation(t *testing.T) { }) } -func Test_doKafkaOperatorInstallation(t *testing.T) { - k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - - t.Run("install kafka operator runs ok", func(t *testing.T) { - k8sImpl.kubectlPath = getFilePath(okCommand) - var expect error = nil - got := k8sImpl.doKafkaOperatorInstallation() - if got != expect { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) - - t.Run("install kafka operator fails when create namespace fails", func(t *testing.T) { - k8sImpl.kubectlPath = getFilePath(koCommand) - expect := "error in kubectl create" - got := k8sImpl.doKafkaOperatorInstallation() - if !strings.Contains(got.Error(), expect) { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) - - t.Run("install kafka operator fails when create operator fails", func(t *testing.T) { - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "apply" && params[1] == "-f" { - return "error", errors.New("some error") - } - return "", nil - } - expect := "error in kubectl apply" - got := k8sImpl.doKafkaOperatorInstallation() - if !strings.Contains(got.Error(), expect) { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) -} - func Test_InstallPostgresqlOperator(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -215,53 +179,6 @@ func Test_InstallPostgresqlOperator(t *testing.T) { }) } -func Test_InstallKafkOperator(t *testing.T) { - k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - - t.Run("install kafka operator runs ok", func(t *testing.T) { - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "describe" { - return "error", errors.New("some error") - } - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - return "'true'", nil - } - } - return "", nil - } - var expect error = nil - got := k8sImpl.InstallKafkaOperator() - if got != expect { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) - - t.Run("install kafka operator ok if its installed", func(t *testing.T) { - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - return "", nil - } - var expect error = nil - got := k8sImpl.InstallKafkaOperator() - if got != expect { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) - - t.Run("install kafka operator runs ko when installation fails", func(t *testing.T) { - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - return "error", errors.New("some error") - } - expect := "error installing Kafka operator" - got := k8sImpl.InstallKafkaOperator() - if !strings.Contains(got.Error(), expect) { - t.Fatalf("Got error %v, expect error %v", got, expect) - } - }) -} - func Test_waitPsqlOperatorRunning(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -317,61 +234,6 @@ func Test_waitPsqlOperatorRunning(t *testing.T) { }) } -func Test_waitKafkaOperatorRunning(t *testing.T) { - k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - - t.Run("must run until kafka operator is ready", func(t *testing.T) { - count := 0 - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - count++ - if count == 3 { - return "'true'", nil - } - return "false", nil - } - } - return "", nil - } - - k8sImpl.waiKafkaOperatorRunning() - expect := 3 - got := count - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) - - t.Run("must run until there is no error and ready", func(t *testing.T) { - var errInvalid = errors.New("invalid") - count := 0 - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - count++ - if count == 2 { - return "'true'", nil - } - return "", errInvalid - } - } - return "", nil - } - - k8sImpl.waiKafkaOperatorRunning() - expect := 2 - got := count - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) -} - func Test_isPsqlOperatorRunning(t *testing.T) { k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) @@ -494,146 +356,38 @@ func Test_isPsqlOperatorRunning(t *testing.T) { }) } -func Test_isKafkaOperatorRunning(t *testing.T) { +func Test_CheckKudoInstallation(t *testing.T) { + var errInvalid error = errors.New("error") k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - t.Run("must return true if kafka operator is running", func(t *testing.T) { + t.Run("kudo is installed", func(t *testing.T) { k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - return "'true'", nil - } + if params[0] == "kudo" && params[1] == "version" { + return "", nil } - return "", nil + return "error", errInvalid } - expect := true var expectErr error = nil - got, gotErr := k8sImpl.isKafkaOperatorRunning() - + gotErr := k8sImpl.CheckKudoInstallation() if gotErr != expectErr { - t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) - } - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) + t.Fatalf("Got %v, expect %v", gotErr, expectErr) } }) - t.Run("must return false if kafka operator does not exist", func(t *testing.T) { + t.Run("kudo is not installed", func(t *testing.T) { k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/test-1234", nil - } + if params[0] == "kudo" && params[1] == "version" { + return "error", errInvalid } return "", nil } - expect := false - var expectErr error = nil - got, gotErr := k8sImpl.isKafkaOperatorRunning() - - if gotErr != expectErr { - t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) - } - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) - - t.Run("must return false if kafka operator is in progress", func(t *testing.T) { - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - return "false", nil - } - } - return "", nil - } - - expect := false - var expectErr error = nil - got, gotErr := k8sImpl.isKafkaOperatorRunning() - - if gotErr != expectErr { - t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) - } - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) - - t.Run("must return false if get pods fails", func(t *testing.T) { - var errInvalid = errors.New("invalid") - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "", errInvalid - } - } - return "", nil - } - - expect := false - expectErr := errInvalid - got, gotErr := k8sImpl.isKafkaOperatorRunning() - - if gotErr != expectErr { - t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) - } - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) - - t.Run("must return false if kafka operator ready check fails", func(t *testing.T) { - var errInvalid = errors.New("invalid") - k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { - if params[0] == "get" { - if params[1] == "pod" { - return "pod/strimzi-cluster-operator-59bb89464c-dx2rs", nil - } else if params[1] == "pod/strimzi-cluster-operator-59bb89464c-dx2rs" { - return "", errInvalid - } - } - return "", nil - } - - expect := false - expectErr := errInvalid - got, gotErr := k8sImpl.isKafkaOperatorRunning() - - if gotErr != expectErr { - t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) - } - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) -} - -func Test_isKafkaOperatorInstalled(t *testing.T) { - k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) - - t.Run("kafka operator is installed", func(t *testing.T) { - expect := true - k8sImpl.kubectlPath = getFilePath(okCommand) - got := k8sImpl.isKafkaOperatorInstalled() - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) + expectErr := "kudo is not installed" + gotErr := k8sImpl.CheckKudoInstallation() + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got %v, expect %v", gotErr, expectErr) } }) - t.Run("kafka operator is not installed", func(t *testing.T) { - expect := false - k8sImpl.kubectlPath = getFilePath(koCommand) - got := k8sImpl.isKafkaOperatorInstalled() - if got != expect { - t.Fatalf("Got %v, expect %v", got, expect) - } - }) } diff --git a/k8s/k8ssetup/kafka.go b/k8s/k8ssetup/kafka.go new file mode 100644 index 0000000..847b5c4 --- /dev/null +++ b/k8s/k8ssetup/kafka.go @@ -0,0 +1,126 @@ +package k8ssetup + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "strings" +) + +type KudoInstances []struct { + Metadata struct { + Name string `json:"name"` + } `json:"metadata"` + Spec struct { + OperatorVersion struct { + Name string `json:"name"` + } `json:"operatorVersion"` + } `json:"spec"` +} + +func (k k8sSetUpImpl) isKafkaClusterCreated(cluster string) (bool, error) { + var output string + var err error + if output, err = k.kubectl("kudo", "get", "instances", "-o", "json"); err != nil { + return false, fmt.Errorf("error getting kudo instances: %v", err) + } + var jsonBytes []byte = []byte(output) + data := KudoInstances{} + if err = json.Unmarshal(jsonBytes, &data); err == nil { + var kf, zf bool + for _, v := range data { + if v.Metadata.Name == "kafka-"+cluster && + strings.Contains(v.Spec.OperatorVersion.Name, "kafka") { + kf = true + } else if v.Metadata.Name == "zookeeper-"+cluster && + strings.Contains(v.Spec.OperatorVersion.Name, "zookeeper") { + zf = true + } + if kf && zf { + break + } + } + if kf && zf { + return true, nil + } + } + + if err != nil { + return false, fmt.Errorf("Error not found kafka and zookeeper, invalid json: %v", err) + } + + return false, errors.New("Error not found kafka and zookeeper") +} + +func (k k8sSetUpImpl) createZookeeperCluster(name string) error { + log.Println("Installing zookeper cluster ...") + if _, err := k.kubectl("kudo", "install", "zookeeper", "--instance", fmt.Sprintf("\"zookeeper-%s\"", name)); err != nil { + return fmt.Errorf("Error creating zookeeper cluster: %v", err) + } + k.waitZookeeperRunning(name) + return nil +} + +func (k k8sSetUpImpl) waitZookeeperRunning(name string) { + cnt := true + for cnt { + allReady := true + for i := 0; i < 3; i++ { + ready, err := k.isPodRunning(fmt.Sprintf("zookeeper-%s-%d", name, i), "default") + if err == nil { + allReady = allReady && ready + } + } + cnt = !allReady + } + log.Print("Zookeeper operator is running") +} + +func (k k8sSetUpImpl) createKafkaCluster(name string) error { + log.Println("Installing kafka cluster ...") + if _, err := k.kubectl("kudo", "install", "kafka", "--instance", fmt.Sprintf("\"kafka-%s\"", name), "-p", "ZOOKEEPER_URI=\"zookeeper-pets-zookeeper-0.zookeeper-pets-hs:2181,zookeeper-pets-zookeeper-1.zookeeper-pets-hs:2181,zookeeper-pets-zookeeper-2.zookeeper-pets-hs:2181\""); err != nil { + return fmt.Errorf("Error creating kafka cluster: %v", err) + } + + k.waitKafkaClusterCreation(name) + return nil +} + +func (k k8sSetUpImpl) waitKafkaClusterCreation(name string) { + cnt := true + for cnt { + allReady := true + for i := 0; i < 3; i++ { + ready, err := k.isPodRunning(fmt.Sprintf("kafka-%s-%d", name, i), "default") + if err == nil { + allReady = allReady && ready + } + } + cnt = !allReady + } + log.Print("Kafka operator is running") +} + +func (k *k8sSetUpImpl) KafkaClusterCreation(clusterName string) error { + log.Printf("Creating kafka with name %q ...", clusterName) + var err error + + if created, err := k.isKafkaClusterCreated(clusterName); err == nil && created { + return fmt.Errorf("kafka cluster %q already exists", clusterName) + } + + if err = k.createZookeeperCluster(clusterName); err == nil { + log.Printf("Zookeeper cluster %q created ...", clusterName) + } else { + return fmt.Errorf("error creating zookeeper cluster %q: %v", clusterName, err) + } + + if err = k.createKafkaCluster(clusterName); err == nil { + log.Printf("Kafka cluster %q created ...", clusterName) + } else { + return fmt.Errorf("error creating kafka cluster %q: %v", clusterName, err) + } + + return nil +} diff --git a/k8s/k8ssetup/kafka_test.go b/k8s/k8ssetup/kafka_test.go new file mode 100644 index 0000000..66d4b87 --- /dev/null +++ b/k8s/k8ssetup/kafka_test.go @@ -0,0 +1,549 @@ +package k8ssetup + +import ( + "errors" + "strings" + "testing" +) + +const ( + KudoInstancesFound = `[ + { + "metadata": { + "name": "zookeeper-pets" + }, + "spec": { + "operatorVersion": { + "name": "zookeeper-3.4.14-0.3.1" + } + } + }, + { + "metadata": { + "name": "kafka-pets" + }, + "spec": { + "operatorVersion": { + "name": "kafka-2.5.1-1.3.3" + } + } + } + ]` + + KudoKafkaInstanceNotFound = `[ + { + "metadata": { + "name": "zookeeper-instance" + }, + "spec": { + "operatorVersion": { + "name": "zookeeper-3.4.14-0.3.1" + } + } + } + ]` + + KudoZookeeperInstanceNotFound = `[ + { + "metadata": { + "name": "kafka-instance" + }, + "spec": { + "operatorVersion": { + "name": "kafka-2.5.1-1.3.3" + } + } + } + ]` +) + +func Test_isKafkaClusterCreated(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + t.Run("must return true if kafka and zookeper clusters are created", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return KudoInstancesFound, nil + } + + expect := true + var expectErr error = nil + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if kafka cluster is not created", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return KudoKafkaInstanceNotFound, nil + } + + expect := false + expectErr := "Error not found kafka and zookeeper" + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if zookeeper cluster is not created", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return KudoZookeeperInstanceNotFound, nil + } + + expect := false + expectErr := "Error not found kafka and zookeeper" + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if zookeeper nor kafka clusters are created", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "[]", nil + } + + expect := false + expectErr := "Error not found kafka and zookeeper" + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if returned json is invalid", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "error", nil + } + + expect := false + expectErr := "Error not found kafka and zookeeper, invalid json" + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) + + t.Run("must return false if getting instances fails", func(t *testing.T) { + invalidErr := errors.New("invalid") + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" && params[1] == "get" && params[2] == "instances" { + return "", invalidErr + } + return "", nil + } + + expect := false + expectErr := "error getting kudo instances" + got, gotErr := k8sImpl.isKafkaClusterCreated("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + if got != expect { + t.Fatalf("Got %v, expect %v", got, expect) + } + }) +} + +func Test_createZookeeperCluster(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + var invalidErr = errors.New("invalid") + + type podStatus struct { + name string + ready bool + } + var pods = []podStatus{ + { + name: "pod/zookeeper-pets-0", + ready: true, + }, + { + name: "pod/zookeeper-pets-1", + ready: true, + }, + { + name: "pod/zookeeper-pets-2", + ready: true, + }, + } + + t.Run("must return no error if zookeeper cluster is created", func(t *testing.T) { + + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" { + return "", nil + } + + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + return "'true'", nil + } + return "", nil + } + + var expectErr error = nil + gotErr := k8sImpl.createZookeeperCluster("pets") + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + }) + + t.Run("must return error if zookeeper installation fails", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "error", invalidErr + } + + expectErr := "Error creating zookeeper cluster" + gotErr := k8sImpl.createZookeeperCluster("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + }) +} + +func Test_waitZookeeperRunning(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + type podStatus struct { + name string + ready bool + } + + var pods = []podStatus{ + { + name: "pod/zookeeper-pets-0", + ready: false, + }, + { + name: "pod/zookeeper-pets-1", + ready: false, + }, + { + name: "pod/zookeeper-pets-2", + ready: false, + }, + } + + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + for _, v := range pods { + if v.name == params[1] { + if v.ready { + return "'true'", nil + } + } + } + if strings.Contains(params[1], pods[2].name) { + for k := range pods { + pods[k].ready = true + } + } + return "'false'", nil + } + return "", nil + } + + k8sImpl.waitZookeeperRunning("pets") + for _, v := range pods { + if !v.ready { + t.Fatalf("Got %v, expect %v", false, true) + } + } +} + +func Test_createKafkaCluster(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + var invalidErr = errors.New("invalid") + + type podStatus struct { + name string + ready bool + } + var pods = []podStatus{ + { + name: "pod/kafka-pets-0", + ready: true, + }, + { + name: "pod/kafka-pets-1", + ready: true, + }, + { + name: "pod/kafka-pets-2", + ready: true, + }, + } + + t.Run("must return no error if kafka cluster is created", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" { + return "", nil + } + + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + return "'true'", nil + } + return "", nil + } + + var expectErr error = nil + gotErr := k8sImpl.createKafkaCluster("pets") + if gotErr != expectErr { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + }) + + t.Run("must return error if kafka installation fails", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + return "error", invalidErr + } + + expectErr := "Error creating kafka cluster" + gotErr := k8sImpl.createKafkaCluster("pets") + if !strings.Contains(gotErr.Error(), expectErr) { + t.Fatalf("Got error %v, expect error %v", gotErr, expectErr) + } + }) +} + +func Test_waitKafkaClusterCreation(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + + type podStatus struct { + name string + ready bool + } + + var pods = []podStatus{ + { + name: "pod/kafka-pets-0", + ready: false, + }, + { + name: "pod/kafka-pets-1", + ready: false, + }, + { + name: "pod/kafka-pets-2", + ready: false, + }, + } + + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + for _, v := range pods { + if v.name == params[1] { + if v.ready { + return "'true'", nil + } + } + } + if strings.Contains(params[1], pods[2].name) { + for k := range pods { + pods[k].ready = true + } + } + return "'false'", nil + } + return "", nil + } + + k8sImpl.waitKafkaClusterCreation("pets") + for _, v := range pods { + if !v.ready { + t.Fatalf("Got %v, expect %v", false, true) + } + } +} + +func Test_KafkaClusterCreation(t *testing.T) { + k8sImpl := NewK8sSetUp().(*k8sSetUpImpl) + invalidErr := errors.New("invalid") + type podStatus struct { + name string + ready bool + } + + var pods = []podStatus{ + { + name: "pod/zookeeper-pets-0", + ready: false, + }, + { + name: "pod/zookeeper-pets-1", + ready: false, + }, + { + name: "pod/zookeeper-pets-2", + ready: false, + }, + { + name: "pod/kafka-pets-0", + ready: false, + }, + { + name: "pod/kafka-pets-1", + ready: false, + }, + { + name: "pod/kafka-pets-2", + ready: false, + }, + } + + t.Run("we should create the kafka cluster", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" && params[2] == "instances" && params[3] == "-o" && params[4] == "json" { + return KudoKafkaInstanceNotFound, nil + } + if params[0] == "kudo" && params[1] == "install" { + return "", nil + } + + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + return "'true'", nil + } + return "", nil + } + + var expect error = nil + got := k8sImpl.KafkaClusterCreation("pets") + if got != expect { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("we should return an error when kafka and zookeeper already exist", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" && params[2] == "instances" && params[3] == "-o" && params[4] == "json" { + return KudoInstancesFound, nil + } + if params[0] == "kudo" && params[1] == "install" { + return "", nil + } + + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + return "'true'", nil + } + return "", nil + } + + expect := "already exists" + got := k8sImpl.KafkaClusterCreation("pets") + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("we should return an error when creating zookeeper cluster", func(t *testing.T) { + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" && params[2] == "instances" && params[3] == "-o" && params[4] == "json" { + return KudoKafkaInstanceNotFound, nil + } + if params[0] == "kudo" && params[1] == "install" { + return "error", invalidErr + } + + return "", nil + } + + expect := "error creating zookeeper cluster" + got := k8sImpl.KafkaClusterCreation("pets") + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) + + t.Run("we should return an error when creating kafka cluster", func(t *testing.T) { + // TODO - pending + k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) { + if params[0] == "kudo" && params[2] == "instances" && params[3] == "-o" && params[4] == "json" { + return KudoKafkaInstanceNotFound, nil + } + if params[0] == "kudo" && params[1] == "install" && params[2] == "zookeeper" { + return "", nil + } + + if params[0] == "kudo" && params[1] == "install" && params[2] == "kafka" { + return "error", invalidErr + } + + if params[0] == "get" { + if params[1] == "pod" { + var result = "" + for _, v := range pods { + result = result + v.name + "\n" + } + return result, nil + } + return "'true'", nil + } + return "", nil + } + + expect := "error creating kafka cluster" + got := k8sImpl.KafkaClusterCreation("pets") + if !strings.Contains(got.Error(), expect) { + t.Fatalf("Got error %v, expect error %v", got, expect) + } + }) +} diff --git a/k8s/pets-infrastructure.go b/k8s/pets-infrastructure.go index c06874a..e70c6d9 100644 --- a/k8s/pets-infrastructure.go +++ b/k8s/pets-infrastructure.go @@ -16,8 +16,11 @@ func run(stp k8ssetup.K8sSetUp) error { if err := stp.DatabaseCreation("pets-db.yml"); err != nil { return fmt.Errorf("error installing database, %v", err) } - if err := stp.InstallKafkaOperator(); err != nil { - return fmt.Errorf("error installing Kafka operator, %v", err) + if err := stp.CheckKudoInstallation(); err != nil { + return fmt.Errorf("error checking kudo installation, %v", err) + } + if err := stp.KafkaClusterCreation("pets"); err != nil { + return fmt.Errorf("error installing Kafka cluster, %v", err) } return nil } diff --git a/k8s/pets-infrastructure_test.go b/k8s/pets-infrastructure_test.go index 8204cea..21fa878 100644 --- a/k8s/pets-infrastructure_test.go +++ b/k8s/pets-infrastructure_test.go @@ -10,14 +10,16 @@ type k8sSetUpFake struct { failOnInitialize bool failOnInstallPostgresqlOperator bool failOnDatabaseCreation bool - failOnInstallKafkaOperator bool + failOnKafkaClusterCreation bool + failOnCheckKudoInstallation bool } var ( - errorInit = errors.New("error on initialize") - errorInstallPsqlOperator = errors.New("error on installing postgresql operator") - errorDBCreation = errors.New("error on database creation") - errorInstallKafkaOperator = errors.New("error on installing kafka operator") + errorInit = errors.New("error on initialize") + errorInstallPsqlOperator = errors.New("error on installing postgresql operator") + errorDBCreation = errors.New("error on database creation") + errorKafkaClusterCreation = errors.New("error on kafka cluster creation") + errorCheckKudoInstallation = errors.New("error on kudo checking kudo installation") ) func (k k8sSetUpFake) Initialize() error { @@ -41,9 +43,16 @@ func (k k8sSetUpFake) DatabaseCreation(fileName string) error { return nil } -func (k k8sSetUpFake) InstallKafkaOperator() error { - if k.failOnInstallKafkaOperator { - return errorInstallKafkaOperator +func (k k8sSetUpFake) KafkaClusterCreation(fileName string) error { + if k.failOnKafkaClusterCreation { + return errorKafkaClusterCreation + } + return nil +} + +func (k k8sSetUpFake) CheckKudoInstallation() error { + if k.failOnCheckKudoInstallation { + return errorCheckKudoInstallation } return nil } @@ -83,11 +92,18 @@ func Test_run(t *testing.T) { expect: fmt.Errorf("error installing database, %v", errorDBCreation), }, { - name: "should run error when install kafka operator fails", + name: "should run error when checking kudo installation fails", + stp: k8sSetUpFake{ + failOnCheckKudoInstallation: true, + }, + expect: fmt.Errorf("error checking kudo installation, %v", errorCheckKudoInstallation), + }, + { + name: "should run error when creation kafka cluster fails", stp: k8sSetUpFake{ - failOnInstallKafkaOperator: true, + failOnKafkaClusterCreation: true, }, - expect: fmt.Errorf("error installing Kafka operator, %v", errorInstallKafkaOperator), + expect: fmt.Errorf("error installing Kafka cluster, %v", errorKafkaClusterCreation), }, }