Skip to content

Commit

Permalink
feat(batch-jobs): Add step to pass API server env vars to spark jobs (#400)
Browse files Browse the repository at this point in the history

* Add step to pass api server env vars to spark jobs

* Fix bug and add unit tests
  • Loading branch information
deadlycoconuts authored Jan 7, 2025
1 parent 2725429 commit 26ea22a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
15 changes: 13 additions & 2 deletions api/turing/cluster/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"fmt"
"os"
"strconv"

apisparkv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
Expand Down Expand Up @@ -192,7 +193,7 @@ func createSparkExecutor(request *CreateSparkRequest) (*apisparkv1beta2.Executor
Path: serviceAccountMount,
},
},
Env: append(defaultEnvVars, getEnvVarFromRequest(request)...),
Env: getEnvVars(request),
Labels: request.JobLabels,
},
}
Expand Down Expand Up @@ -242,7 +243,7 @@ func createSparkDriver(request *CreateSparkRequest) (*apisparkv1beta2.DriverSpec
Path: serviceAccountMount,
},
},
Env: append(defaultEnvVars, getEnvVarFromRequest(request)...),
Env: getEnvVars(request),
Labels: request.JobLabels,
ServiceAccount: &request.ServiceAccountName,
},
Expand Down Expand Up @@ -295,3 +296,13 @@ func toMegabyte(request string) (*string, error) {
strVal := fmt.Sprintf("%sm", strconv.Itoa(int(inMegaBytes)))
return &strVal, nil
}

// getEnvVars assembles the environment variables for a Spark driver/executor
// container. The result contains, in order: the package-level default env
// vars, the API server env vars named in the request's SparkInfraConfig
// (values read from this process's environment via os.Getenv), and finally
// the env vars carried in the request itself.
func getEnvVars(request *CreateSparkRequest) []apicorev1.EnvVar {
	// Copy the defaults into a freshly allocated slice instead of appending
	// to the package-level defaultEnvVars directly: append on a shared slice
	// can write into its backing array when spare capacity exists, letting
	// successive or concurrent calls clobber each other's entries.
	envVars := make([]apicorev1.EnvVar, 0, len(defaultEnvVars)+len(request.SparkInfraConfig.APIServerEnvVars))
	envVars = append(envVars, defaultEnvVars...)

	for _, name := range request.SparkInfraConfig.APIServerEnvVars {
		// Forward the API server's own value for this variable. NOTE: an
		// unset variable is forwarded as an empty string — os.Getenv cannot
		// distinguish unset from empty.
		envVars = append(envVars, apicorev1.EnvVar{Name: name, Value: os.Getenv(name)})
	}

	return append(envVars, getEnvVarFromRequest(request)...)
}
85 changes: 85 additions & 0 deletions api/turing/cluster/spark_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"os"
"testing"

apisparkv1beta2 "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
Expand Down Expand Up @@ -124,6 +125,90 @@ func TestGetCPURequestAndLimit(t *testing.T) {
}
}

// TestGetEnvVars verifies that getEnvVars merges, in order: the default env
// vars, the API server env vars named in SparkInfraConfig (values taken from
// the test process's environment), and the env vars from the request itself.
func TestGetEnvVars(t *testing.T) {
	request := &CreateSparkRequest{
		JobName:               jobName,
		JobLabels:             jobLabels,
		JobImageRef:           jobImageRef,
		JobApplicationPath:    jobApplicationPath,
		JobArguments:          jobArguments,
		JobConfigMount:        batch.JobConfigMount,
		DriverCPURequest:      cpuValue,
		DriverMemoryRequest:   memoryValue,
		ExecutorCPURequest:    cpuValue,
		ExecutorMemoryRequest: memoryValue,
		ExecutorReplica:       executorReplica,
		ServiceAccountName:    serviceAccountName,
		SparkInfraConfig:      sparkInfraConfig,
		EnvVars:               &envVars,
	}
	tests := map[string]struct {
		sparkInfraConfigAPIServerEnvVars []string
		apiServerEnvVars                 []apicorev1.EnvVar
		expectedEnvVars                  []apicorev1.EnvVar
	}{
		"api server env vars specified": {
			[]string{"TEST_ENV_VAR_1"},
			[]apicorev1.EnvVar{
				{
					Name:  "TEST_ENV_VAR_1",
					Value: "TEST_VALUE_1",
				},
			},
			[]apicorev1.EnvVar{
				{
					Name:  envServiceAccountPathKey,
					Value: envServiceAccountPath,
				},
				{
					Name:  "TEST_ENV_VAR_1",
					Value: "TEST_VALUE_1",
				},
				{
					Name:  "foo",
					Value: barString,
				},
			},
		},
		"no api server env vars specified": {
			[]string{},
			[]apicorev1.EnvVar{
				{
					Name:  "TEST_ENV_VAR_1",
					Value: "TEST_VALUE_1",
				},
			},
			[]apicorev1.EnvVar{
				{
					Name:  envServiceAccountPathKey,
					Value: envServiceAccountPath,
				},
				{
					Name:  "foo",
					Value: barString,
				},
			},
		},
	}
	for name, tt := range tests {
		t.Run(name, func(t *testing.T) {
			// t.Setenv registers automatic cleanup, so the environment is
			// restored even if an assertion aborts the subtest early —
			// unlike the manual os.Setenv/os.Unsetenv pairing it replaces.
			for _, ev := range tt.apiServerEnvVars {
				t.Setenv(ev.Name, ev.Value)
			}

			request.SparkInfraConfig.APIServerEnvVars = tt.sparkInfraConfigAPIServerEnvVars
			assert.Equal(t, tt.expectedEnvVars, getEnvVars(request))
		})
	}
}

var (
jobName = "jobname"
jobImageRef = "gojek/nosuchimage"
Expand Down
1 change: 1 addition & 0 deletions api/turing/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ type KanikoConfig struct {
// SparkAppConfig contains the infra configurations that is unique to the user's Kubernetes
type SparkAppConfig struct {
NodeSelector map[string]string
APIServerEnvVars []string
CorePerCPURequest float64 `validate:"required"`
CPURequestToCPULimit float64 `validate:"required"`
SparkVersion string `validate:"required"`
Expand Down

0 comments on commit 26ea22a

Please sign in to comment.