Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(msk): support ServerlessCluster #32780

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,24 @@ const cluster = new msk.Cluster(this, 'cluster', {
storageMode: msk.StorageMode.TIERED,
});
```

## MSK Serverless

You can also use MSK Serverless by using `ServerlessCluster` class.

MSK Serverless is a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity.

MSK Serverless requires IAM access control for all clusters.

For more infomation, see [Use MSK Serverless clusters](https://docs.aws.amazon.com/msk/latest/developerguide/serverless-getting-started.html).

```ts
declare const vpc: ec2.Vpc;

const serverlessCluster = new msk.ServerlessCluster(this, 'ServerlessCluster', {
clusterName: 'MyServerlessCluster',
vpcConfigs: [
{ vpc },
],
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-msk-alpha/awslint.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"exclude": [
"props-physical-name:@aws-cdk/aws-msk-alpha.ServerlessClusterProps"
]
}
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface ICluster extends core.IResource, ec2.IConnectable {
/**
* A new or imported MSK Cluster.
*/
abstract class ClusterBase extends core.Resource implements ICluster {
export abstract class ClusterBase extends core.Resource implements ICluster {
public abstract readonly clusterArn: string;
public abstract readonly clusterName: string;
/** @internal */
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './cluster';
export * from './cluster-version';
export * from './serverless-cluster';

// AWS::MSK CloudFormation Resources:
147 changes: 147 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/serverless-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import { Fn, Lazy, Names } from 'aws-cdk-lib';
import * as constructs from 'constructs';
import { ClusterBase, ICluster } from '.';
import { CfnServerlessCluster } from 'aws-cdk-lib/aws-msk';

/**
* Properties for a MSK Serverless Cluster
*/
export interface ServerlessClusterProps {
/**
* The physical name of the cluster.
*
* @default - auto generate
*/
readonly clusterName?: string;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add clusterName validation because the correct regex pattern is unclear.
Please refer to #32792.


/**
* The configuration of the Amazon VPCs for the cluster.
* You can specify up to 5 VPC configurations.
*/
readonly vpcConfigs: VpcConfig[];
Copy link
Contributor

@badmintoncryer badmintoncryer Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think MSK Serverless clusters can only be configured for a single VPC. However, the current configuration allows multiple VPCs to be passed simultaneously in vpcConfigs.

In CloudFormation, vpcConfigs supports multiple configurations, but are there any practical cases where multiple settings are actually needed? If not, wouldn't it make sense to allow only a single VpcConfig to be accepted? What are your thoughts on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A multiple VPC setup is provided for cases where clients in different VPCs need to connect to a Serverless Cluster, so I think it’s better to have this option available.

In docs, the following is stated:

You can connect clients from up to five different VPCs with MSK Serverless clusters. To help client applications switch over to another Availability Zone in the event of an outage, you must specify at least two subnets in each VPC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had assumed that it only supported a single VPC! Thank you for the explanation.

}

/**
* The configuration of the Amazon VPCs for the cluster.
*/
export interface VpcConfig {
/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*/
readonly vpc: ec2.IVpc;

/**
* The subnets associated with the cluster.
*
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* The security groups associated with the cluster.
* You can specify up to 5 security groups.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
* Create a MSK Serverless Cluster.
*
* @resource AWS::MSK::ServerlessCluster
*/
export class ServerlessCluster extends ClusterBase {
/**
* Reference an existing cluster, defined outside of the CDK code, by name.
*/
public static fromClusterArn(
scope: constructs.Construct,
id: string,
clusterArn: string,
): ICluster {
class Import extends ClusterBase {
public readonly clusterArn = clusterArn;
public readonly clusterName = Fn.select(1, Fn.split('/', clusterArn)); // ['arn:partition:kafka:region:account-id', clusterName, clusterId]
}

return new Import(scope, id);
}

public readonly clusterArn: string;
public readonly clusterName: string;

private _securityGroups: ec2.ISecurityGroup[] = [];

constructor(scope: constructs.Construct, id: string, props: ServerlessClusterProps) {
super(scope, id, {
physicalName: props.clusterName ??
Lazy.string({
produce: () => Names.uniqueResourceName(this, { maxLength: 64 }),
}),
});

if (props.vpcConfigs.length < 1 || props.vpcConfigs.length > 5) {
throw Error(`\`vpcConfigs\` must contain between 1 and 5 configurations, got ${props.vpcConfigs.length} configurations.`);
}

const vpcConfigs = props.vpcConfigs.map((vpcConfig, index) => this._renderVpcConfig(vpcConfig, index));

this._connections = new ec2.Connections({
securityGroups: this._securityGroups,
});

const resource = new CfnServerlessCluster(this, 'Resource', {
clusterName: this.physicalName,
clientAuthentication: {
sasl: {
iam: {
enabled: true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property is required and cannot be false.
I tried deploying with enabled set to false, but the deployment failed with the following error:

A serverless cluster must use SASL/IAM authentication. (Service: Kafka, Status Code: 400, Request ID...

As a result, I have decided not to expose this property.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your information.
All the CloudFormation documents are useless, and I wish something could be done about it.

},
},
},
vpcConfigs,
});

this.clusterName = this.getResourceNameAttribute(
Fn.select(1, Fn.split('/', resource.ref)),
);
this.clusterArn = resource.ref;
}

/**
* Render Vpc Config property
*/
private _renderVpcConfig(vpcConfig: VpcConfig, index: number): CfnServerlessCluster.VpcConfigProperty {
const subnetSelection = vpcConfig.vpc.selectSubnets(vpcConfig.vpcSubnets);

if (subnetSelection.subnets.length < 2) {
throw Error(
`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}.`,
mazyu36 marked this conversation as resolved.
Show resolved Hide resolved
);
}

let securityGroups: ec2.ISecurityGroup[] = [];

if (vpcConfig.securityGroups) {
if (vpcConfig.securityGroups.length > 5) {
throw Error(`\`securityGroups\` must not exceed 5 elements, got ${vpcConfig.securityGroups.length} elements.`);
}
securityGroups = vpcConfig.securityGroups;
} else {
securityGroups.push(new ec2.SecurityGroup(this, `SecurityGroup-${index}`, {
description: 'MSK Serverless security group',
vpc: vpcConfig.vpc,
}));
}

this._securityGroups.push(...securityGroups);

return {
subnetIds: subnetSelection.subnets.map((subnet) => subnet.subnetId),
securityGroups: securityGroups?.map((securityGroup) => securityGroup.securityGroupId),
};
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading