Skip to content

Commit

Permalink
Use namespaced variables
Browse files Browse the repository at this point in the history
  • Loading branch information
IceS2 committed Feb 3, 2025
1 parent 04cab8e commit 018fdc7
Show file tree
Hide file tree
Showing 46 changed files with 421 additions and 591 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ public class Workflow {
public static final String RELATED_ENTITY_VARIABLE = "relatedEntity";
public static final String PAYLOAD = "payload";
public static final String RESULT_VARIABLE = "result";
public static final String RESOLVED_BY_VARIABLE = "resolvedBy";
public static final String UPDATED_BY_VARIABLE = "updatedBy";
public static final String STAGE_INSTANCE_STATE_ID_VARIABLE = "stageInstanceStateId";
public static final String WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE =
"workflowInstanceExecutionId";
public static final String WORKFLOW_RUNTIME_EXCEPTION = "workflowRuntimeException";
public static final String EXCEPTION_VARIABLE = "exception";
public static final String GLOBAL_NAMESPACE = "global";
private final TriggerWorkflow triggerWorkflow;
private final MainWorkflow mainWorkflow;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType.GOVERNANCE_WORKFLOW_CHANGE_EVENT;
import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getNamespacedVariableName;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -60,7 +63,9 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {

Map<String, Object> variables = new HashMap<>();

variables.put("relatedEntity", entityLink.getLinkString());
variables.put(
getNamespacedVariableName(GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE),
entityLink.getLinkString());

WorkflowHandler.getInstance().triggerWithSignal(signal, variables);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.elements.TriggerFactory.getTriggerWorkflowId;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -19,12 +21,15 @@
import org.flowable.engine.RepositoryService;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.TaskService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.flowable.engine.repository.ProcessDefinition;
import org.flowable.engine.runtime.Execution;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;
import org.flowable.task.service.delegate.DelegateTask;
import org.flowable.variable.api.delegate.VariableScope;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.service.Entity;
Expand Down Expand Up @@ -198,19 +203,59 @@ public void setCustomTaskId(String taskId, UUID customTaskId) {
taskService.setVariable(taskId, "customTaskId", customTaskId.toString());
}

public String getParentActivityId(String executionId) {
String activityId = null;

Execution execution =
runtimeService.createExecutionQuery().executionId(executionId).singleResult();

if (execution != null && execution.getParentId() != null) {
Execution parentExecution =
runtimeService.createExecutionQuery().executionId(execution.getParentId()).singleResult();

if (parentExecution != null) {
activityId = parentExecution.getActivityId();
}
}

return activityId;
}

private Task getTaskFromCustomTaskId(UUID customTaskId) {
return taskService
.createTaskQuery()
.processVariableValueEquals("customTaskId", customTaskId.toString())
.singleResult();
}

public Map<String, Object> transformToNodeVariables(
UUID customTaskId, Map<String, Object> variables) {
Map<String, Object> namespacedVariables = null;
Optional<Task> oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId));

if (oTask.isPresent()) {
Task task = oTask.get();
String namespace = getParentActivityId(task.getExecutionId());
namespacedVariables = new HashMap<>();
for (Map.Entry<String, Object> entry : variables.entrySet()) {
namespacedVariables.put(
getNamespacedVariableName(namespace, entry.getKey()), entry.getValue());
}

} else {
// LOG
}

return namespacedVariables;
}

public void resolveTask(UUID taskId) {
resolveTask(taskId, null);
}

public void resolveTask(UUID customTaskId, Map<String, Object> variables) {
try {
Optional<Task> oTask =
Optional.ofNullable(
taskService
.createTaskQuery()
.processVariableValueEquals("customTaskId", customTaskId.toString())
.singleResult());

Optional<Task> oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId));
if (oTask.isPresent()) {
Task task = oTask.get();
Optional.ofNullable(variables)
Expand Down Expand Up @@ -323,4 +368,41 @@ public void terminateWorkflow(String workflowName) {
instance.getId(), "Terminating all instances due to user request.");
});
}

public static String getNamespacedVariableName(String namespace, String varName) {
if (namespace != null) {
return String.format("%s_%s", namespace, varName);
} else {
return varName;
}
}

public static void setNodeVariable(DelegateExecution execution, String varName, Object varValue) {
String namespace = execution.getParent().getCurrentActivityId();
setNamespacedVariable(execution, namespace, varName, varValue);
}

public static void setGlobalVariable(VariableScope execution, String varName, Object varValue) {
setNamespacedVariable(execution, GLOBAL_NAMESPACE, varName, varValue);
}

public static void setNamespacedVariable(
VariableScope execution, String namespace, String varName, Object varValue) {
execution.setVariable(getNamespacedVariableName(namespace, varName), varValue);
}

public static Object getNamespacedVariable(
VariableScope execution, String namespace, String varName) {
return execution.getVariable(getNamespacedVariableName(namespace, varName));
}

public static void setNodeVariable(DelegateTask delegateTask, String varName, Object varValue) {
WorkflowHandler workflowHandler = WorkflowHandler.getInstance();
String namespace = workflowHandler.getParentActivityId(delegateTask.getExecutionId());
if (namespace != null) {
setNamespacedVariable(delegateTask, namespace, varName, varValue);
} else {
// Raise Error
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.openmetadata.service.governance.workflows;

import static org.openmetadata.service.governance.workflows.Workflow.GLOBAL_NAMESPACE;
import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_INSTANCE_EXECUTION_ID_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getNamespacedVariable;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId;

import java.util.UUID;
Expand All @@ -15,7 +17,8 @@ public class WorkflowInstanceExecutionIdSetterListener implements JavaDelegate {
public void execute(DelegateExecution execution) {
try {
String workflowName = getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
String relatedEntity = (String) execution.getVariable(RELATED_ENTITY_VARIABLE);
String relatedEntity =
(String) getNamespacedVariable(execution, GLOBAL_NAMESPACE, RELATED_ENTITY_VARIABLE);
LOG.debug(
String.format(
"New Execution for Workflow '%s'. Related Entity: '%s'",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.openmetadata.service.governance.workflows.elements;

import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE;
import static org.openmetadata.service.governance.workflows.WorkflowHandler.getNamespacedVariableName;

import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.Process;
Expand All @@ -13,17 +14,15 @@ public class Edge {
public Edge(org.openmetadata.schema.governance.workflows.elements.EdgeDefinition edgeDefinition) {
SequenceFlow edge = new SequenceFlow(edgeDefinition.getFrom(), edgeDefinition.getTo());
if (!CommonUtil.nullOrEmpty(edgeDefinition.getCondition())) {
edge.setConditionExpression(getFlowableCondition(edgeDefinition.getCondition()));
edge.setConditionExpression(
getFlowableCondition(edgeDefinition.getFrom(), edgeDefinition.getCondition()));
}
this.edge = edge;
}

private String getFlowableCondition(boolean condition) {
if (condition) {
return String.format("${%s}", RESULT_VARIABLE);
} else {
return String.format("${!%s}", RESULT_VARIABLE);
}
private String getFlowableCondition(String from, String condition) {
return String.format(
"${%s == '%s'}", getNamespacedVariableName(from, RESULT_VARIABLE), condition);
}

public void addToWorkflow(BpmnModel model, Process process) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
import org.openmetadata.schema.governance.workflows.elements.NodeSubType;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.JsonLogicTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTaskDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.endEvent.EndEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.StartEventDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.JsonLogicFilterTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.NoOpTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask;
import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetGlossaryTermStatusTask;
import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent;
Expand All @@ -30,8 +27,6 @@ public static NodeInterface createNode(WorkflowNodeDefinitionInterface nodeDefin
case SET_GLOSSARY_TERM_STATUS_TASK -> new SetGlossaryTermStatusTask(
(SetGlossaryTermStatusTaskDefinition) nodeDefinition);
case USER_APPROVAL_TASK -> new UserApprovalTask((UserApprovalTaskDefinition) nodeDefinition);
case PYTHON_WORKFLOW_AUTOMATION_TASK -> new NoOpTask(nodeDefinition);
case JSON_LOGIC_TASK -> new JsonLogicFilterTask((JsonLogicTaskDefinition) nodeDefinition);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import org.openmetadata.schema.governance.workflows.TriggerType;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.CustomSignalTriggerDefinition;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.service.governance.workflows.elements.triggers.CustomSignalTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.EventBasedEntityTrigger;
import org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger;

Expand All @@ -18,10 +16,6 @@ public static TriggerInterface createTrigger(WorkflowDefinition workflow) {
workflow.getName(),
triggerWorkflowId,
(EventBasedEntityTriggerDefinition) workflow.getTrigger());
case CUSTOM_SIGNAL -> new CustomSignalTrigger(
workflow.getName(),
triggerWorkflowId,
(CustomSignalTriggerDefinition) workflow.getTrigger());
case PERIODIC_BATCH_ENTITY -> new PeriodicBatchEntityTrigger(
workflow.getName(),
triggerWorkflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder;
import org.openmetadata.service.util.JsonUtils;

public class CheckEntityAttributesTask implements NodeInterface {
private final SubProcess subProcess;
Expand All @@ -33,7 +34,10 @@ public CheckEntityAttributesTask(CheckEntityAttributesTaskDefinition nodeDefinit
new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build();

ServiceTask checkEntityAttributes =
getCheckEntityAttributesServiceTask(subProcessId, nodeDefinition.getConfig().getRules());
getCheckEntityAttributesServiceTask(
subProcessId,
nodeDefinition.getConfig().getRules(),
JsonUtils.pojoToJson(nodeDefinition.getInputNamespaceMap()));

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build();
Expand All @@ -54,16 +58,23 @@ public BoundaryEvent getRuntimeExceptionBoundaryEvent() {
return runtimeExceptionBoundaryEvent;
}

private ServiceTask getCheckEntityAttributesServiceTask(String subProcessId, String rules) {
private ServiceTask getCheckEntityAttributesServiceTask(
String subProcessId, String rules, String inputNamespaceMap) {
FieldExtension rulesExpr =
new FieldExtensionBuilder().fieldName("rulesExpr").fieldValue(rules).build();
FieldExtension inputNamespaceMapExpr =
new FieldExtensionBuilder()
.fieldName("inputNamespaceMapExpr")
.fieldValue(inputNamespaceMap)
.build();

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "checkEntityAttributes"))
.implementation(CheckEntityAttributesImpl.class.getName())
.build();
serviceTask.getFieldExtensions().add(rulesExpr);
serviceTask.getFieldExtensions().add(inputNamespaceMapExpr);
return serviceTask;
}

Expand Down

This file was deleted.

Loading

0 comments on commit 018fdc7

Please sign in to comment.