-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2190] Implement ActivityConfigurationStrategy for Temporal Activities #4093
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.gobblin.temporal.ddm.activity; | ||
|
||
import java.time.Duration; | ||
import java.util.Properties; | ||
|
||
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; | ||
import org.apache.gobblin.util.PropertiesUtils; | ||
|
||
|
||
/** | ||
* Interface for defining timeout strategies for different Temporal activities. | ||
* Each strategy provides a method to retrieve the timeout duration based on the provided properties. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change as per scope of new class to something like -> "Each strategy provides a method to retrieve configuration details, such as timeout duration, based on the provided properties." |
||
*/ | ||
public interface ActivityConfigurationStrategy { | ||
/** Default start to close timeout duration for any activity if not specified. */ | ||
Duration defaultStartToCloseTimeout = Duration.ofMinutes(180); | ||
int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120; | ||
int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5; | ||
int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10; | ||
int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180; | ||
int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180; | ||
|
||
/** | ||
* Retrieves the start to close timeout duration for an activity based on the provided properties. | ||
* | ||
* @param props the properties to be used for configuring the timeout. | ||
* @return the timeout duration for the activity. | ||
*/ | ||
Duration getStartToCloseTimeout(Properties props); | ||
|
||
/** | ||
* Configuration strategy for the Generate Workunits activity. | ||
*/ | ||
class GenerateWorkunitsActivityConfigurationStrategy implements ActivityConfigurationStrategy { | ||
@Override | ||
public Duration getStartToCloseTimeout(Properties props) { | ||
return Duration.ofMinutes(PropertiesUtils.getPropAsInt( | ||
props, | ||
GobblinTemporalConfigurationKeys.GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES, | ||
DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES | ||
)); | ||
} | ||
} | ||
|
||
/** | ||
* Configuration strategy for the Recommend Scaling activity. | ||
*/ | ||
class RecommendScalingActivityConfigurationStrategy implements ActivityConfigurationStrategy { | ||
@Override | ||
public Duration getStartToCloseTimeout(Properties props) { | ||
return Duration.ofMinutes(PropertiesUtils.getPropAsInt( | ||
props, | ||
GobblinTemporalConfigurationKeys.RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES, | ||
DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES | ||
)); | ||
} | ||
} | ||
|
||
/** | ||
* Configuration strategy for the Delete Work Dirs activity. | ||
*/ | ||
class DeleteWorkDirsActivityConfigurationStrategy implements ActivityConfigurationStrategy { | ||
@Override | ||
public Duration getStartToCloseTimeout(Properties props) { | ||
return Duration.ofMinutes(PropertiesUtils.getPropAsInt( | ||
props, | ||
GobblinTemporalConfigurationKeys.DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES, | ||
DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES | ||
)); | ||
} | ||
} | ||
|
||
/** | ||
* Configuration strategy for the Process Workunit activity. | ||
*/ | ||
class ProcessWorkunitActivityConfigurationStrategy implements ActivityConfigurationStrategy { | ||
@Override | ||
public Duration getStartToCloseTimeout(Properties props) { | ||
return Duration.ofMinutes(PropertiesUtils.getPropAsInt( | ||
props, | ||
GobblinTemporalConfigurationKeys.PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES, | ||
DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES | ||
)); | ||
} | ||
} | ||
|
||
/** | ||
* Configuration strategy for the Commit activity. | ||
*/ | ||
class CommitActivityConfigurationStrategy implements ActivityConfigurationStrategy { | ||
@Override | ||
public Duration getStartToCloseTimeout(Properties props) { | ||
return Duration.ofMinutes(PropertiesUtils.getPropAsInt( | ||
props, | ||
GobblinTemporalConfigurationKeys.COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES, | ||
DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES | ||
)); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.gobblin.temporal.ddm.activity; | ||
|
||
|
||
/** | ||
* Enum representing different types of activities in the Temporal workflow. | ||
* Each activity type corresponds to a specific operation that can be performed. | ||
*/ | ||
public enum ActivityType { | ||
/** Activity type for generating work units. */ | ||
GENERATE_WORKUNITS, | ||
|
||
/** Activity type for recommending scaling operations. */ | ||
RECOMMEND_SCALING, | ||
|
||
/** Activity type for deleting work directories. */ | ||
DELETE_WORK_DIRS, | ||
|
||
/** Activity type for processing a work unit. */ | ||
PROCESS_WORKUNIT, | ||
|
||
/** Activity type for committing step. */ | ||
COMMIT, | ||
|
||
/** Default placeholder activity type. */ | ||
DEFAULT_ACTIVITY | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.gobblin.temporal.ddm.util; | ||
|
||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Properties; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
|
||
import io.temporal.activity.ActivityOptions; | ||
import io.temporal.common.RetryOptions; | ||
import lombok.experimental.UtilityClass; | ||
|
||
import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy; | ||
import org.apache.gobblin.temporal.ddm.activity.ActivityType; | ||
|
||
|
||
/** Utility class for handling Temporal Activity related operations. */ | ||
@UtilityClass | ||
public class TemporalActivityUtils { | ||
|
||
@VisibleForTesting | ||
protected static final RetryOptions DEFAULT_RETRY_OPTIONS = RetryOptions.newBuilder() | ||
.setInitialInterval(Duration.ofSeconds(3)) | ||
.setMaximumInterval(Duration.ofSeconds(100)) | ||
.setBackoffCoefficient(2) | ||
.setMaximumAttempts(4) | ||
.build(); | ||
|
||
private static final Map<ActivityType, ActivityConfigurationStrategy> activityConfigurationStrategies = new HashMap<>(); | ||
|
||
static { | ||
activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy()); | ||
activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy()); | ||
activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy()); | ||
activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy()); | ||
activityConfigurationStrategies.put(ActivityType.COMMIT, new ActivityConfigurationStrategy.CommitActivityConfigurationStrategy()); | ||
} | ||
|
||
/** | ||
* Builds and returns an {@link ActivityOptions} object configured with the specified {@link ActivityType} and properties. | ||
* | ||
* @param activityType the type of the activity for which the options are being built. | ||
* @param props the properties to be used for configuring the activity options. | ||
* @return an {@link ActivityOptions} object configured with the specified activity type and properties. | ||
*/ | ||
public static ActivityOptions buildActivityOptions(ActivityType activityType, Properties props) { | ||
return ActivityOptions.newBuilder() | ||
.setStartToCloseTimeout(getStartToCloseTimeout(activityType, props)) | ||
.setRetryOptions(buildRetryOptions(activityType, props)) | ||
.build(); | ||
} | ||
|
||
/** | ||
* Retrieves the start to close timeout duration for a given {@link ActivityType} based on the provided properties. | ||
* | ||
* @param activityType the type of the activity for which the start to close timeout is being retrieved. | ||
* @param props the properties to be used for configuring the timeout. | ||
* @return the start to close timeout duration for the specified activity type. | ||
*/ | ||
private static Duration getStartToCloseTimeout(ActivityType activityType, Properties props) { | ||
ActivityConfigurationStrategy activityConfigurationStrategy = activityConfigurationStrategies.get(activityType); | ||
if (activityConfigurationStrategy == null) { | ||
return ActivityConfigurationStrategy.defaultStartToCloseTimeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add log to track this scenario |
||
} | ||
return activityConfigurationStrategy.getStartToCloseTimeout(props); | ||
} | ||
|
||
/** | ||
* Builds and returns an {@link RetryOptions} object configured with the specified {@link ActivityType} and properties. | ||
* | ||
* @param activityType the type of the activity for which the options are being built. | ||
* @param props the properties to be used for configuring the activity options. | ||
* @return an {@link RetryOptions} object configured with the specified activity type and properties. | ||
*/ | ||
private static RetryOptions buildRetryOptions(ActivityType activityType, Properties props) { | ||
// Currently returning just the default retry options for each activity type | ||
return DEFAULT_RETRY_OPTIONS; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NIT] timeout strategies -> "activity configuration strategies"