Skip to content

Commit

Permalink
feat(msk): support ServerlessCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
mazyu36 committed Jan 7, 2025
1 parent db345c5 commit b5f1c88
Show file tree
Hide file tree
Showing 12 changed files with 3,090 additions and 1 deletion.
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-msk-alpha/lib/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface ICluster extends core.IResource, ec2.IConnectable {
/**
* A new or imported MSK Cluster.
*/
abstract class ClusterBase extends core.Resource implements ICluster {
export abstract class ClusterBase extends core.Resource implements ICluster {
public abstract readonly clusterArn: string;
public abstract readonly clusterName: string;
/** @internal */
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './cluster';
export * from './cluster-version';
export * from './serverless-cluster';

// AWS::MSK CloudFormation Resources:
148 changes: 148 additions & 0 deletions packages/@aws-cdk/aws-msk-alpha/lib/serverless-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as core from 'aws-cdk-lib/core';
import * as constructs from 'constructs';
import { ClusterBase, ICluster } from '.';
import { CfnServerlessCluster } from 'aws-cdk-lib/aws-msk';

/**
* Properties for a MSK Serverless Cluster
*/
export interface ServerlessClusterProps {
/**
* The physical name of the cluster.
*
* @default - auto generate
*/
readonly clusterName: string;

/**
* Use IAM to authenticate the identity of clients that connect to an MSK cluster.
* Use the IAM console to create and deploy IAM user- or role-based policies.
*/
readonly iam: boolean;

/**
* The configuration of the Amazon VPCs for the cluster.
*/
readonly vpcConfigs: VpcConfig[];
}

/**
* The configuration of the Amazon VPCs for the cluster.
*/
export interface VpcConfig {
/**
* Defines the virtual networking environment for this cluster.
* Must have at least 2 subnets in two different AZs.
*/
readonly vpc: ec2.IVpc;

/**
* The subnets associated with the cluster.
*
* @default - the Vpc default strategy if not specified.
*/
readonly vpcSubnets?: ec2.SubnetSelection;

/**
* The security groups associated with the cluster.
*
* @default - create new security group
*/
readonly securityGroups?: ec2.ISecurityGroup[];
}

/**
* Create a MSK Serverless Cluster.
*
* @resource AWS::MSK::ServerlessCluster
*/
export class ServerlessCluster extends ClusterBase {
/**
* Reference an existing cluster, defined outside of the CDK code, by name.
*/
public static fromClusterArn(
scope: constructs.Construct,
id: string,
clusterArn: string,
): ICluster {
class Import extends ClusterBase {
public readonly clusterArn = clusterArn;
public readonly clusterName = core.Fn.select(1, core.Fn.split('/', clusterArn)); // ['arn:partition:kafka:region:account-id', clusterName, clusterId]
}

return new Import(scope, id);
}

public readonly clusterArn: string;
public readonly clusterName: string;

private _securityGroups: ec2.ISecurityGroup[] = [];

constructor(scope: constructs.Construct, id: string, props: ServerlessClusterProps) {
super(scope, id, {
physicalName: props.clusterName ??
core.Lazy.string({
produce: () => core.Names.uniqueResourceName(this, { maxLength: 64 }),
}),
});

if (
!core.Token.isUnresolved(props.clusterName) &&
!/^[a-zA-Z0-9]+$/.test(props.clusterName) &&
props.clusterName.length > 64
) {
throw Error(
'The cluster name must only contain alphanumeric characters and have a maximum length of 64 characters.' +
`got: '${props.clusterName}. length: ${props.clusterName.length}'`,
);
}

const vpcConfigs = props.vpcConfigs.map((vpcConfig, index) => this._renderVpcConfig(vpcConfig, index));

this._connections = new ec2.Connections({
securityGroups: this._securityGroups,
});

const resource = new CfnServerlessCluster(this, 'Resource', {
clusterName: this.physicalName,
clientAuthentication: {
sasl: {
iam: {
enabled: props.iam,
},
},
},
vpcConfigs,
});

this.clusterName = this.getResourceNameAttribute(
core.Fn.select(1, core.Fn.split('/', resource.ref)),
);
this.clusterArn = resource.ref;
}

private _renderVpcConfig(vpcConfig: VpcConfig, index: number): CfnServerlessCluster.VpcConfigProperty {
const subnetSelection = vpcConfig.vpc.selectSubnets(vpcConfig.vpcSubnets);

if (subnetSelection.subnets.length < 2) {
throw Error(
`Cluster requires at least 2 subnets, got ${subnetSelection.subnets.length}`,
);
}

const securityGroups = vpcConfig.securityGroups ?? [
new ec2.SecurityGroup(this, `SecurityGroup-${index}`, {
description: 'MSK Serverless security group',
vpc: vpcConfig.vpc,
}),
];

this._securityGroups.push(...securityGroups);

return {
subnetIds: subnetSelection.subnets.map((subnet) => subnet.subnetId),
securityGroups: vpcConfig.securityGroups?.map((securityGroup) => securityGroup.securityGroupId),
};
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b5f1c88

Please sign in to comment.