Skip to content

Commit

Permalink
feat(flink): application deployment (#1223)
Browse files Browse the repository at this point in the history
  • Loading branch information
Serpentiel authored Jun 27, 2023
1 parent 18b9d90 commit 51d4346
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nav_order: 1
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Fix wrong ID used for `organization` resources
- Add `aiven_flink_application_deployment` resource

## [4.5.0] - 2023-06-14

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/aiven/terraform-provider-aiven
go 1.18

require (
github.com/aiven/aiven-go-client v1.17.0
github.com/aiven/aiven-go-client v1.19.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.6.1
github.com/docker/go-units v0.5.0
Expand All @@ -28,7 +28,7 @@ require (
cloud.google.com/go/iam v0.13.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/hashicorp/go-retryablehttp v0.7.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7l
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/aiven/aiven-go-client v1.17.0 h1:4vD2jb8xViTW7cCnH/XabT59DJ8dDyZzOsAo1DsGFHo=
github.com/aiven/aiven-go-client v1.17.0/go.mod h1:3Hh1PDNcqNNCYrkU/jSAHMV/b/ynoy73fwhBPKnMe6I=
github.com/aiven/aiven-go-client v1.18.0 h1:qY+/rtrLOmRmMHaOcrrpPKgEVeAygK/HnFAM5kLHRJE=
github.com/aiven/aiven-go-client v1.18.0/go.mod h1:3Hh1PDNcqNNCYrkU/jSAHMV/b/ynoy73fwhBPKnMe6I=
github.com/aiven/aiven-go-client v1.19.0 h1:wUDPKr5noe4eZ581S4m/RwYuWSKprBUWlrQqGmTbDr8=
github.com/aiven/aiven-go-client v1.19.0/go.mod h1:3Hh1PDNcqNNCYrkU/jSAHMV/b/ynoy73fwhBPKnMe6I=
github.com/aiven/go-api-schemas v1.15.0 h1:Yg6gFPH9cyieOA4LbPFKWWuDUGcXFWnfo/KYY1E0EVc=
github.com/aiven/go-api-schemas v1.15.0/go.mod h1:RmQ8MfxwxAP2ji9eJtP6dICOaTMcQD9b5aQT3Bp7uzI=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down Expand Up @@ -384,6 +388,8 @@ github.com/hashicorp/go-plugin v1.4.10 h1:xUbmA4jC6Dq163/fWcp8P3JuHilrHHMLNRxzGQ
github.com/hashicorp/go-plugin v1.4.10/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0=
github.com/hashicorp/go-retryablehttp v0.7.2 h1:AcYqCvkpalPnPF2pn0KamgwamS42TqUDDYFRKq/RAd0=
github.com/hashicorp/go-retryablehttp v0.7.2/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo=
github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
Expand Down
7 changes: 4 additions & 3 deletions internal/sdkprovider/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ func Provider(version string) *schema.Provider {
"aiven_m3aggregator": m3db.ResourceM3Aggregator(),

// flink
"aiven_flink": flink.ResourceFlink(),
"aiven_flink_application": flink.ResourceFlinkApplication(),
"aiven_flink_application_version": flink.ResourceFlinkApplicationVersion(),
"aiven_flink": flink.ResourceFlink(),
"aiven_flink_application": flink.ResourceFlinkApplication(),
"aiven_flink_application_version": flink.ResourceFlinkApplicationVersion(),
"aiven_flink_application_deployment": flink.ResourceFlinkApplicationDeployment(),

// opensearch
"aiven_opensearch": opensearch.ResourceOpensearch(),
Expand Down
229 changes: 229 additions & 0 deletions internal/sdkprovider/service/flink/flink_application_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Package flink is the package that contains the schema definitions for the Flink resources.
package flink

import (
"context"
"time"

"github.com/aiven/aiven-go-client"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"

"github.com/aiven/terraform-provider-aiven/internal/schemautil"
)

// aivenFlinkApplicationDeploymentSchema is the schema for the Flink Application Deployment resource.
var aivenFlinkApplicationDeploymentSchema = map[string]*schema.Schema{
"project": schemautil.CommonSchemaProjectReference,
"service_name": schemautil.CommonSchemaServiceNameReference,
"application_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: "Application ID",
},
// Request fields.
"parallelism": {
Type: schema.TypeInt,
Optional: true,
Description: "Flink Job parallelism",
ValidateFunc: validation.IntBetween(1, 128),
ForceNew: true,
Default: 1,
},
"restart_enabled": {
Type: schema.TypeBool,
Optional: true,
Description: "Specifies whether a Flink Job is restarted in case it fails",
ForceNew: true,
Default: true,
},
"starting_savepoint": {
Type: schema.TypeString,
Optional: true,
Description: "Job savepoint",
ValidateFunc: validation.StringLenBetween(1, 2048),
ForceNew: true,
},
"version_id": {
Type: schema.TypeString,
Required: true,
Description: "ApplicationVersion ID",
ForceNew: true,
},
// Computed fields.
"created_at": {
Type: schema.TypeString,
Computed: true,
Description: "Application deployment creation time",
},
"created_by": {
Type: schema.TypeString,
Computed: true,
Description: "Application deployment creator",
},
}

// ResourceFlinkApplicationDeployment returns the schema for the Flink Application Deployment resource.
func ResourceFlinkApplicationDeployment() *schema.Resource {
return &schema.Resource{
Description: "The Flink Application Deployment resource allows the creation and management of Aiven Flink " +
"Application Deployments.",
CreateContext: resourceFlinkApplicationDeploymentCreate,
ReadContext: resourceFlinkApplicationDeploymentRead,
DeleteContext: resourceFlinkApplicationDeploymentDelete,
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
Timeouts: schemautil.DefaultResourceTimeouts(),
Schema: aivenFlinkApplicationDeploymentSchema,
}
}

// resourceFlinkApplicationDeploymentCreate creates a new Flink Application Deployment resource.
func resourceFlinkApplicationDeploymentCreate(
ctx context.Context,
d *schema.ResourceData,
m interface{},
) diag.Diagnostics {
client := m.(*aiven.Client)

project := d.Get("project").(string)
serviceName := d.Get("service_name").(string)
applicationID := d.Get("application_id").(string)

var req aiven.CreateFlinkApplicationDeploymentRequest

if v, ok := d.GetOk("parallelism"); ok {
req.Parallelism = v.(int)
}

if v, ok := d.GetOk("restart_enabled"); ok {
req.RestartEnabled = v.(bool)
}

if v, ok := d.GetOk("starting_savepoint"); ok {
req.StartingSavepoint = v.(string)
}

if v, ok := d.GetOk("version_id"); ok {
req.VersionID = v.(string)
}

r, err := client.FlinkApplicationDeployments.Create(project, serviceName, applicationID, req)
if err != nil {
return diag.Errorf("cannot create Flink Application Deployment: %v", err)
}

d.SetId(schemautil.BuildResourceID(project, serviceName, applicationID, r.ID))

return resourceFlinkApplicationDeploymentRead(ctx, d, m)
}

// resourceFlinkApplicationDeploymentDelete deletes an existing Flink Application Deployment resource.
//
//nolint:staticcheck // Ignore resource.StateChangeConf deprecation warning.
func resourceFlinkApplicationDeploymentDelete(
ctx context.Context,
d *schema.ResourceData,
m interface{},
) diag.Diagnostics {
client := m.(*aiven.Client)

project, serviceName, applicationID, deploymentID, err := schemautil.SplitResourceID4(d.Id())
if err != nil {
return diag.Errorf("cannot read Flink Application Deployment resource ID: %v", err)
}

_, err = client.FlinkApplicationDeployments.Cancel(project, serviceName, applicationID, deploymentID)
if err != nil {
return diag.Errorf("error cancelling Flink Application Deployment: %v", err)
}

//goland:noinspection GoDeprecation
conf := &resource.StateChangeConf{
Pending: []string{
"CANCELLING",
},
Target: []string{
"CANCELED",
},
Refresh: func() (interface{}, string, error) {
r, err := client.FlinkApplicationDeployments.Get(project, serviceName, applicationID, deploymentID)
if err != nil {
return nil, "", err
}
return r, r.Status, nil
},
Delay: 1 * time.Second,
Timeout: d.Timeout(schema.TimeoutDelete),
MinTimeout: 1 * time.Second,
}

_, err = conf.WaitForStateContext(ctx)
if err != nil {
return diag.Errorf("error waiting for Flink Application Deployment to become canceled: %s", err)
}

_, err = client.FlinkApplicationDeployments.Delete(project, serviceName, applicationID, deploymentID)
if err != nil {
return diag.Errorf("error deleting Flink Application Deployment: %v", err)
}

return nil
}

// resourceFlinkApplicationDeploymentRead reads an existing Flink Application Deployment resource.
func resourceFlinkApplicationDeploymentRead(_ context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

project, serviceName, applicationID, deploymentID, err := schemautil.SplitResourceID4(d.Id())
if err != nil {
return diag.Errorf("cannot read Flink Application Deployment resource ID: %v", err)
}

r, err := client.FlinkApplicationDeployments.Get(project, serviceName, applicationID, deploymentID)
if err != nil {
return diag.Errorf("cannot get Flink Application Deployment: %v", err)
}

if err := d.Set("project", project); err != nil {
return diag.Errorf("error setting Flink Application Deployment `project` field: %s", err)
}

if err := d.Set("service_name", serviceName); err != nil {
return diag.Errorf("error setting Flink Application Deployment `service_name` field: %s", err)
}

if err := d.Set("application_id", applicationID); err != nil {
return diag.Errorf("error setting Flink Application Version `application_id` field: %s", err)
}

if err := d.Set("parallelism", r.Parallelism); err != nil {
return diag.Errorf("error setting Flink Application Deployment `parallelism` field: %s", err)
}

if err := d.Set("restart_enabled", r.RestartEnabled); err != nil {
return diag.Errorf("error setting Flink Application Deployment `restart_enabled` field: %s", err)
}

if err := d.Set("starting_savepoint", r.StartingSavepoint); err != nil {
return diag.Errorf("error setting Flink Application Deployment `starting_savepoint` field: %s", err)
}

if err := d.Set("version_id", r.VersionID); err != nil {
return diag.Errorf("error setting Flink Application Deployment `version_id` field: %s", err)
}

if err := d.Set("created_at", r.CreatedAt); err != nil {
return diag.Errorf("error setting Flink Application Deployment `created_at` field: %s", err)
}

if err := d.Set("created_by", r.CreatedBy); err != nil {
return diag.Errorf("error setting Flink Application Deployment `created_by` field: %s", err)
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/aiven/aiven-go-client"

acc "github.com/aiven/terraform-provider-aiven/internal/acctest"
"github.com/aiven/terraform-provider-aiven/internal/schemautil"

Expand All @@ -16,6 +17,7 @@ import (

func TestAccAivenFlinkApplicationVersion_basic(t *testing.T) {
resourceName := "aiven_flink_application_version.foo"
resourceNameDeployment := "aiven_flink_application_deployment.foobar"
rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.TestAccPreCheck(t) },
Expand All @@ -27,9 +29,21 @@ func TestAccAivenFlinkApplicationVersion_basic(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
checkAivenFlinkApplicationVersionAttributes("data.aiven_flink_application_version.bar"),
resource.TestCheckResourceAttr(resourceName, "project", os.Getenv("AIVEN_PROJECT_NAME")),
resource.TestCheckResourceAttr(resourceName, "service_name", fmt.Sprintf("test-acc-flink-%s", rName)),
resource.TestCheckResourceAttr(
resourceName,
"service_name",
fmt.Sprintf("test-acc-flink-%s", rName),
),
resource.TestCheckResourceAttr(resourceName, "sink.#", "1"),
resource.TestCheckResourceAttr(resourceName, "source.#", "1"),
resource.TestCheckResourceAttr(
resourceNameDeployment, "project", os.Getenv("AIVEN_PROJECT_NAME"),
),
resource.TestCheckResourceAttr(
resourceNameDeployment,
"service_name",
fmt.Sprintf("test-acc-flink-%s", rName),
),
),
},
},
Expand Down Expand Up @@ -127,6 +141,13 @@ resource "aiven_flink_application_version" "foo" {
}
}
resource "aiven_flink_application_deployment" "foobar" {
project = data.aiven_project.foo.project
service_name = aiven_flink.foo.service_name
application_id = aiven_flink_application.foo.application_id
version_id = data.aiven_flink_application_version.bar.application_version_id
}
data "aiven_flink_application_version" "bar" {
project = data.aiven_project.foo.project
service_name = aiven_flink.foo.service_name
Expand Down

0 comments on commit 51d4346

Please sign in to comment.