Skip to content

Commit

Permalink
kafka operator installation, working on #32
Browse files Browse the repository at this point in the history
  • Loading branch information
FanyCastro committed Nov 1, 2020
1 parent 4986a38 commit 442d67a
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 15 deletions.
70 changes: 63 additions & 7 deletions k8s/k8ssetup/k8ssetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type K8sSetUp interface {
Initialize() error
InstallPostgresqlOperator() error
DatabaseCreation(fileName string) error
InstallKafkaOperator() error
}

type k8sSetUpImpl struct {
Expand Down Expand Up @@ -50,6 +51,23 @@ func (k k8sSetUpImpl) InstallPostgresqlOperator() error {
return nil
}

func (k k8sSetUpImpl) InstallKafkaOperator() error {
log.Println("Installing Kafka operator ...")

if installed := k.isKafkaOperatorInstalled(); !installed {
log.Println("Kafka operator not installed ...")
if err := k.doKafkaOperatorInstallation(); err != nil {
return fmt.Errorf("error installing Kafka operator: %v", err)
}
k.waiKafkaOperatorRunning()

} else {
log.Println("Kafka operator is installed ...")
}

return nil
}

func (k k8sSetUpImpl) waitPsqlOperatorRunning() {
cnt := true
for cnt {
Expand All @@ -59,24 +77,41 @@ func (k k8sSetUpImpl) waitPsqlOperatorRunning() {
log.Print("Psql operator is running")
}

func (k k8sSetUpImpl) isPsqlOperatorRunning() (running bool, err error) {
log.Printf("Checking if postgres operator is already running ...")
func (k k8sSetUpImpl) waiKafkaOperatorRunning() {
cnt := true
for cnt {
ready, err := k.isKafkaOperatorRunning()
cnt = !(err == nil && ready)
}
log.Print("Kafka operator is running")
}

func (k k8sSetUpImpl) isOperatorRunning(name, namespace string) (running bool, err error) {
log.Printf("Checking if %s operator is already running ...", name)

var podNames, output string
if podNames, err = k.kubectl("get", "pod", "-o", "name"); err == nil {
for _, name := range strings.Split(podNames, "\n") {
if strings.Contains(name, "postgres-operator") {
if output, err = k.kubectl("get", name, "-o", "jsonpath='{.status.phase}'"); err != nil {
if podNames, err = k.kubectl("get", "pod", "-o", "name", "-n", namespace); err == nil {
for _, podName := range strings.Split(podNames, "\n") {
if strings.Contains(podName, name) {
if output, err = k.kubectl("get", podName, "-o", "jsonpath='{.status.containerStatuses[0].ready}'", "-n", namespace); err != nil {
return false, err
}
running = output == "'Running'"
running = output == "'true'"
break
}
}
}
return
}

func (k k8sSetUpImpl) isPsqlOperatorRunning() (bool, error) {
return k.isOperatorRunning("postgres-operator", "default")
}

func (k k8sSetUpImpl) isKafkaOperatorRunning() (bool, error) {
return k.isOperatorRunning("strimzi-cluster-operator", "kafka")
}

func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
log.Println("Checking if postgresql operator is already installed ...")
if _, err := k.kubectl("describe", "service/postgres-operator"); err != nil {
Expand All @@ -86,6 +121,15 @@ func (k k8sSetUpImpl) isPostgreSQLOperatorInstalled() bool {
return true
}

func (k k8sSetUpImpl) isKafkaOperatorInstalled() bool {
log.Println("Checking if kafka operator is already installed ...")
if _, err := k.kubectl("describe", "deployment.apps/strimzi-cluster-operator", "-n", "kafka"); err != nil {
return false
}

return true
}

func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
log.Println("Installing postgreSQL operator ...")
dir, err := ioutil.TempDir("", "pets-go-infra")
Expand Down Expand Up @@ -120,6 +164,18 @@ func (k *k8sSetUpImpl) doPsqlOperatorInstallation() error {
return nil
}

func (k *k8sSetUpImpl) doKafkaOperatorInstallation() error {
log.Println("Installing kafka operator ...")
if _, err := k.kubectl("create", "namespace", "kafka"); err != nil {
return fmt.Errorf("error in kubectl create: %v", err)
}
if _, err := k.kubectl("apply", "-f", "https://strimzi.io/install/latest?namespace=kafka", "-n", "kafka"); err != nil {
return fmt.Errorf("error in kubectl apply: %v", err)
}

return nil
}

func (k k8sSetUpImpl) kubectl(params ...string) (output string, err error) {
return k.executeCommand(k.kubectlPath, params...)
}
Expand Down
Loading

0 comments on commit 442d67a

Please sign in to comment.