Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(msk): support ServerlessCluster #32780

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,24 @@ const cluster = new msk.Cluster(this, 'cluster', {
storageMode: msk.StorageMode.TIERED,
});
```

## MSK Serverless

You can also use MSK Serverless by using `ServerlessCluster` class.

MSK Serverless is a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity.

MSK Serverless requires IAM access control for all clusters.

For more infomation, see [Use MSK Serverless clusters](https://docs.aws.amazon.com/msk/latest/developerguide/serverless-getting-started.html).

```ts
declare const vpc: ec2.Vpc;

const serverlessCluster = new msk.ServerlessCluster(this, 'ServerlessCluster', {
clusterName: 'MyServerlessCluster',
vpcConfigs: [
{ vpc },
],
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-msk-alpha/awslint.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"exclude": [
"props-physical-name:@aws-cdk/aws-msk-alpha.ServerlessClusterProps"
]
}
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface ICluster extends core.IResource, ec2.IConnectable {
/**
* A new or imported MSK Cluster.
*/
abstract class ClusterBase extends core.Resource implements ICluster {
export abstract class ClusterBase extends core.Resource implements ICluster {
public abstract readonly clusterArn: string;
public abstract readonly clusterName: string;
/** @internal */
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './cluster';
export * from './cluster-version';
export * from './serverless-cluster';

// AWS::MSK CloudFormation Resources:
147 changes: 147 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/serverless-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import { Fn, Lazy, Names } from 'aws-cdk-lib';
import * as constructs from 'constructs';
import { ClusterBase, ICluster } from '.';
import { CfnServerlessCluster } from 'aws-cdk-lib/aws-msk';

/**
* Properties for a MSK Serverless Cluster
*/
export interface ServerlessClusterProps {
/**
* The physical name of the cluster.
*
* @default - auto generate
*/
readonly clusterName?: string;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add clusterName validation because the correct regex pattern is unclear.
Please refer to #32792.


/**
* The configuration of the Amazon VPCs for the cluster.
* You can specify up to 5 VPC configurations.
*/
readonly vpcConfigs: VpcConfig[];
}

/**
* The configuration of the Amazon VPCs for the cluster.
*/
export interface VpcConfig {
/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*/
readonly vpc: ec2.IVpc;

/**
* The subnets associated with the cluster.
*
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* The security groups associated with the cluster.
* You can specify up to 5 security groups.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
* Create a MSK Serverless Cluster.
*
* @resource AWS::MSK::ServerlessCluster
*/
export class ServerlessCluster extends ClusterBase {
/**
* Reference an existing cluster, defined outside of the CDK code, by name.
*/
public static fromClusterArn(
scope: constructs.Construct,
id: string,
clusterArn: string,
): ICluster {
class Import extends ClusterBase {
public readonly clusterArn = clusterArn;
public readonly clusterName = Fn.select(1, Fn.split('/', clusterArn)); // ['arn:partition:kafka:region:account-id', clusterName, clusterId]
}

return new Import(scope, id);
}

public readonly clusterArn: string;
public readonly clusterName: string;

private _securityGroups: ec2.ISecurityGroup[] = [];

constructor(scope: constructs.Construct, id: string, props: ServerlessClusterProps) {
super(scope, id, {
physicalName: props.clusterName ??
Lazy.string({
produce: () => Names.uniqueResourceName(this, { maxLength: 64 }),
}),
});

if (props.vpcConfigs.length < 1 || props.vpcConfigs.length > 5) {
throw Error(`\`vpcConfigs\` must contain between 1 and 5 configurations, got ${props.vpcConfigs.length} configurations.`);
}

const vpcConfigs = props.vpcConfigs.map((vpcConfig, index) => this._renderVpcConfig(vpcConfig, index));

this._connections = new ec2.Connections({
securityGroups: this._securityGroups,
});

const resource = new CfnServerlessCluster(this, 'Resource', {
clusterName: this.physicalName,
clientAuthentication: {
sasl: {
iam: {
enabled: true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property is required and cannot be false.
I tried deploying with enabled set to false, but the deployment failed with the following error:

A serverless cluster must use SASL/IAM authentication. (Service: Kafka, Status Code: 400, Request ID...

As a result, I have decided not to expose this property.

},
},
},
vpcConfigs,
});

this.clusterName = this.getResourceNameAttribute(
Fn.select(1, Fn.split('/', resource.ref)),
);
this.clusterArn = resource.ref;
}

/**
* Render Vpc Config property
*/
private _renderVpcConfig(vpcConfig: VpcConfig, index: number): CfnServerlessCluster.VpcConfigProperty {
const subnetSelection = vpcConfig.vpc.selectSubnets(vpcConfig.vpcSubnets);

if (subnetSelection.subnets.length < 2) {
throw Error(
`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}.`,
);
}

let securityGroups: ec2.ISecurityGroup[] = [];

if (vpcConfig.securityGroups) {
if (vpcConfig.securityGroups.length < 1 || vpcConfig.securityGroups.length > 5) {
throw Error(`\`securityGroups\` must contain between 1 and 5 elements, got ${vpcConfig.securityGroups.length} elements.`);
}
securityGroups = vpcConfig.securityGroups;
} else {
securityGroups.push(new ec2.SecurityGroup(this, `SecurityGroup-${index}`, {
description: 'MSK Serverless security group',
vpc: vpcConfig.vpc,
}));
}

this._securityGroups.push(...securityGroups);

return {
subnetIds: subnetSelection.subnets.map((subnet) => subnet.subnetId),
securityGroups: securityGroups?.map((securityGroup) => securityGroup.securityGroupId),
};
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading