Skip to content

Commit

Permalink
creating kafka operator, working on #32
Browse files Browse the repository at this point in the history
  • Loading branch information
FanyCastro committed Nov 15, 2020
1 parent 442d67a commit 3adcf92
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 349 deletions.
1 change: 1 addition & 0 deletions k8s/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ require (
github.com/go-git/go-git/v5 v5.1.0
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/tools v0.0.0-20200918232735-d647fc253266 // indirect
gopkg.in/yaml.v2 v2.2.4
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
)
File renamed without changes.
20 changes: 4 additions & 16 deletions k8s/k8ssetup/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"io/ioutil"
"log"
"os"
"regexp"
"strings"

"gopkg.in/yaml.v3"
"gopkg.in/yaml.v2"
)

// DatabaseYml is used to read a yml file in order to get the cluster name
Expand All @@ -20,12 +20,7 @@ type DatabaseYml struct {
}

func (k k8sSetUpImpl) isDatabaseCreated(cluster string) (bool, error) {
log.Printf("Checking if database cluster %q is already created ...", cluster)
if _, err := k.kubectl("describe", "postgresql/"+cluster); err != nil {
return false, err
}

return true, nil
return k.isResourceCreated("postgresql", cluster, "default")
}

func (k k8sSetUpImpl) createDatabase(fileName string) error {
Expand Down Expand Up @@ -57,18 +52,11 @@ func (k k8sSetUpImpl) getClusterName(fileName string) (clusterName string, err e
func (k k8sSetUpImpl) isDatabaseRunning(cluster string) (bool, error) {
log.Printf("Checking if database cluster %q is already running ...", cluster)
output, err := k.kubectl("get", "postgresql/"+cluster, "-o", "jsonpath={.status}")
status := ""
if err != nil {
return false, err
}

var re = regexp.MustCompile(`(?m).*:(.*)]`)
match := re.FindStringSubmatch(output)
if len(match) > 1 {
status = match[1]
}

return status == "Running", nil
return strings.Contains(output, "Running"), nil
}

func (k k8sSetUpImpl) waitDatabaseCreation(cluster string) {
Expand Down
19 changes: 9 additions & 10 deletions k8s/k8ssetup/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"testing"
)


func Test_getClusterName(t *testing.T) {
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)

t.Run("must return the cluster name", func(t *testing.T) {
expect := "cluster"
var expectErr error = nil
got, gotErr := k8sImpl.getClusterName(getFilePath("cluster.yml"))
got, gotErr := k8sImpl.getClusterName(getFilePath("psql-cluster.yml"))

if gotErr != expectErr {
t.Fatalf("Got error %v, expect error %v", gotErr, expectErr)
Expand Down Expand Up @@ -102,7 +103,7 @@ func Test_isDatabaseCreated(t *testing.T) {
func Test_createDatabase(t *testing.T) {
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)

t.Run("must return true if database is created", func(t *testing.T) {
t.Run("must return no error if database is created", func(t *testing.T) {
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
return "", nil
}
Expand All @@ -115,7 +116,7 @@ func Test_createDatabase(t *testing.T) {
}
})

t.Run("must return false if database is not created", func(t *testing.T) {
t.Run("must return error if database is not created", func(t *testing.T) {
var errInvalid = errors.New("invalid")
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
return "", errInvalid
Expand All @@ -130,15 +131,13 @@ func Test_createDatabase(t *testing.T) {
})
}

// map[PostgresClusterStatus:Running]
func Test_isDatabaseRunning(t *testing.T) {
k8sImpl := NewK8sSetUp().(*k8sSetUpImpl)

t.Run("must return true if database is running", func(t *testing.T) {
k8sImpl.executeCommand = func(cmdName string, params ...string) (string, error) {
return "map[PostgresClusterStatus:Running]", nil
}

expect := true
var expectErr error = nil
got, gotErr := k8sImpl.isDatabaseRunning("cluster")
Expand Down Expand Up @@ -243,7 +242,7 @@ func Test_DatabaseCreation(t *testing.T) {
}

var expect error = nil
got := k8sImpl.DatabaseCreation("cluster.yml")
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
if got != expect {
t.Fatalf("Got error %v, expect error %v", got, expect)
}
Expand All @@ -267,7 +266,7 @@ func Test_DatabaseCreation(t *testing.T) {
}

expect := "already exists"
got := k8sImpl.DatabaseCreation("cluster.yml")
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
if !strings.Contains(got.Error(), expect) {
t.Fatalf("Got error %v, expect error %v", got, expect)
}
Expand All @@ -278,14 +277,14 @@ func Test_DatabaseCreation(t *testing.T) {
if params[0] == "describe" && params[1] == "postgresql/cluster" {
return "error", errors.New("error kubectl describe")
}
if params[0] == "create" && params[1] == "-f" && params[2] == "cluster.yml" {
if params[0] == "create" && params[1] == "-f" && params[2] == "psql-cluster.yml" {
return "error", errors.New("error kubectl create")
}
return "map[PostgresClusterStatus:Running]", nil
}

expect := "error creating database cluster"
got := k8sImpl.DatabaseCreation("cluster.yml")
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
if !strings.Contains(got.Error(), expect) {
t.Fatalf("Got error %v, expect error %v", got, expect)
}
Expand All @@ -303,7 +302,7 @@ func Test_DatabaseCreation(t *testing.T) {
}

expect := "error creating job for cluster"
got := k8sImpl.DatabaseCreation("cluster.yml")
got := k8sImpl.DatabaseCreation("psql-cluster.yml")
if !strings.Contains(got.Error(), expect) {
t.Fatalf("Got error %v, expect error %v", got, expect)
}
Expand Down
67 changes: 18 additions & 49 deletions k8s/k8ssetup/k8ssetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type K8sSetUp interface {
Initialize() error
InstallPostgresqlOperator() error
DatabaseCreation(fileName string) error
InstallKafkaOperator() error
CheckKudoInstallation() error
KafkaClusterCreation(fileName string) error
}

type k8sSetUpImpl struct {
Expand Down Expand Up @@ -51,20 +52,13 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error {
return nil
}

func (k k8sSetUpImpl) InstallKafkaOperator() error {
log.Println("Installing Kafka operator ...")
func (k k8sSetUpImpl) CheckKudoInstallation() error {
log.Println("Checking kudo installation ...")

if installed := k.isKafkaOperatorInstalled(); !installed {
log.Println("Kafka operator not installed ...")
if err := k.doKafkaOperatorInstallation(); err != nil {
return fmt.Errorf("error installing Kafka operator: %v", err)
}
k.waiKafkaOperatorRunning()

} else {
log.Println("Kafka operator is installed ...")
if _, err := k.kubectl("kudo", "version"); err != nil {
return fmt.Errorf("kudo is not installed: %v", err)
}

log.Println("Kudo is installed ...")
return nil
}

Expand All @@ -77,16 +71,7 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() {
log.Print("Psql operator is running")
}

func (k k8sSetUpImpl) waiKafkaOperatorRunning() {
cnt := true
for cnt {
ready, err := k.isKafkaOperatorRunning()
cnt = !(err == nil && ready)
}
log.Print("Kafka operator is running")
}

func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, err error) {
func (k k8sSetUpImpl) isPodRunning(name, namespace string) (running bool, err error) {
log.Printf("Checking if %s operator is already running ...", name)

var podNames, output string
Expand All @@ -105,11 +90,7 @@ func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, e
}

func (k k8sSetUpImpl) isPsqlOperatorRunning() (bool, error) {
return k.isOperatorRunning("postgres-operator", "default")
}

func (k k8sSetUpImpl) isKafkaOperatorRunning() (bool, error) {
return k.isOperatorRunning("strimzi-cluster-operator", "kafka")
return k.isPodRunning("postgres-operator", "default")
}

func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
Expand All @@ -121,15 +102,6 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
return true
}

func (k k8sSetUpImpl) isKafkaOperatorInstalled() bool {
log.Println("Checking if kafka operator is already installed ...")
if _, err := k.kubectl("describe", "deployment.apps/strimzi-cluster-operator", "-n", "kafka"); err != nil {
return false
}

return true
}

func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
log.Println("Installing postgreSQL operator ...")
dir, err := ioutil.TempDir("", "pets-go-infra")
Expand Down Expand Up @@ -164,18 +136,6 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
return nil
}

func (k *k8sSetUpImpl) doKafkaOperatorInstallation() error {
log.Println("Installing kafka operator ...")
if _, err := k.kubectl("create", "namespace", "kafka"); err != nil {
return fmt.Errorf("error in kubectl create: %v", err)
}
if _, err := k.kubectl("apply", "-f", "https://strimzi.io/install/latest?namespace=kafka", "-n", "kafka"); err != nil {
return fmt.Errorf("error in kubectl apply: %v", err)
}

return nil
}

func (k k8sSetUpImpl) kubectl(params ...string) (output string, err error) {
return k.executeCommand(k.kubectlPath, params...)
}
Expand All @@ -200,6 +160,15 @@ func (k k8sSetUpImpl) defaultExecuteCommand(cmdName string, params ...string) (o
return
}

func (k k8sSetUpImpl) isResourceCreated(rtype, name, namespace string) (bool, error) {
log.Printf("Checking if resource %q name %q is already created ...", rtype, name)
if _, err := k.kubectl("describe", rtype+"/"+name, "-n", namespace); err != nil {
return false, err
}

return true, nil
}

// NewK8sSetUp returns a K8sSetUp interface
func NewK8sSetUp() K8sSetUp {
impl := &k8sSetUpImpl{
Expand Down
Loading

0 comments on commit 3adcf92

Please sign in to comment.