Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Nov 14, 2018
2 parents 8bffa1e + e88efe5 commit 85b7c1a
Show file tree
Hide file tree
Showing 32 changed files with 1,057 additions and 945 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ Below are the various artifacts published:
|conductor-common|Common models used by various conductor modules|
|conductor-core|Core Conductor module|
|conductor-redis-persistence|Persistence using Redis/Dynomite|
|conductor-es2-persistence|Indexing using Elasticsearch 2.X|
|conductor-es5-persistence|Indexing using Elasticsearch 5.X|
|conductor-jersey|Jersey JAX-RS resources for the core services|
|conductor-ui|node.js based UI for Conductor|
Expand All @@ -50,7 +49,7 @@ To build the server, use the following dependencies in your classpath:
* conductor-core
* conductor-jersey
* conductor-redis-persistence (_unless using your own persistence module_)
* conductor-es2-persistence _or_ conductor-es5-persistence (_unless using your own index module_)
* conductor-es5-persistence (_unless using your own index module_)
* conductor-contribs (_optional_)


Expand All @@ -71,7 +70,7 @@ com.netflix.conductor.dao.RedisWorkflowModule

* The default persistence used is [Dynomite](https://github.com/Netflix/dynomite)
* For queues, we are relying on [dyno-queues](https://github.com/Netflix/dyno-queues)
* The indexing backend is [Elasticsearch](https://www.elastic.co/) (2.+)
* The indexing backend is [Elasticsearch](https://www.elastic.co/) (5.x)

## Other Requirements
* JDK 1.8+
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
package com.netflix.conductor.common.metadata.tasks;

import com.github.vmg.protogen.annotations.*;

/**
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,19 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.common.metadata.tasks;

import com.github.vmg.protogen.annotations.ProtoField;
import com.github.vmg.protogen.annotations.ProtoMessage;

import java.util.Objects;

@ProtoMessage
public class PollData {
@ProtoField(id = 1)
String queueName;
private String queueName;

@ProtoField(id = 2)
String domain;
private String domain;

@ProtoField(id = 3)
String workerId;
private String workerId;

@ProtoField(id = 4)
long lastPollTime;
private long lastPollTime;

public PollData() {
super();
Expand Down Expand Up @@ -68,45 +71,20 @@ public void setLastPollTime(long lastPollTime) {
this.lastPollTime = lastPollTime;
}


@Override
public synchronized int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((domain == null) ? 0 : domain.hashCode());
result = prime * result + (int) (lastPollTime ^ (lastPollTime >>> 32));
result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
result = prime * result + ((workerId == null) ? 0 : workerId.hashCode());
return result;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PollData pollData = (PollData) o;
return getLastPollTime() == pollData.getLastPollTime() &&
Objects.equals(getQueueName(), pollData.getQueueName()) &&
Objects.equals(getDomain(), pollData.getDomain()) &&
Objects.equals(getWorkerId(), pollData.getWorkerId());
}

@Override
public synchronized boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PollData other = (PollData) obj;
if (domain == null) {
if (other.domain != null)
return false;
} else if (!domain.equals(other.domain))
return false;
if (lastPollTime != other.lastPollTime)
return false;
if (queueName == null) {
if (other.queueName != null)
return false;
} else if (!queueName.equals(other.queueName))
return false;
if (workerId == null) {
if (other.workerId != null)
return false;
} else if (!workerId.equals(other.workerId))
return false;
return true;
public int hashCode() {
return Objects.hash(getQueueName(), getDomain(), getWorkerId(), getLastPollTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,13 +677,12 @@ public Task copy() {
copy.setTaskDefName(taskDefName);
copy.setTaskType(taskType);
copy.setWorkflowInstanceId(workflowInstanceId);
copy.setWorkflowType(workflowType);
copy.setResponseTimeoutSeconds(responseTimeoutSeconds);
copy.setStatus(status);
copy.setRetryCount(retryCount);
copy.setPollCount(pollCount);
copy.setTaskId(taskId);
copy.setReasonForIncompletion(reasonForIncompletion);
copy.setWorkerId(workerId);
copy.setWorkflowTask(workflowTask);
copy.setDomain(domain);
copy.setInputMessage(inputMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.core.execution;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -68,15 +65,14 @@
*/
public class DeciderService {

private static Logger logger = LoggerFactory.getLogger(DeciderService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DeciderService.class);

private final QueueDAO queueDAO;

private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;

private final Map<String, TaskMapper> taskMappers;

private final ExternalPayloadStorageUtils externalPayloadStorageUtils;

@SuppressWarnings("ConstantConditions")
private final Predicate<Task> isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted() || SystemTaskType.isBuiltIn(task.getTaskType());
Expand Down Expand Up @@ -117,13 +113,13 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
DeciderOutcome outcome = new DeciderOutcome();

if (workflow.getStatus().equals(WorkflowStatus.PAUSED)) {
logger.debug("Workflow " + workflow.getWorkflowId() + " is paused");
LOGGER.debug("Workflow " + workflow.getWorkflowId() + " is paused");
return outcome;
}

if (workflow.getStatus().isTerminal()) {
//you cannot evaluate a terminal workflow
logger.debug("Workflow " + workflow.getWorkflowId() + " is already finished. status=" + workflow.getStatus() + ", reason=" + workflow.getReasonForIncompletion());
LOGGER.debug("Workflow " + workflow.getWorkflowId() + " is already finished. status=" + workflow.getStatus() + ", reason=" + workflow.getReasonForIncompletion());
return outcome;
}

Expand Down Expand Up @@ -187,7 +183,7 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
List<Task> nextTasks = getNextTask(workflow, pendingTask);
nextTasks.forEach(nextTask -> tasksToBeScheduled.putIfAbsent(nextTask.getReferenceTaskName(), nextTask));
outcome.tasksToBeUpdated.add(pendingTask);
logger.debug("Scheduling Tasks from {}, next = {} for workflowId: {}", pendingTask.getTaskDefName(),
LOGGER.debug("Scheduling Tasks from {}, next = {} for workflowId: {}", pendingTask.getTaskDefName(),
nextTasks.stream()
.map(Task::getTaskDefName)
.collect(Collectors.toList()),
Expand All @@ -200,14 +196,14 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
.filter(task -> !executedTaskRefNames.contains(task.getReferenceTaskName()))
.collect(Collectors.toList());
if (!unScheduledTasks.isEmpty()) {
logger.debug("Scheduling Tasks {} for workflow: {}", unScheduledTasks.stream()
LOGGER.debug("Scheduling Tasks {} for workflow: {}", unScheduledTasks.stream()
.map(Task::getTaskDefName)
.collect(Collectors.toList()),
workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow)) {
logger.debug("Marking workflow as complete. workflow=" + workflow.getWorkflowId() + ", tasks=" + workflow.getTasks());
LOGGER.debug("Marking workflow as complete. workflow=" + workflow.getWorkflowId() + ", tasks=" + workflow.getTasks());
outcome.isComplete = true;
}

Expand All @@ -217,7 +213,8 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
private List<Task> startWorkflow(Workflow workflow) throws TerminateWorkflowException {
final WorkflowDef workflowDef = workflow.getWorkflowDefinition();

logger.debug("Starting workflow " + workflowDef.getName() + "/" + workflow.getWorkflowId());
LOGGER.debug("Starting workflow {}, version{}, id {}", workflowDef.getName(), workflowDef.getVersion(), workflow.getWorkflowId());

//The tasks will be empty in case of new workflow
List<Task> tasks = workflow.getTasks();
// Check if the workflow is a re-run case or if it is a new workflow execution
Expand All @@ -227,9 +224,8 @@ private List<Task> startWorkflow(Workflow workflow) throws TerminateWorkflowExce
throw new TerminateWorkflowException("No tasks found to be executed", WorkflowStatus.COMPLETED);
}

WorkflowTask taskToSchedule = workflowDef.getTasks().get(0); //Nothing isSystemTask running yet - so schedule the first task
//Loop until a non-skipped task isSystemTask found

WorkflowTask taskToSchedule = workflowDef.getTasks().get(0); //Nothing is running yet - so schedule the first task
//Loop until a non-skipped task is found
while (isTaskSkipped(taskToSchedule, workflow)) {
taskToSchedule = workflowDef.getNextTask(taskToSchedule.getTaskReferenceName());
}
Expand All @@ -249,7 +245,7 @@ private List<Task> startWorkflow(Workflow workflow) throws TerminateWorkflowExce
return task;
})
.orElseThrow(() -> {
String reason = String.format("The workflow %s isSystemTask marked for re-run from %s but could not find the starting task",
String reason = String.format("The workflow %s is marked for re-run from %s but could not find the starting task",
workflow.getWorkflowId(), workflow.getReRunFromWorkflowId());
return new TerminateWorkflowException(reason);
});
Expand Down Expand Up @@ -441,7 +437,7 @@ Workflow populateWorkflowAndTaskData(Workflow workflow) {
void checkForTimeout(TaskDef taskDef, Task task) {

if (taskDef == null) {
logger.warn("missing task type " + task.getTaskDefName() + ", workflowId=" + task.getWorkflowInstanceId());
LOGGER.warn("missing task type " + task.getTaskDefName() + ", workflowId=" + task.getWorkflowInstanceId());
return;
}
if (task.getStatus().isTerminal() || taskDef.getTimeoutSeconds() <= 0 || !task.getStatus().equals(IN_PROGRESS)) {
Expand All @@ -450,7 +446,7 @@ void checkForTimeout(TaskDef taskDef, Task task) {

long timeout = 1000L * taskDef.getTimeoutSeconds();
long now = System.currentTimeMillis();
long elapsedTime = now - (task.getStartTime() + ((long)task.getStartDelayInSeconds() * 1000L));
long elapsedTime = now - (task.getStartTime() + ((long) task.getStartDelayInSeconds() * 1000L));

if (elapsedTime < timeout) {
return;
Expand All @@ -476,7 +472,7 @@ void checkForTimeout(TaskDef taskDef, Task task) {
@VisibleForTesting
boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
if (taskDefinition == null) {
logger.warn("missing task type : {}, workflowId= {}", task.getTaskDefName(), task.getWorkflowInstanceId());
LOGGER.warn("missing task type : {}, workflowId= {}", task.getTaskDefName(), task.getWorkflowInstanceId());
return false;
}
if (task.getStatus().isTerminal() || !task.getStatus().equals(IN_PROGRESS) || taskDefinition.getResponseTimeoutSeconds() == 0) {
Expand All @@ -492,14 +488,14 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
return false;
}

logger.debug("Evaluating responseTimeOut for Task: {}, with Task Definition: {} ", task, taskDefinition);
LOGGER.debug("Evaluating responseTimeOut for Task: {}, with Task Definition: {} ", task, taskDefinition);

long responseTimeout = 1000L * taskDefinition.getResponseTimeoutSeconds();
long now = System.currentTimeMillis();
long noResponseTime = now - task.getUpdateTime();

if (noResponseTime < responseTimeout) {
logger.debug("Current responseTime: {} has not exceeded the configured responseTimeout of {} " +
LOGGER.debug("Current responseTime: {} has not exceeded the configured responseTimeout of {} " +
"for the Task: {} with Task Definition: {}", noResponseTime, responseTimeout, task, taskDefinition);
return false;
}
Expand All @@ -510,7 +506,7 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {

private void timeoutTask(TaskDef taskDef, Task task) {
String reason = "responseTimeout: " + taskDef.getResponseTimeoutSeconds() + " exceeded for the taskId: " + task.getTaskId() + " with Task Definition: " + task.getTaskDefName();
logger.debug(reason);
LOGGER.debug(reason);
task.setStatus(TIMED_OUT);
task.setReasonForIncompletion(reason);
}
Expand Down
Loading

0 comments on commit 85b7c1a

Please sign in to comment.