Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2190] Implement ActivityConfigurationStrategy for Temporal Activities #4093

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,19 @@ public interface GobblinTemporalConfigurationKeys {

/** Config key (under DYNAMIC_SCALING_PREFIX) for the dynamic scaling polling interval; the key name indicates seconds. */
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds";
/** Default value paired with {@link #DYNAMIC_SCALING_POLLING_INTERVAL_SECS}. */
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;

/*
 * Activity timeout configs.
 *
 * Every per-activity key below is composed as
 *   PREFIX + "<activity name>." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES
 * so all of them share the same trailing suffix and differ only in the activity segment.
 */
/** Common suffix shared by all per-activity start-to-close timeout keys; the key name indicates minutes. */
String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = "activity.starttoclose.timeout.minutes";
/** Start-to-close timeout key for the "generate workunits" activity. */
String GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
/** Start-to-close timeout key for the "recommend scaling" activity. */
String RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
/** Start-to-close timeout key for the "delete work dirs" activity. */
String DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
/** Start-to-close timeout key for the "process workunit" activity. */
String PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
/** Start-to-close timeout key for the "commit" activity. */
String COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import java.time.Duration;
import java.util.Properties;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.PropertiesUtils;


/**
* Interface for defining activity configuration strategies for different Temporal activities.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] timeout strategies -> "activity configuration strategies"

* Each strategy provides a method to retrieve configuration details, such as timeout duration, based on the provided properties.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change as per scope of new class to something like -> "Each strategy provides a method to retrieve configuration details, such as timeout duration, based on the provided properties."

*/
public interface ActivityConfigurationStrategy {
  /** Default start-to-close timeout for an activity when none is specified. */
  Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);

  // Per-activity default values (in minutes); each is handed to PropertiesUtils.getPropAsInt
  // as the default argument by the matching strategy below.
  int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
  int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5;
  int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10;
  int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
  int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;

  /**
   * Resolves the start-to-close timeout of an activity from the supplied properties.
   *
   * @param props the properties consulted for the timeout setting.
   * @return the start-to-close timeout to apply to the activity.
   */
  Duration getStartToCloseTimeout(Properties props);

  /** Strategy supplying the configuration of the Generate Workunits activity. */
  class GenerateWorkunitsActivityConfigurationStrategy implements ActivityConfigurationStrategy {
    @Override
    public Duration getStartToCloseTimeout(Properties props) {
      final int timeoutMinutes = PropertiesUtils.getPropAsInt(
          props,
          GobblinTemporalConfigurationKeys.GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
          DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
      return Duration.ofMinutes(timeoutMinutes);
    }
  }

  /** Strategy supplying the configuration of the Recommend Scaling activity. */
  class RecommendScalingActivityConfigurationStrategy implements ActivityConfigurationStrategy {
    @Override
    public Duration getStartToCloseTimeout(Properties props) {
      final int timeoutMinutes = PropertiesUtils.getPropAsInt(
          props,
          GobblinTemporalConfigurationKeys.RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
          DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
      return Duration.ofMinutes(timeoutMinutes);
    }
  }

  /** Strategy supplying the configuration of the Delete Work Dirs activity. */
  class DeleteWorkDirsActivityConfigurationStrategy implements ActivityConfigurationStrategy {
    @Override
    public Duration getStartToCloseTimeout(Properties props) {
      final int timeoutMinutes = PropertiesUtils.getPropAsInt(
          props,
          GobblinTemporalConfigurationKeys.DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
          DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
      return Duration.ofMinutes(timeoutMinutes);
    }
  }

  /** Strategy supplying the configuration of the Process Workunit activity. */
  class ProcessWorkunitActivityConfigurationStrategy implements ActivityConfigurationStrategy {
    @Override
    public Duration getStartToCloseTimeout(Properties props) {
      final int timeoutMinutes = PropertiesUtils.getPropAsInt(
          props,
          GobblinTemporalConfigurationKeys.PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
          DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
      return Duration.ofMinutes(timeoutMinutes);
    }
  }

  /** Strategy supplying the configuration of the Commit activity. */
  class CommitActivityConfigurationStrategy implements ActivityConfigurationStrategy {
    @Override
    public Duration getStartToCloseTimeout(Properties props) {
      final int timeoutMinutes = PropertiesUtils.getPropAsInt(
          props,
          GobblinTemporalConfigurationKeys.COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
          DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
      return Duration.ofMinutes(timeoutMinutes);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;


/**
 * Enumerates the kinds of activities that take part in the Temporal workflow.
 * Every constant stands for one specific operation an activity can carry out.
 */
public enum ActivityType {
  /** Generates work units. */
  GENERATE_WORKUNITS,

  /** Recommends scaling operations. */
  RECOMMEND_SCALING,

  /** Deletes work directories. */
  DELETE_WORK_DIRS,

  /** Processes a single work unit. */
  PROCESS_WORKUNIT,

  /** Performs the commit step. */
  COMMIT,

  /** Placeholder used as the default activity type. */
  DEFAULT_ACTIVITY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void submitJob(List<WorkUnit> workunits) {
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));

ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);
workflow.process(wuSpec, jobProps);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.util;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.google.common.annotations.VisibleForTesting;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;


/** Utility class for handling Temporal Activity related operations. */
@UtilityClass
public class TemporalActivityUtils {

@VisibleForTesting
protected static final RetryOptions DEFAULT_RETRY_OPTIONS = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(3))
.setMaximumInterval(Duration.ofSeconds(100))
.setBackoffCoefficient(2)
.setMaximumAttempts(4)
.build();

private static final Map<ActivityType, ActivityConfigurationStrategy> activityConfigurationStrategies = new HashMap<>();

static {
activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
activityConfigurationStrategies.put(ActivityType.COMMIT, new ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
}

/**
* Builds and returns an {@link ActivityOptions} object configured with the specified {@link ActivityType} and properties.
*
* @param activityType the type of the activity for which the options are being built.
* @param props the properties to be used for configuring the activity options.
* @return an {@link ActivityOptions} object configured with the specified activity type and properties.
*/
public static ActivityOptions buildActivityOptions(ActivityType activityType, Properties props) {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(activityType, props))
.setRetryOptions(buildRetryOptions(activityType, props))
.build();
}

/**
* Retrieves the start to close timeout duration for a given {@link ActivityType} based on the provided properties.
*
* @param activityType the type of the activity for which the start to close timeout is being retrieved.
* @param props the properties to be used for configuring the timeout.
* @return the start to close timeout duration for the specified activity type.
*/
private static Duration getStartToCloseTimeout(ActivityType activityType, Properties props) {
ActivityConfigurationStrategy activityConfigurationStrategy = activityConfigurationStrategies.get(activityType);
if (activityConfigurationStrategy == null) {
return ActivityConfigurationStrategy.defaultStartToCloseTimeout;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add log to track this scenario

}
return activityConfigurationStrategy.getStartToCloseTimeout(props);
}

/**
* Builds and returns an {@link RetryOptions} object configured with the specified {@link ActivityType} and properties.
*
* @param activityType the type of the activity for which the options are being built.
* @param props the properties to be used for configuring the activity options.
* @return an {@link RetryOptions} object configured with the specified activity type and properties.
*/
private static RetryOptions buildRetryOptions(ActivityType activityType, Properties props) {
// Currently returning just the default retry options for each activity type
return DEFAULT_RETRY_OPTIONS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.temporal.ddm.work;

import java.net.URI;
import java.util.Optional;

import org.apache.hadoop.fs.Path;

Expand All @@ -36,8 +35,6 @@
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


Expand Down Expand Up @@ -75,7 +72,7 @@ public Path getJobStatePath() {
return new Path(new Path(workUnitsDir).getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME);
}

/** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/
/** Configuration for {@link org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput} */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import java.util.Properties;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;

Expand All @@ -35,5 +36,5 @@ public interface CommitStepWorkflow {
* @return number of workunits committed
*/
@WorkflowMethod
CommitStats commit(WUProcessingSpec workSpec);
CommitStats commit(WUProcessingSpec workSpec, Properties props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.temporal.ddm.workflow;

import java.util.Properties;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

Expand All @@ -30,5 +32,5 @@
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed successfully */
@WorkflowMethod
CommitStats process(WUProcessingSpec wuSpec);
CommitStats process(WUProcessingSpec workSpec, Properties props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@

package org.apache.gobblin.temporal.ddm.workflow.impl;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
Expand All @@ -42,22 +42,10 @@
@Slf4j
public class CommitStepWorkflowImpl implements CommitStepWorkflow {

private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(3))
.setMaximumInterval(Duration.ofSeconds(100))
.setBackoffCoefficient(2)
.setMaximumAttempts(4)
.build();

private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make configurable... also add activity heartbeats
.setRetryOptions(ACTIVITY_RETRY_OPTS)
.build();

private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);

@Override
public CommitStats commit(WUProcessingSpec workSpec) {
public CommitStats commit(WUProcessingSpec workSpec, final Properties props) {
final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, TemporalActivityUtils.buildActivityOptions(
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
ActivityType.COMMIT, props));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
Expand Down
Loading
Loading