Skip to content

Commit

Permalink
feat: add config that sets headers to topic param values on incoming …
Browse files Browse the repository at this point in the history
…messages. (#143)

* feat: add config that sets headers to topic param values on incoming messages.

* Fixes for solarcloud.

* Updated README to document the new paramatersAsHeaders parameter. Simplified one generated function.

* Added rabbit support for the parametersToHeaders feature. Renamed 'topic' to 'channel' in variable names to keep it generic.

* Refactored to pass sonarCloud

* feat: add config that sets headers to topic param values on incoming messages.

* Fixes for solarcloud.

* Updated README to document the new paramatersAsHeaders parameter. Simplified one generated function.

* Added rabbit support for the parametersToHeaders feature. Renamed 'topic' to 'channel' in variable names to keep it generic.

* Refactored to pass sonarCloud

* Updated description of parametersToHeaders in package.json

* package.json was missing a comma.

Co-authored-by: Michael Davis <[email protected]>
  • Loading branch information
damaru-inc and MichaelDavisSolace authored Aug 9, 2021
1 parent 3be1d35 commit 8594d04
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 82 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ host | | tcp://localhost:55555 | The host connection property. Currently this on
javaPackage | info.x-java-package | | The Java package of the generated classes. If not set then the classes will be in the default package.
msgVpn | | default | The message vpn connection property. Currently this only works with the Solace binder. When other binders are used this parameter is ignored.
password | | default | The client password connection property. Currently this only works with the Solace binder. When other binders are used this parameter is ignored.
parametersToHeaders | | false | If true, this will create headers on the incoming messages for each channel parameter. Currently this only works with messages originating from Solace (using the solace_destination header) and RabbitMQ (using the amqp_receivedRoutingKey header.)
reactive | | false | If true, the generated functions will use the Reactive style and use the Flux class.
solaceSpringCloudVersion | info.x-solace-spring-cloud-version | 2.1.0 | The version of the solace-spring-cloud-bom dependency used when generating an application.
springBootVersion | info.x-spring-boot-version | 2.4.7 | The version of Spring Boot used when generating an application.
Expand Down
175 changes: 117 additions & 58 deletions filters/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ const _ = require('lodash');
const ScsLib = require('../lib/scsLib.js');
const scsLib = new ScsLib();
// To enable debug logging, set the env var DEBUG="type function" with whatever things you want to see.
const debugAppProperties = require('debug')('appProperties');
const debugDynamic = require('debug')('dynamic');
const debugFunction = require('debug')('function');
const debugJavaClass = require('debug')('javaClass');
const debugPayload = require('debug')('payload');
const debugProperty = require('debug')('property');
const debugTopic = require('debug')('topic');
const debugChannel = require('debug')('channel');
const debugType = require('debug')('type');

const stringMap = new Map();
Expand Down Expand Up @@ -79,6 +78,8 @@ class SCSFunction {
if (this.type === 'consumer' || (this.type === 'function' && this.dynamic && this.dynamicType === 'streamBridge')) {
if (this.reactive) {
ret = `public Consumer<Flux<${this.subscribePayload}>> ${this.name}()`;
} else if (this.dynamic && this.parametersToHeaders) {
ret = `public Consumer<Message<${this.subscribePayload}>> ${this.name}()`;
} else {
ret = `public Consumer<${this.subscribePayload}> ${this.name}()`;
}
Expand Down Expand Up @@ -142,6 +143,13 @@ function appProperties([asyncapi, params]) {
doc.spring.cloud = {};
const cloud = doc.spring.cloud;
cloud.function = {};

// See if we have dynamic functions, and if the parametersToHeaders param is set.
// If so, add the input-header-mapping-expression config to consumers which consume dynamic channels.
if (params.parametersToHeaders) {
handleParametersToHeaders(asyncapi, params, cloud);
}

debugProperty('appProperties getFunctionDefinitions');
cloud.function.definition = getFunctionDefinitions(asyncapi, params);
cloud.stream = {};
Expand Down Expand Up @@ -190,6 +198,34 @@ function appProperties([asyncapi, params]) {
}
filter.appProperties = appProperties;

function handleParametersToHeaders(asyncapi, params, cloud) {
const dynamicFuncs = getDynamicFunctions([asyncapi, params]);

if (dynamicFuncs && (params.binder === 'solace' || params.binder === 'rabbit')) {
cloud.function.configuration = {};
const funcs = getFunctionSpecs(asyncapi, params);

funcs.forEach((spec, name, map) => {
if (spec.dynamic && spec.type === 'consumer') {
cloud.function.configuration[name] = {};
cloud.function.configuration[name]['input-header-mapping-expression'] = {};
const headerConfig = cloud.function.configuration[name]['input-header-mapping-expression'];
addHeaderConfigs(params, spec.channelInfo, headerConfig);
}
});
}
}

function addHeaderConfigs(params, channelInfo, headerConfig) {
for (const param of channelInfo.parameters) {
if (params.binder === 'solace') {
headerConfig[param.name] = `headers.solace_destination.getName.split("/")[${param.position}]`;
} else if (params.binder === 'rabbit') {
headerConfig[param.name] = `headers.amqp_receivedRoutingKey.getName.split("/")[${param.position}]`;
}
}
}

function artifactId([info, params]) {
return scsLib.getParamOrDefault(info, params, 'artifactId', 'x-artifact-id');
}
Expand Down Expand Up @@ -338,7 +374,7 @@ function functionSpecs([asyncapi, params]) {
}
filter.functionSpecs = functionSpecs;

// This returns the non-SCS type functions for sending to dynamic topics.
// This returns the non-SCS type functions for sending to dynamic channels.
function getDynamicFunctions([asyncapi, params]) {
const functionMap = new Map();
debugDynamic('start:');
Expand All @@ -350,10 +386,10 @@ function getDynamicFunctions([asyncapi, params]) {
if (publisher) {
debugDynamic('found publisher:');
debugDynamic(publisher);
const topicInfo = getTopicInfo(channelName, channel);
if (topicInfo.hasParams) {
const channelInfo = getChannelInfo(params, channelName, channel);
if (channelInfo.hasParams) {
const spec = {};
spec.topicInfo = topicInfo;
spec.channelInfo = channelInfo;
spec.payloadClass = getPayloadClass(publisher);
spec.sendMethodName = getSendFunctionName(channelName, publisher);
functionMap.set(spec.sendMethodName, spec);
Expand Down Expand Up @@ -548,7 +584,7 @@ function getAdditionalSubs(asyncapi, params) {
let ret;
const funcs = getFunctionSpecs(asyncapi, params);
funcs.forEach((spec, name, map) => {
debugAppProperties(`getAdditionalSubs: ${spec.name} ${spec.isQueueWithSubscription} ${spec.additionalSubscriptions}`);
debugProperty(`getAdditionalSubs: ${spec.name} ${spec.isQueueWithSubscription} ${spec.additionalSubscriptions}`);
// The first additional subscription will be the destination. If there is more than one the rest go here.
if (spec.isQueueWithSubscription && spec.additionalSubscriptions.length > 1) {
if (!ret) {
Expand Down Expand Up @@ -653,15 +689,16 @@ function getFunctionSpecs(asyncapi, params) {
functionSpec.type = 'function';
debugFunction('Found existing subscriber, so this is a function.');
} else {
const topicInfo = getTopicInfo(channelName, channel);
const channelInfo = getChannelInfo(params, channelName, channel);
functionSpec = new SCSFunction();
functionSpec.name = name;
functionSpec.type = 'supplier';
functionSpec.reactive = reactive;
functionSpec.dynamic = topicInfo.hasParams;
functionSpec.topicInfo = topicInfo;
functionSpec.dynamic = channelInfo.hasParams;
functionSpec.channelInfo = channelInfo;
functionSpec.sendMethodName = getSendFunctionName(channelName, publish);
functionSpec.dynamicType = params.dynamicType;
functionSpec.parametersToHeaders = params.parametersToHeaders;
functionMap.set(name, functionSpec);
}
const payload = getPayloadClass(publish);
Expand All @@ -685,7 +722,7 @@ function getFunctionSpecs(asyncapi, params) {
debugFunction(`This already exists: ${name} isQueueWithSubscription: ${functionSpec.isQueueWithSubscription}`);
if (functionSpec.isQueueWithSubscription) { // This comes from an smf binding to a queue.
debugFunction(functionSpec);
for (const sub of smfBinding.topicSubscriptions) {
for (const sub of smfBinding.channelSubscriptions) {
let foundIt = false;
for (const existingSub of functionSpec.additionalSubscriptions) {
debugFunction(`Comparing ${sub} to ${existingSub}`);
Expand Down Expand Up @@ -718,10 +755,15 @@ function getFunctionSpecs(asyncapi, params) {
}
} else {
debugFunction('This is a new one.');
const channelInfo = getChannelInfo(params, channelName, channel);
functionSpec = new SCSFunction();
functionSpec.name = name;
functionSpec.type = 'consumer';
functionSpec.reactive = reactive;
functionSpec.dynamic = channelInfo.hasParams;
functionSpec.channelInfo = channelInfo;
functionSpec.dynamicType = params.dynamicType;
functionSpec.parametersToHeaders = params.parametersToHeaders;
functionMap.set(name, functionSpec);
if (smfBinding && smfBinding.queueName && smfBinding.topicSubscriptions) {
debugFunction(`A new one with subscriptions: ${smfBinding.topicSubscriptions}`);
Expand Down Expand Up @@ -749,10 +791,10 @@ function getFunctionSpecs(asyncapi, params) {
functionSpec.subscribeChannel = dest;
} else if (functionSpec.isQueueWithSubscription) {
functionSpec.subscribeChannel = functionSpec.additionalSubscriptions[0];
debugFunction(`Setting subscribeChannel for topicWithSubs: ${functionSpec.subscribeChannel}`);
debugFunction(`Setting subscribeChannel for channelWithSubs: ${functionSpec.subscribeChannel}`);
} else {
const topicInfo = getTopicInfo(channelName, channel);
functionSpec.subscribeChannel = topicInfo.subscribeTopic;
const channelInfo = getChannelInfo(params, channelName, channel);
functionSpec.subscribeChannel = channelInfo.subscribeChannel;
}
}

Expand Down Expand Up @@ -811,29 +853,42 @@ function getSolace(params) {
return ret;
}

// This returns an object containing information the template needs to render topic strings.
function getTopicInfo(channelName, channel) {
// This returns an object containing information the template needs to render channel strings.
function getChannelInfo(params, channelName, channel) {
const ret = {};
let publishTopic = String(channelName);
let subscribeTopic = String(channelName);
const params = [];

// This isfor the parameterToHeader feature.
const delimiter = (params.binder === 'rabbit' || params.binder === 'kafka') ? '.' : '/';
const channelParts = channelName.split(delimiter);

let publishChannel = String(channelName);
let subscribeChannel = String(channelName);
const parameters = [];
let functionParamList = '';
let functionArgList = '';
let sampleArgList = '';
let first = true;

debugTopic('params:');
debugTopic(channel.parameters());
debugChannel('parameters:');
debugChannel(channel.parameters());
for (const name in channel.parameters()) {
const nameWithBrackets = `{${ name }}`;
const nameWithBrackets = `{${name}}`;
const parameter = channel.parameter(name);
const schema = parameter.schema();
const type = getType(schema.type(), schema.format());
const param = { name: _.camelCase(name) };
debugTopic(`name: ${name} type:`);
debugTopic(type);
debugChannel(`name: ${name} type:`);
debugChannel(type);
let sampleArg = 1;

// Figure out what position it's in. This is just for the parameterToHeader feature.
for (let i = 0; i < channelParts.length; i++) {
if (channelParts[i] === nameWithBrackets) {
param.position = i;
break;
}
}

if (first) {
first = false;
} else {
Expand All @@ -842,51 +897,55 @@ function getTopicInfo(channelName, channel) {
}

sampleArgList += ', ';

if (type) {
debugTopic('It is a type:');
debugTopic(type);
const javaType = type.javaType || typeMap.get(type);
if (!javaType) throw new Error(`topicInfo filter: type not found in typeMap: ${type}`);
param.type = javaType;
const printfArg = type.printFormat;
debugTopic(`printf: ${printfArg}`);
if (!printfArg) throw new Error(`topicInfo filter: printFormat not found in formatMap: ${type}`);
debugTopic(`Replacing ${nameWithBrackets}`);
publishTopic = publishTopic.replace(nameWithBrackets, printfArg);
sampleArg = type.sample;
} else {
const en = schema.enum();
if (en) {
debugTopic(`It is an enum: ${en}`);
param.type = _.upperFirst(name);
param.enum = en;
sampleArg = `Messaging.${param.type}.${en[0]}`;
debugTopic(`Replacing ${nameWithBrackets}`);
publishTopic = publishTopic.replace(nameWithBrackets, '%s');
} else {
throw new Error(`topicInfo filter: Unknown parameter type: ${ JSON.stringify(schema)}`);
}
}

param.sampleArg = sampleArg;
subscribeTopic = subscribeTopic.replace(nameWithBrackets, '*');
[publishChannel, sampleArg] = handleParameterType(name, param, type, publishChannel, schema, nameWithBrackets);
subscribeChannel = subscribeChannel.replace(nameWithBrackets, '*');
functionParamList += `${param.type} ${param.name}`;
functionArgList += param.name;
sampleArgList += sampleArg;
params.push(param);
parameters.push(param);
}
ret.functionArgList = functionArgList;
ret.functionParamList = functionParamList;
ret.sampleArgList = sampleArgList;
ret.channelName = channelName;
ret.params = params;
ret.publishTopic = publishTopic;
ret.subscribeTopic = subscribeTopic;
ret.hasParams = params.length > 0;
ret.parameters = parameters;
ret.publishChannel = publishChannel;
ret.subscribeChannel = subscribeChannel;
ret.hasParams = parameters.length > 0;
return ret;
}

function handleParameterType(name, param, type, publishChannel, schema, nameWithBrackets) {
let sampleArg = 1;
if (type) {
debugChannel('It is a type:');
debugChannel(type);
const javaType = type.javaType || typeMap.get(type);
if (!javaType) throw new Error(`channelInfo filter: type not found in typeMap: ${type}`);
param.type = javaType;
const printfArg = type.printFormat;
debugChannel(`printf: ${printfArg}`);
if (!printfArg) throw new Error(`channelInfo filter: printFormat not found in formatMap: ${type}`);
debugChannel(`Replacing ${nameWithBrackets}`);
publishChannel = publishChannel.replace(nameWithBrackets, printfArg);
sampleArg = type.sample;
} else {
const en = schema.enum();
if (en) {
debugChannel(`It is an enum: ${en}`);
param.type = _.upperFirst(name);
param.enum = en;
sampleArg = `Messaging.${param.type}.${en[0]}`;
debugChannel(`Replacing ${nameWithBrackets}`);
publishChannel = publishChannel.replace(nameWithBrackets, '%s');
} else {
throw new Error(`channelInfo filter: Unknown parameter type: ${ JSON.stringify(schema)}`);
}
}
param.sampleArg = sampleArg;
return [publishChannel, sampleArg];
}

function indent(numTabs) {
return '\t'.repeat(numTabs);
}
Expand Down
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@
"required": false,
"default": "default"
},
"parametersToHeaders": {
"description": "If true, this will create headers on the incoming messages for each channel parameter. Currently this only works with messages originating from Solace (using the solace_destination header) and RabbitMQ (using the amqp_receivedRoutingKey header.)",
"required": false,
"default": false
},
"password": {
"description": "The client password connection property. Currently this only works with the Solace binder.",
"required": false,
Expand Down
Loading

0 comments on commit 8594d04

Please sign in to comment.