Skip to content

Commit

Permalink
[vcr-2.0] Impl TOKEN (#2610)
Browse files Browse the repository at this point in the history
Implements token persist and retrieve
Approved : #2606
Cherry-pick on master
  • Loading branch information
snalli authored Oct 9, 2023
1 parent 4f50807 commit 43ca542
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public AzureCloudDestinationSync(VerifiableProperties verifiableProperties, Metr
* Tests connection to Azure blob storage
*/
protected void testAzureStorageConnectivity() {
logger.info("Testing Azure Storage connectivity");
PagedIterable<BlobContainerItem> blobContainerItemPagedIterable = azureStorageClient.listBlobContainers();
for (BlobContainerItem blobContainerItem : blobContainerItemPagedIterable) {
logger.info("Azure blob storage container = {}", blobContainerItem.getName());
Expand Down Expand Up @@ -625,16 +626,74 @@ public int compactPartition(String partitionPath) throws CloudStorageException {
return 0;
}

// Azure naming rules: https://learn.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata
public static final String TOKEN_CONTAINER = "replica-token-container";

/**
* Return AzureTokenFileName
* @param tokenLayout Token layout
* @param tokenFileName Token file name
* @return AzureTokenFileName
*/
protected String getAzureTokenFileName(AzureBlobLayoutStrategy.BlobLayout tokenLayout, String tokenFileName) {
return tokenLayout.containerName + "-" + tokenFileName;
}
@Override
public void persistTokens(String partitionPath, String tokenFileName, InputStream inputStream)
throws CloudStorageException {

/*
Would have loved to upload the token for each partition separately as an index-able file in
Azure Storage's table offering but due to lack of time and legacy constraints, here is the impl.
https://learn.microsoft.com/en-us/azure/cosmos-db/table/support?toc=https%3A%2F%2Flearn.microsoft.com%2Fen-us%2Fazure%2Fstorage%2Ftables%2Ftoc.json&bc=https%3A%2F%2Flearn.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json
Current impl:
Its copied straight from V1. The only change is the naming and location of token.
With blobLayoutStrategy = PARTITION, the V1 code stores each the token for each container in the container itself.
This makes searching for the token extremely difficult.
So for V2, I store the tokens in TOKEN_CONTAINER. This is not new.
If you set blobLayoutStrategy = CONTAINER, then this behavior ensues.
However, we want to set blobLayoutStrategy = PARTITION for V2 but still want the tokens in a separate container.
*/
// Prepare to upload token to Azure blob storage
AzureBlobLayoutStrategy.BlobLayout
tokenLayout = azureBlobLayoutStrategy.getTokenBlobLayout(partitionPath, tokenFileName);
String azureTokenFileName = getAzureTokenFileName(tokenLayout, tokenFileName);
// There is no parallelism, but we still need to create and pass this object to SDK.
BlobParallelUploadOptions blobParallelUploadOptions =
new BlobParallelUploadOptions(inputStream);
// Without content-type, get-token floods log with warnings
blobParallelUploadOptions.setHeaders(new BlobHttpHeaders().setContentType("application/octet-stream"));
try {
BlobContainerClient blobContainerClient = createOrGetBlobStore(TOKEN_CONTAINER);
// Upload token to Azure blob storage
blobContainerClient.getBlobClient(azureTokenFileName)
.uploadWithResponse(blobParallelUploadOptions, Duration.ofMillis(cloudConfig.cloudRequestTimeout),
Context.NONE);
} catch (Exception e) {
azureMetrics.absTokenPersistFailureCount.inc();
String error = String.format("Unable to persist token %s/%s due to %s", TOKEN_CONTAINER, azureTokenFileName, e.getMessage());
throw AzureCloudDestination.toCloudStorageException(error, e, azureMetrics);
}
}

@Override
public boolean retrieveTokens(String partitionPath, String tokenFileName, OutputStream outputStream)
throws CloudStorageException {
return false;
// Prepare to download token from Azure blob storage
AzureBlobLayoutStrategy.BlobLayout
tokenLayout = azureBlobLayoutStrategy.getTokenBlobLayout(partitionPath, tokenFileName);
String azureTokenFileName = getAzureTokenFileName(tokenLayout, tokenFileName);
try {
BlobContainerClient blobContainerClient = createOrGetBlobStore(TOKEN_CONTAINER);
// Download token from Azure blob storage
blobContainerClient.getBlobClient(azureTokenFileName)
.download(outputStream);
return true;
} catch (Exception e) {
azureMetrics.absTokenRetrieveFailureCount.inc();
String error = String.format("Unable to retrieve token %s/%s due to %s", TOKEN_CONTAINER, azureTokenFileName, e.getMessage());
throw AzureCloudDestination.toCloudStorageException(error, e, azureMetrics);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class AzureMetrics {
public static final String COMPACTION_PROGRESS_WRITE_ERROR_COUNT = "CompactionProgressWriteErrorCount";
public static final String ABS_TOKEN_REFRESH_ATTEMPT_COUNT = "ABSTokenRefreshAttemptCount";
public static final String ABS_TOKEN_PERSIST_FAILURE_COUNT = "ABSTokenPersistFailureCount";
public static final String ABS_TOKEN_RETRIEVE_FAILURE_COUNT = "ABSTokenRetrieveFailureCount";
public static final String ABS_FORBIDDEN_EXCEPTION_COUNT = "ABSForbiddenExceptionCount";
public static final String STORAGE_CLIENT_OPERATION_RETRY_COUNT = "StorageClientOperationRetryCount";
public static final String STORAGE_CLIENT_OPERATION_EXCEPTION_COUNT = "StorageClientOperationExceptionCount";
Expand Down Expand Up @@ -148,6 +149,8 @@ public class AzureMetrics {
public final Counter compactionProgressWriteErrorCount;
public final Counter absTokenRefreshAttemptCount;
public final Counter absTokenPersistFailureCount;
public final Counter absTokenRetrieveFailureCount;

public final Counter absForbiddenExceptionCount;
public final Counter storageClientOperationRetryCount;
public final Counter storageClientOperationExceptionCount;
Expand Down Expand Up @@ -237,6 +240,8 @@ public AzureMetrics(MetricRegistry registry) {
registry.counter(MetricRegistry.name(StorageClient.class, ABS_TOKEN_REFRESH_ATTEMPT_COUNT));
absTokenPersistFailureCount =
registry.counter(MetricRegistry.name(AzureCloudDestination.class, ABS_TOKEN_PERSIST_FAILURE_COUNT));
absTokenRetrieveFailureCount =
registry.counter(MetricRegistry.name(AzureCloudDestination.class, ABS_TOKEN_RETRIEVE_FAILURE_COUNT));
absForbiddenExceptionCount =
registry.counter(MetricRegistry.name(StorageClient.class, ABS_FORBIDDEN_EXCEPTION_COUNT));
storageClientOperationRetryCount =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Configuration;
import com.azure.core.util.ConfigurationBuilder;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.storage.blob.BlobContainerAsyncClient;
Expand Down Expand Up @@ -415,9 +416,12 @@ protected BlobServiceClient createBlobStorageSyncClient() {
try {
validateABSAuthConfigs(azureCloudConfig);
ProxyOptions proxyOptions = null;
if (cloudConfig.vcrProxyHost != null) {
if (cloudConfig.vcrProxyHost != null && !cloudConfig.vcrProxyHost.isEmpty()) {
logger.info("Using vcrProxyHost:vcrProxyPort {}:{}", cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort);
proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP,
new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
} else {
logger.info("No vcrProxyHost provided");
}
HttpClient client = new NettyAsyncHttpClientBuilder().proxy(proxyOptions).build();
/*
Expand All @@ -432,7 +436,7 @@ protected BlobServiceClient createBlobStorageSyncClient() {
RequestRetryOptions retryOptions =
new RequestRetryOptions(null, null, tryTimeoutInSeconds,
null, null, null);
return buildBlobServiceSyncClient(client, new Configuration(), retryOptions, azureCloudConfig);
return buildBlobServiceSyncClient(client, new ConfigurationBuilder().build(), retryOptions, azureCloudConfig);
} catch (MalformedURLException | InterruptedException | ExecutionException ex) {
logger.error("Error building ABS blob service client: {}", ex.getMessage());
throw new IllegalStateException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.github.ambry.utils.ByteBufferInputStream;
import com.github.ambry.utils.ByteBufferOutputStream;
import com.github.ambry.utils.MockTime;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.mockingDetails;

import static com.github.ambry.cloud.CloudTestUtil.*;
import static com.github.ambry.replication.ReplicationTest.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.cloud.azure.AzureCloudDestinationFactory;
import com.github.ambry.cloud.azure.AzuriteUtils;
import com.github.ambry.clustermap.CloudDataNode;
import com.github.ambry.clustermap.CloudReplica;
import com.github.ambry.clustermap.ClusterMap;
Expand Down Expand Up @@ -44,43 +45,57 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assume.*;


/**
* Test of the CloudTokenPersistor.
*/
@RunWith(Parameterized.class)
public class CloudTokenPersistorTest {
public static final Logger logger = LoggerFactory.getLogger(CloudTokenPersistorTest.class);

private final String replicationCloudTokenFactory;
protected String ambryBackupVersion;

/**
* Parameterized constructor.
* @param replicationCloudTokenFactory type of token factory used by {@link CloudDestination}
*/
public CloudTokenPersistorTest(String replicationCloudTokenFactory) {
super();
this.replicationCloudTokenFactory = replicationCloudTokenFactory;
public CloudTokenPersistorTest(String ambryBackupVersion) {
this.ambryBackupVersion = ambryBackupVersion;
}

/**
* static method to generate parameters.
* @return {@link Collection} of parameters.
* Test parameters
* Version 1 = Legacy VCR code and tests
* Version 2 = VCR 2.0 with Sync cloud destination - AzureCloudDestinationSync
*/
@Parameterized.Parameters
public static List<Object[]> input() {
return Arrays.asList(new Object[][]{{"com.github.ambry.cloud.azure.CosmosChangeFeedFindTokenFactory"},
{"com.github.ambry.cloud.azure.CosmosUpdateTimeFindTokenFactory"}});
public static List<Object[]> data() {
return Arrays.asList(new Object[][]{{CloudConfig.AMBRY_BACKUP_VERSION_1}, {CloudConfig.AMBRY_BACKUP_VERSION_2}});
}

@Before
public void beforeTest() {
boolean isV1 = ambryBackupVersion.equals(CloudConfig.AMBRY_BACKUP_VERSION_1);
boolean isConnectToAzurite = new AzuriteUtils().connectToAzurite();
boolean isV2 = ambryBackupVersion.equals(CloudConfig.AMBRY_BACKUP_VERSION_2);
if (!(isV1 || (isV2 && isConnectToAzurite))) {
logger.error("isV1 = {}, isV2 = {}, isConnectToAzurite = {}", isV1, isV2, isConnectToAzurite);
}
assumeTrue(isV1 || (isV2 && isConnectToAzurite));
}

@Test
public void basicTest() throws Exception {
Properties props =
VcrTestUtil.createVcrProperties("DC1", "vcrClusterName", "zkConnectString", 12310, 12410, 12510, null);
props.setProperty("replication.cloud.token.factory", replicationCloudTokenFactory);
CloudConfig cloudConfig = new CloudConfig(new VerifiableProperties(props));
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
ClusterMap clusterMap = new MockClusterMap();
Expand All @@ -106,12 +121,19 @@ public void basicTest() throws Exception {
mountPathToPartitionInfoList.computeIfAbsent(cloudReplicaId.getMountPath(), key -> ConcurrentHashMap.newKeySet())
.add(partitionInfo);

LatchBasedInMemoryCloudDestination cloudDestination =
new LatchBasedInMemoryCloudDestination(Collections.emptyList(),
AzureCloudDestinationFactory.getReplicationFeedType(new VerifiableProperties(props)), clusterMap);
CloudDestination cloudDestination = null;
MetricRegistry metricRegistry = new MetricRegistry();
if (ambryBackupVersion.equals(CloudConfig.AMBRY_BACKUP_VERSION_1)) {
cloudDestination =
new LatchBasedInMemoryCloudDestination(Collections.emptyList(),
AzureCloudDestinationFactory.getReplicationFeedType(new VerifiableProperties(props)), clusterMap);
} else if (ambryBackupVersion.equals(CloudConfig.AMBRY_BACKUP_VERSION_2)) {
cloudDestination = new AzuriteUtils().getAzuriteClient(props, metricRegistry, clusterMap);
}

ReplicationConfig replicationConfig = new ReplicationConfig(new VerifiableProperties(props));
CloudTokenPersistor cloudTokenPersistor = new CloudTokenPersistor("replicaTokens", mountPathToPartitionInfoList,
new ReplicationMetrics(new MetricRegistry(), Collections.emptyList()), clusterMap,
new ReplicationMetrics(metricRegistry, Collections.emptyList()), clusterMap,
new FindTokenHelper(blobIdFactory, replicationConfig), cloudDestination);
cloudTokenPersistor.persist(cloudReplicaId.getMountPath(), replicaTokenInfos);
List<RemoteReplicaInfo.ReplicaTokenInfo> retrievedReplicaTokenInfos =
Expand Down

0 comments on commit 43ca542

Please sign in to comment.