From 62963a590100cec21eb23449276125af79e8c67d Mon Sep 17 00:00:00 2001 From: kateyeo <88804869+kateyeo@users.noreply.github.com> Date: Tue, 27 Jun 2023 09:56:33 -0700 Subject: [PATCH 1/2] Add skipRetryStrategy to RetryLibrary and necessary change to logic of retryingCallable to allow errors to be skipped (#1252) --- .../transfer/CallableImporter.java | 2 +- .../types/transfer/errors/ErrorDetail.java | 9 +++- .../types/transfer/retry/RetryException.java | 10 ++++ .../types/transfer/retry/RetryStrategy.java | 8 +++- .../transfer/retry/RetryingCallable.java | 6 ++- .../transfer/retry/SkipRetryStrategy.java | 48 +++++++++++++++++++ 6 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/SkipRetryStrategy.java diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableImporter.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableImporter.java index a71352d13..6610cc5f3 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableImporter.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableImporter.java @@ -70,7 +70,7 @@ public ImportResult call() throws Exception { Collection errors = idempotentImportExecutor.getRecentErrors(); success = result.getType() == ImportResult.ResultType.OK && errors.isEmpty(); - if (!success) { + if (!success && errors.iterator().hasNext() && !errors.iterator().next().canSkip()) { throw new IOException( "Problem with importer, forcing a retry, " + "first error: " diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/errors/ErrorDetail.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/errors/ErrorDetail.java index 3d371ec97..2b188fc8d 100644 --- a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/errors/ErrorDetail.java +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/errors/ErrorDetail.java @@ -36,7 +36,8 @@ public abstract class ErrorDetail { private static final String DATA_KEY = "Data"; public static ErrorDetail.Builder builder() { - return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder(); + return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder() + .setCanSkip(false); } @JsonProperty("id") @@ -48,6 +49,9 @@ public static ErrorDetail.Builder builder() { @JsonProperty("exception") public abstract String exception(); + @JsonProperty("canSkip") + public abstract boolean canSkip(); + @AutoValue.Builder public abstract static class Builder { @JsonCreator @@ -65,5 +69,8 @@ private static ErrorDetail.Builder create() { @JsonProperty("exception") public abstract Builder setException(String exception); + + @JsonProperty("canSkip") + public abstract Builder setCanSkip(boolean canSkip); } } diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryException.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryException.java index 1b3a0f8e8..3cfb36b37 100644 --- a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryException.java +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryException.java @@ -22,10 +22,16 @@ public class RetryException extends Exception { private final int triesSoFar; + private final boolean canSkip; RetryException(int triesSoFar, Exception exception) { + this(triesSoFar, exception, false); + } + + RetryException(int triesSoFar, Exception exception, boolean canSkip) { super(exception); this.triesSoFar = triesSoFar; + this.canSkip = canSkip; } @Override @@ -36,4 +42,8 @@ public Exception getCause() { public int getTriesSoFar() { return triesSoFar; } + + public boolean canSkip() { + return canSkip; + } } diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryStrategy.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryStrategy.java index 35e956a5a..0a9f9b087 100644 --- a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryStrategy.java +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryStrategy.java @@ -33,7 +33,8 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = UniformRetryStrategy.class, name = "Uniform"), @JsonSubTypes.Type(value = ExponentialBackoffStrategy.class, name = "Exponential"), - @JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal") + @JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal"), + @JsonSubTypes.Type(value = SkipRetryStrategy.class, name = "Skip") }) public interface RetryStrategy { @@ -52,4 +53,9 @@ public interface RetryStrategy { * Gets milliseconds until the next retry, given elapsed time so far */ long getRemainingIntervalMillis(int tries, long elapsedMillis); + + /** Shows whether exception should be skipped */ + default boolean canSkip() { + return false; + } } diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java index 9fa9ff9a2..1c74f57a9 100644 --- a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/RetryingCallable.java @@ -116,7 +116,11 @@ public T call() throws RetryException { monitor.debug( () -> String.format("Strategy canTryAgain returned false after %d retries", attempts)); - throw new RetryException(attempts, mostRecentException); + if (strategy.canSkip()) { + throw new RetryException(attempts, mostRecentException, true); + } else { + throw new RetryException(attempts, mostRecentException); + } } } } diff --git a/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/SkipRetryStrategy.java b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/SkipRetryStrategy.java new file mode 100644 index 000000000..9274c2a35 --- /dev/null +++ b/portability-types-transfer/src/main/java/org/datatransferproject/types/transfer/retry/SkipRetryStrategy.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 The Data Transfer Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.datatransferproject.types.transfer.retry; + +/** {@link RetryStrategy} that allows exception to be skipped. Useful for known non fatal errors. */ +public class SkipRetryStrategy implements RetryStrategy { + + public SkipRetryStrategy() {} + + @Override + public boolean canTryAgain(int tries) { + return false; + } + + @Override + public long getNextIntervalMillis(int tries) { + return -1L; + } + + @Override + public long getRemainingIntervalMillis(int tries, long elapsedMillis) { + return -1L; + } + + @Override + public boolean canSkip() { + return true; + } + + @Override + public String toString() { + return "SkipRetryStrategy{}"; + } +} \ No newline at end of file From 1136476dd4dd15a5dd377c4173f109ea978d3915 Mon Sep 17 00:00:00 2001 From: kateyeo <88804869+kateyeo@users.noreply.github.com> Date: Wed, 28 Jun 2023 09:44:05 -0700 Subject: [PATCH 2/2] Add flag guarded retrying idempotent executor to google importer (#1254) --- .../google/GoogleTransferExtension.java | 12 +++- .../google/photos/GooglePhotosImporter.java | 57 ++++++++++++++++++- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java index 9d325cad9..4051d232e 100644 --- a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java +++ b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/GoogleTransferExtension.java @@ -37,6 +37,8 @@ import org.datatransferproject.datatransfer.google.videos.GoogleVideosImporter; import org.datatransferproject.spi.cloud.storage.AppCredentialStore; import org.datatransferproject.spi.cloud.storage.JobStore; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension; import org.datatransferproject.types.common.models.DataVertical; import org.datatransferproject.spi.transfer.extension.TransferExtension; import org.datatransferproject.spi.transfer.provider.Exporter; @@ -107,6 +109,10 @@ public void initialize(ExtensionContext context) { GoogleCredentialFactory credentialFactory = new GoogleCredentialFactory(httpTransport, jsonFactory, appCredentials, monitor); + IdempotentImportExecutor idempotentImportExecutor = context.getService( + IdempotentImportExecutorExtension.class).getRetryingIdempotentImportExecutor(context); + boolean enableRetrying = context.getSetting("enableRetrying", false); + ImmutableMap.Builder importerBuilder = ImmutableMap.builder(); importerBuilder.put(BLOBS, new DriveImporter(credentialFactory, jobStore, monitor)); importerBuilder.put(CONTACTS, new GoogleContactsImporter(credentialFactory)); @@ -120,7 +126,9 @@ public void initialize(ExtensionContext context) { jobStore, jsonFactory, monitor, - context.getSetting("googleWritesPerSecond", 1.0))); + context.getSetting("googleWritesPerSecond", 1.0), + idempotentImportExecutor, + enableRetrying)); importerBuilder.put(VIDEOS, new GoogleVideosImporter(appCredentials, jobStore, monitor)); importerMap = importerBuilder.build(); @@ -141,4 +149,4 @@ public void initialize(ExtensionContext context) { initialized = true; } -} +} \ No newline at end of file diff --git a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java index 2db0be2d6..b70df968c 100644 --- a/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java +++ b/extensions/data-transfer/portability-data-transfer-google/src/main/java/org/datatransferproject/datatransfer/google/photos/GooglePhotosImporter.java @@ -78,6 +78,29 @@ public class GooglePhotosImporter private final Map photosInterfacesMap; private final GooglePhotosInterface photosInterface; private final HashMap multilingualStrings = new HashMap<>(); + private IdempotentImportExecutor retryingIdempotentExecutor; + private Boolean enableRetrying; + + public GooglePhotosImporter( + GoogleCredentialFactory credentialFactory, + JobStore jobStore, + JsonFactory jsonFactory, + Monitor monitor, + double writesPerSecond, + IdempotentImportExecutor retryingIdempotentExecutor, + boolean enableRetrying) { + this( + credentialFactory, + jobStore, + jsonFactory, + new HashMap<>(), + null, + new ConnectionProvider(jobStore), + monitor, + writesPerSecond, + retryingIdempotentExecutor, + enableRetrying); + } public GooglePhotosImporter( GoogleCredentialFactory credentialFactory, @@ -106,6 +129,30 @@ public GooglePhotosImporter( ConnectionProvider connectionProvider, Monitor monitor, double writesPerSecond) { + this( + credentialFactory, + jobStore, + jsonFactory, + photosInterfacesMap, + photosInterface, + connectionProvider, + monitor, + writesPerSecond, + null, + false); + } + + GooglePhotosImporter( + GoogleCredentialFactory credentialFactory, + JobStore jobStore, + JsonFactory jsonFactory, + Map photosInterfacesMap, + GooglePhotosInterface photosInterface, + ConnectionProvider connectionProvider, + Monitor monitor, + double writesPerSecond, + IdempotentImportExecutor retryingIdempotentExecutor, + boolean enableRetrying) { this.credentialFactory = credentialFactory; this.jobStore = jobStore; this.jsonFactory = jsonFactory; @@ -114,6 +161,8 @@ public GooglePhotosImporter( this.connectionProvider = connectionProvider; this.monitor = monitor; this.writesPerSecond = writesPerSecond; + this.retryingIdempotentExecutor = retryingIdempotentExecutor; + this.enableRetrying = enableRetrying; } // TODO(aksingh737) WARNING: stop maintaining this code here; this needs to be reconciled against @@ -131,10 +180,12 @@ public ImportResult importItem( // Nothing to do return ImportResult.OK; } - GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, idempotentImportExecutor, authData); + IdempotentImportExecutor executor = + (retryingIdempotentExecutor != null && enableRetrying) ? retryingIdempotentExecutor : idempotentImportExecutor; + GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, executor, authData); for (PhotoAlbum album : data.getAlbums()) { - idempotentImportExecutor.executeAndSwallowIOExceptions( + executor.executeAndSwallowIOExceptions( album.getId(), album.getName(), () -> importSingleAlbum(jobId, authData, album)); } long bytes = importPhotos(data.getPhotos(), gPhotosUpload); @@ -157,7 +208,7 @@ String importSingleAlbum(UUID jobId, TokensAndUrlAuthData authData, PhotoAlbum i @VisibleForTesting // TODO(aksingh737,jzacsh) stop exposing this to unit tests public long importPhotos(Collection photos, GPhotosUpload gPhotosUpload) - throws Exception { + throws Exception { return gPhotosUpload.uploadItemsViaBatching(photos, this::importPhotoBatch); }