diff --git a/bundles/sirix-core/build.gradle b/bundles/sirix-core/build.gradle index 4f6f589f1..6ccbd7afa 100644 --- a/bundles/sirix-core/build.gradle +++ b/bundles/sirix-core/build.gradle @@ -19,6 +19,7 @@ dependencies { api implLibraries.iouring api implLibraries.lz4 api implLibraries.roaringbitmap + api implLibraries.amazonS3 implementation implLibraries.snappyJava implementation implLibraries.browniesCollections @@ -42,6 +43,7 @@ dependencies { testImplementation testLibraries.commonsCollections4 testImplementation testLibraries.commonsCollections4Tests testImplementation testLibraries.assertjCore + testImplementation testLibraries.s3Mock } description = 'SirixDB is a hybrid on-disk and in-memory document oriented, versioned database system. It has a ' + diff --git a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java index fccb49104..85958f2eb 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java +++ b/bundles/sirix-core/src/main/java/org/sirix/access/ResourceConfiguration.java @@ -28,10 +28,22 @@ package org.sirix.access; -import com.google.common.base.MoreObjects; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonWriter; -import net.openhft.hashing.LongHashFunction; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.EOFException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + import org.checkerframework.checker.index.qual.NonNegative; import org.sirix.BinaryEncodingVersion; import org.sirix.access.trx.node.HashType; @@ -46,20 +58,12 @@ import org.sirix.settings.VersioningType; import org.sirix.utils.OS; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; +import net.openhft.hashing.LongHashFunction; /** * Holds the settings for a resource which acts as a base for session that can not change. This @@ -165,6 +169,36 @@ public static int compareStructure(final Path file) { } } + public static final class AWSStorageInformation { + private final String awsProfile; + private final String awsRegion; + private final String bucketName; //this should be same as the database name + private final boolean shouldCreateBucketIfNotExists; + + public AWSStorageInformation(String awsProfile, String awsRegion, String bucketName, + boolean shouldCreateBucketIfNotExists) { + this.awsProfile = awsProfile; + this.awsRegion = awsRegion; + this.bucketName = bucketName; + this.shouldCreateBucketIfNotExists = shouldCreateBucketIfNotExists; + } + + public String getAwsProfile() { + return awsProfile; + } + + public String getAwsRegion() { + return awsRegion; + } + + public String getBucketName() { + return bucketName; + } + + public boolean shouldCreateBucketIfNotExists() { + return shouldCreateBucketIfNotExists; + } + } // FIXED STANDARD FIELDS /** * Standard storage. @@ -297,6 +331,14 @@ public static int compareStructure(final Path file) { // END MEMBERS FOR FIXED FIELDS + /* + * Optional AWS Credentials + * */ + /* + * This could be improved in future to make it more sophisticated in terms setting the credentials + * for creating the cloud client connection + * */ + public AWSStorageInformation awsStoreInfo; /** * Get a new builder instance. * @@ -330,6 +372,9 @@ private ResourceConfiguration(final ResourceConfiguration.Builder builder) { customCommitTimestamps = builder.customCommitTimestamps; storeNodeHistory = builder.storeNodeHistory; binaryVersion = builder.binaryEncodingVersion; + if(builder.awsStoreInfo != null) { + awsStoreInfo = builder.awsStoreInfo; + } } public BinaryEncodingVersion getBinaryEncodingVersion() { @@ -448,7 +493,8 @@ public boolean storeNodeHistory() { private static final String[] JSONNAMES = { "binaryEncoding", "revisioning", "revisioningClass", "numbersOfRevisiontoRestore", "byteHandlerClasses", "storageKind", "hashKind", "hashFunction", "compression", "pathSummary", "resourceID", "deweyIDsStored", - "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount" }; + "persistenter", "storeDiffs", "customCommitTimestamps", "storeNodeHistory", "storeChildCount", "awsStoreInfo", + "awsProfile","awsRegion","bucketName","shouldCreateBucketIfNotExists"}; /** * Serialize the configuration. @@ -500,6 +546,15 @@ public static void serialize(final ResourceConfiguration config) throws SirixIOE jsonWriter.name(JSONNAMES[15]).value(config.storeNodeHistory); // Child count. jsonWriter.name(JSONNAMES[16]).value(config.storeChildCount); + if(config.awsStoreInfo != null) { + jsonWriter.name(JSONNAMES[17]).beginObject(); + jsonWriter.name(JSONNAMES[18]).value(config.awsStoreInfo.getAwsProfile()); + jsonWriter.name(JSONNAMES[19]).value(config.awsStoreInfo.getAwsRegion()); + jsonWriter.name(JSONNAMES[20]).value(config.awsStoreInfo.getBucketName()); + jsonWriter.name(JSONNAMES[21]).value(config.awsStoreInfo.shouldCreateBucketIfNotExists()); + jsonWriter.name(JSONNAMES[17]).endObject(); + } + jsonWriter.endObject(); } catch (final IOException e) { throw new SirixIOException(e); @@ -596,7 +651,23 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE name = jsonReader.nextName(); assert name.equals(JSONNAMES[16]); final boolean storeChildCount = jsonReader.nextBoolean(); + //name = jsonReader.nextName(); + AWSStorageInformation awsStoreInfo=null; + try { + name = jsonReader.nextName(); + assert name.equals(JSONNAMES[17]); + jsonReader.beginObject(); + if(jsonReader.hasNext()) { + final String awsProfile=jsonReader.nextString(); + final String awsRegion=jsonReader.nextString(); + final String bucketName=jsonReader.nextString(); + final boolean shouldCreateBucketIfNotExists=jsonReader.nextBoolean(); + awsStoreInfo = new AWSStorageInformation(awsProfile,awsRegion, bucketName, shouldCreateBucketIfNotExists); + } + jsonReader.endObject(); + }catch(SirixIOException | EOFException | IllegalStateException io) { + } jsonReader.endObject(); jsonReader.close(); fileReader.close(); @@ -619,7 +690,8 @@ public static ResourceConfiguration deserialize(final Path file) throws SirixIOE .storeDiffs(storeDiffs) .storeChildCount(storeChildCount) .customCommitTimestamps(customCommitTimestamps) - .storeNodeHistory(storeNodeHistory); + .storeNodeHistory(storeNodeHistory) + .awsStoreInfo(awsStoreInfo); // Deserialized instance. final ResourceConfiguration config = new ResourceConfiguration(builder); @@ -713,6 +785,8 @@ public static final class Builder { private BinaryEncodingVersion binaryEncodingVersion = BINARY_ENCODING_VERSION; + private AWSStorageInformation awsStoreInfo; + /** * Constructor, setting the mandatory fields. * @@ -880,6 +954,12 @@ public Builder binaryEncodingVersion(BinaryEncodingVersion binaryEncodingVersion return this; } + /*Since this is an optional config parameter, null check is not needed*/ + public Builder awsStoreInfo(final AWSStorageInformation awsStoreInfo) { + this.awsStoreInfo = awsStoreInfo; + return this; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java b/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java index 2dd9f68c7..bff9bfb22 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/StorageType.java @@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.sirix.access.ResourceConfiguration; import org.sirix.exception.SirixIOException; +import org.sirix.io.cloud.amazon.AmazonS3Storage; import org.sirix.io.file.FileStorage; import org.sirix.io.filechannel.FileChannelStorage; import org.sirix.io.iouring.IOUringStorage; @@ -113,6 +114,16 @@ public IOStorage getInstance(final ResourceConfiguration resourceConf) { storage.loadRevisionFileDataIntoMemory(cache); return storage; } + }, + + CLOUD { + @Override + public IOStorage getInstance(final ResourceConfiguration resourceConf) { + final AsyncCache cache = + getIntegerRevisionFileDataAsyncCache(resourceConf); + final var storage = new AmazonS3Storage(resourceConf, cache); + return storage; + } }; public static final ConcurrentMap> CACHE_REPOSITORY = diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java new file mode 100644 index 000000000..fdd4ae509 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/CloudPlatform.java @@ -0,0 +1,7 @@ +package org.sirix.io.cloud; + +public enum CloudPlatform { + + AWS, GCP, AZURE + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java new file mode 100644 index 000000000..95a3c5742 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/ICloudStorage.java @@ -0,0 +1,9 @@ +package org.sirix.io.cloud; + +import org.sirix.io.IOStorage; + +public interface ICloudStorage extends IOStorage { + + + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java new file mode 100644 index 000000000..66e97dea3 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3Storage.java @@ -0,0 +1,187 @@ +package org.sirix.io.cloud.amazon; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.sirix.access.ResourceConfiguration; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.Writer; +import org.sirix.io.bytepipe.ByteHandler; +import org.sirix.io.bytepipe.ByteHandlerPipeline; +import org.sirix.io.cloud.ICloudStorage; +import org.sirix.page.PagePersister; +import org.sirix.page.SerializationType; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.AsyncCache; + +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.core.waiters.WaiterResponse; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketRequest; +import software.amazon.awssdk.services.s3.model.HeadBucketResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.waiters.S3Waiter; + +/** + * Factory to provide Amazon S3 as storage backend + * + * @Auther Sanket Band (@sband) + **/ + +public final class AmazonS3Storage implements ICloudStorage { + + /** + * Data file name. + */ + private static final String FILENAME = "sirix.data"; + + /** + * Revisions file name. + */ + private static final String REVISIONS_FILENAME = "sirix.revisions"; + + /** + * Instance to local storage. + */ + private final Path file; + + private S3Client s3Client; + + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3Storage.class)); + + /** + * Byte handler pipeline. + */ + private final ByteHandlerPipeline byteHandlerPipeline; + + /** + * Revision file data cache. + */ + private final AsyncCache cache; + + private ResourceConfiguration.AWSStorageInformation awsStorageInfo; + + private final AmazonS3StorageReader reader; + + /** + * Support AWS authentication only with .aws credentials file with the required + * profile name from the creds file + */ + public AmazonS3Storage(final ResourceConfiguration resourceConfig, AsyncCache cache) { + this.awsStorageInfo = resourceConfig.awsStoreInfo; + this.cache = cache; + this.byteHandlerPipeline = resourceConfig.byteHandlePipeline; + this.file = resourceConfig.resourcePath; + this.s3Client = getS3Client(); // this client is needed for the below checks, so initialize it here only. + String bucketName = awsStorageInfo.getBucketName(); + boolean shouldCreateBucketIfNotExists = awsStorageInfo.shouldCreateBucketIfNotExists(); + if (!isBucketExists(bucketName) && shouldCreateBucketIfNotExists) { + createBucket(bucketName); + } + this.reader = new AmazonS3StorageReader(bucketName, s3Client, getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), new ByteHandlerPipeline(this.byteHandlerPipeline), + SerializationType.DATA, new PagePersister(), cache.synchronous(), resourceConfig); + } + + void createBucket(String bucketName) { + try { + S3Waiter s3Waiter = s3Client.waiter(); + CreateBucketRequest bucketRequest = CreateBucketRequest.builder().bucket(bucketName).build(); + + s3Client.createBucket(bucketRequest); + HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder().bucket(bucketName).build(); + + WaiterResponse waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait); + if (waiterResponse.matched().response().isPresent()) { + LOGGER.info(String.format("S3 bucket: %s has been created.", bucketName)); + } + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + LOGGER.error(String.format("Bucket: %s could not be created. Will not consume S3 storage", bucketName)); + System.exit(1); + } + } + + boolean isBucketExists(String bucketName) { + HeadBucketRequest headBucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + + try { + s3Client.headBucket(headBucketRequest); + return true; + } catch (NoSuchBucketException e) { + return false; + } + } + + S3Client getS3Client() { + return this.s3Client == null + ? S3Client.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build() + : this.s3Client; + } + + S3AsyncClient getAsyncS3Client() { + return S3AsyncClient.builder().region(Region.of(awsStorageInfo.getAwsRegion())) + .credentialsProvider(ProfileCredentialsProvider.create(awsStorageInfo.getAwsProfile())).build(); + } + + @Override + public Writer createWriter() { + return new AmazonS3StorageWriter(getDataFilePath().toAbsolutePath().toString(), + getRevisionFilePath().toAbsolutePath().toString(), awsStorageInfo.getBucketName(), + SerializationType.DATA, new PagePersister(), cache, reader, this.getAsyncS3Client()); + } + + @Override + public Reader createReader() { + return this.reader; + } + + @Override + public void close() { + + } + + @Override + public boolean exists() { + Path storage = this.reader.readObjectDataFromS3(getDataFilePath().toAbsolutePath().toString()); + try { + return Files.exists(storage) && Files.size(storage) > 0; + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public ByteHandler getByteHandler() { + return this.byteHandlerPipeline; + } + + /** + * Getting path for data file. This path would be used on the local storage + * + * @return the path for this data file + */ + private Path getDataFilePath() { + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(FILENAME); + } + + /** + * Getting concrete storage for this file. + * + * @return the concrete storage for this database + */ + private Path getRevisionFilePath() { + return file.resolve(ResourceConfiguration.ResourcePaths.DATA.getPath()).resolve(REVISIONS_FILENAME); + } +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java new file mode 100644 index 000000000..ca36e3aec --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageReader.java @@ -0,0 +1,143 @@ +package org.sirix.io.cloud.amazon; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.Path; +import java.time.Instant; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.access.ResourceConfiguration; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.bytepipe.ByteHandler; +import org.sirix.io.file.FileReader; +import org.sirix.io.filechannel.FileChannelReader; +import org.sirix.page.PagePersister; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.SerializationType; +import org.sirix.page.interfaces.Page; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.hash.HashFunction; + +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +public class AmazonS3StorageReader implements Reader { + + /** + * S3 storage bucket name + * + */ + private final String bucketName; + + private final S3Client s3Client; + + private final ResourceConfiguration resourceConfig; + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageReader.class)); + + private FileChannelReader reader; + + public AmazonS3StorageReader(String bucketName, S3Client s3Client, String dataFileKeyName, + String revisionsOffsetFileKeyName, final ByteHandler byteHandler, final SerializationType serializationType, + final PagePersister pagePersister, final Cache cache, + ResourceConfiguration resourceConfig) { + this.bucketName = bucketName; + this.s3Client = s3Client; + this.resourceConfig = resourceConfig; + Path dataFilePath = readObjectDataFromS3(dataFileKeyName); + Path revisionOffsetFilePath = readObjectDataFromS3(revisionsOffsetFileKeyName); + try { + this.reader = new FileChannelReader(new RandomAccessFile(dataFilePath.toFile(), "r").getChannel(), + new RandomAccessFile(revisionOffsetFilePath.toFile(), "r").getChannel(), byteHandler, serializationType, + pagePersister, cache); + } catch (IOException io) { + LOGGER.error(io.getMessage()); + System.exit(1); + } + + } + + /** + * @param keyName - Key name of the object to be read from S3 storage + * @return path - The location of the local file that contains the data that is + * written to the file system storage in the system temp directory. + */ + protected Path readObjectDataFromS3(String keyName) { + + try { + GetObjectRequest objectRequest = GetObjectRequest.builder().key(keyName).bucket(bucketName).build(); + + ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest); + byte[] data = objectBytes.asByteArray(); + /* + * As the bucketName has to be same as the database name, it makes sense to + * use/create file on the local filesystem instead of in the tmp partition + */ + Path path = resourceConfig.resourcePath; + // Write the data to a local file. + File myFile = path.toFile(); + try (OutputStream os = new FileOutputStream(myFile)) { + os.write(data); + } + return path; + } catch (IOException ex) { + ex.printStackTrace(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } + return null; + } + + ByteHandler getByteHandler() { + return this.reader.getByteHandler(); + } + + HashFunction getHashFunction() { + return this.reader.getHashFunction(); + } + + @Override + public PageReference readUberPageReference() { + return reader.readUberPageReference(); + } + + @Override + public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { + return reader.read(key, pageReadTrx); + } + + @Override + public void close() { + s3Client.close(); + reader.close(); + } + + @Override + public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { + return reader.readRevisionRootPage(revision, pageReadTrx); + } + + @Override + public Instant readRevisionRootPageCommitTimestamp(int revision) { + return reader.readRevisionRootPageCommitTimestamp(revision); + } + + @Override + public RevisionFileData getRevisionFileData(int revision) { + return reader.getRevisionFileData(revision); + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java new file mode 100644 index 000000000..3b8d34c61 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/cloud/amazon/AmazonS3StorageWriter.java @@ -0,0 +1,319 @@ +package org.sirix.io.cloud.amazon; + +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.file.FileSystems; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.jetbrains.annotations.NotNull; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.exception.SirixIOException; +import org.sirix.io.AbstractForwardingReader; +import org.sirix.io.IOStorage; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.io.Writer; +import org.sirix.page.KeyValueLeafPage; +import org.sirix.page.PagePersister; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.SerializationType; +import org.sirix.page.UberPage; +import org.sirix.page.interfaces.Page; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.AsyncCache; + +import net.openhft.chronicle.bytes.Bytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +public class AmazonS3StorageWriter extends AbstractForwardingReader implements Writer { + + /** + * Random access to work on. + */ + private RandomAccessFile dataFile; + + /** + * {@link AmazonS3StorageReader} reference for this writer. + */ + private final AmazonS3StorageReader reader; + + private final SerializationType type; + + private RandomAccessFile revisionsFile; + + private final PagePersister pagePersister; + + private final AsyncCache cache; + + private boolean isFirstUberPage; + + private final Bytes byteBufferBytes = Bytes.elasticByteBuffer(1_000); + + private final S3AsyncClient s3Client; + + private final String bucketName; + + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(AmazonS3StorageWriter.class)); + + public AmazonS3StorageWriter(final String dataFileKeyName, final String revisionsOffsetFileKeyName, + final String bucketName, final SerializationType serializationType, final PagePersister pagePersister, + final AsyncCache cache, final AmazonS3StorageReader reader, + final S3AsyncClient s3Client) { + this.bucketName = bucketName; + type = requireNonNull(serializationType); + try { + this.dataFile = new RandomAccessFile(requireNonNull(reader.readObjectDataFromS3(dataFileKeyName)).toFile(), + "rw"); + this.revisionsFile = type == SerializationType.DATA + ? new RandomAccessFile( + requireNonNull(reader.readObjectDataFromS3(revisionsOffsetFileKeyName)).toFile(), "rw") + : null; + } catch (IOException io) { + LOGGER.error(String.format( + "Cannot create S3 storage writer, " + + "please check if DATA path OR Revision offset file path exists. Error details : %s", + io.getMessage())); + } + + this.pagePersister = requireNonNull(pagePersister); + this.cache = cache; + this.reader = requireNonNull(reader); + this.s3Client = s3Client; + } + + /** + * @param bucketName - S3 bucket name on AWS + * @param keyName - Name of the file that includes the full path that is + * supposed to be used on the local file system + * @param object - File that could be read from the local filesystem that + * contains the actual information to be stored on S3 + * + */ + protected void writeObjectToS3(String keyName, File object, boolean isDataFile) { + try { + Map metadata = new HashMap<>(); + metadata.put("x-amz-meta-sirix", isDataFile ? "data" : "revision"); + PutObjectRequest putOb = PutObjectRequest.builder().bucket(bucketName).key(keyName).metadata(metadata) + .build(); + + CompletableFuture objectFutureResponse = s3Client.putObject(putOb, + AsyncRequestBody.fromFile(object)); + objectFutureResponse.whenComplete((response, error) -> { + try { + if (response != null) { + LOGGER.info(String.format("Object: %s has been uploaded on %s", keyName, bucketName)); + /* + * No need to delete/cleanup the file as we are writing on the local file + * system, so this avoid unnecessarily filling up the filesystem space + */ + } else { + // Handle error + error.printStackTrace(); + LOGGER.error(error.getMessage()); + System.exit(1); + } + } finally { + s3Client.close(); + } + }); + + objectFutureResponse.join(); + } catch (S3Exception e) { + LOGGER.error(e.awsErrorDetails().errorMessage()); + System.exit(1); + } + } + + @Override + public void close() { + try { + if (dataFile != null) { + dataFile.close(); + } + if (revisionsFile != null) { + revisionsFile.close(); + } + if (reader != null) { + reader.close(); + } + this.s3Client.close(); + } catch (final IOException e) { + throw new SirixIOException(e); + } + } + + @Override + public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { + try { + final long fileSize = dataFile.length(); + long offset = fileSize == 0 ? IOStorage.FIRST_BEACON : fileSize; + return writePageReference(pageReadOnlyTrx, pageReference, offset); + } catch (final IOException e) { + throw new SirixIOException(e); + } + } + + private String getFileKeyName(String fileDescriptorPath) { + return fileDescriptorPath + .substring((System.getProperty("java.io.tmpdir") + FileSystems.getDefault().getSeparator()).length()); + } + + @NotNull + private AmazonS3StorageWriter writePageReference(final PageReadOnlyTrx pageReadOnlyTrx, + final PageReference pageReference, long offset) { + // Perform byte operations. + try { + // Serialize page. + final Page page = pageReference.getPage(); + + final byte[] serializedPage; + + try (final ByteArrayOutputStream output = new ByteArrayOutputStream(1_000); + final DataOutputStream dataOutput = new DataOutputStream( + reader.getByteHandler().serialize(output))) { + pagePersister.serializePage(pageReadOnlyTrx, byteBufferBytes, page, type); + final var byteArray = byteBufferBytes.toByteArray(); + dataOutput.write(byteArray); + dataOutput.flush(); + serializedPage = output.toByteArray(); + } + + byteBufferBytes.clear(); + + final byte[] writtenPage = new byte[serializedPage.length + IOStorage.OTHER_BEACON]; + final ByteBuffer buffer = ByteBuffer.allocate(writtenPage.length); + buffer.putInt(serializedPage.length); + buffer.put(serializedPage); + buffer.flip(); + buffer.get(writtenPage); + + // Getting actual offset and appending to the end of the current file. + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage) { + if (offset % REVISION_ROOT_PAGE_BYTE_ALIGN != 0) { + offset += REVISION_ROOT_PAGE_BYTE_ALIGN - (offset % REVISION_ROOT_PAGE_BYTE_ALIGN); + } + } else if (offset % PAGE_FRAGMENT_BYTE_ALIGN != 0) { + offset += PAGE_FRAGMENT_BYTE_ALIGN - (offset % PAGE_FRAGMENT_BYTE_ALIGN); + } + } + dataFile.seek(offset); + dataFile.write(writtenPage); + /* Write the file object to S3 */ + this.writeObjectToS3(this.getFileKeyName(dataFile.getFD().toString()), + new File(dataFile.getFD().toString()), Boolean.TRUE); + + // Remember page coordinates. + pageReference.setKey(offset); + + if (page instanceof KeyValueLeafPage keyValueLeafPage) { + pageReference.setHash(keyValueLeafPage.getHashCode()); + } else { + pageReference.setHash(reader.getHashFunction().hashBytes(serializedPage).asBytes()); + } + + if (type == SerializationType.DATA) { + if (page instanceof RevisionRootPage revisionRootPage) { + if (revisionRootPage.getRevision() == 0) { + revisionsFile.seek(revisionsFile.length() + IOStorage.FIRST_BEACON); + } else { + revisionsFile.seek(revisionsFile.length()); + } + revisionsFile.writeLong(offset); + revisionsFile.writeLong(revisionRootPage.getRevisionTimestamp()); + if (cache != null) { + final long currOffset = offset; + cache.put(revisionRootPage.getRevision(), + CompletableFuture.supplyAsync(() -> new RevisionFileData(currOffset, + Instant.ofEpochMilli(revisionRootPage.getRevisionTimestamp())))); + } + } else if (page instanceof UberPage && isFirstUberPage) { + revisionsFile.seek(0); + revisionsFile.write(serializedPage); + revisionsFile.seek(IOStorage.FIRST_BEACON >> 1); + revisionsFile.write(serializedPage); + } + this.writeObjectToS3(this.getFileKeyName(revisionsFile.getFD().toString()), + new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + + return this; + } catch (final IOException e) { + throw new SirixIOException(e); + } + } + + @Override + public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, + Bytes bufferedBytes) { + isFirstUberPage = true; + writePageReference(pageReadOnlyTrx, pageReference, 0); + isFirstUberPage = false; + writePageReference(pageReadOnlyTrx, pageReference, 100); + return this; + } + + @Override + public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { + try { + final var dataFileRevisionRootPageOffset = cache.get(revision, (unused) -> getRevisionFileData(revision)) + .get(5, TimeUnit.SECONDS).offset(); + + // Read page from file. + dataFile.seek(dataFileRevisionRootPageOffset); + final int dataLength = dataFile.readInt(); + + dataFile.getChannel().truncate(dataFileRevisionRootPageOffset + IOStorage.OTHER_BEACON + dataLength); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), + Boolean.TRUE); + } catch (InterruptedException | ExecutionException | TimeoutException | IOException e) { + throw new IllegalStateException(e); + } + + return this; + } + + @Override + public Writer truncate() { + try { + dataFile.setLength(0); + this.writeObjectToS3(getFileKeyName(dataFile.getFD().toString()), new File(dataFile.getFD().toString()), + Boolean.TRUE); + if (revisionsFile != null) { + revisionsFile.setLength(0); + this.writeObjectToS3(getFileKeyName(revisionsFile.getFD().toString()), + new File(revisionsFile.getFD().toString()), Boolean.FALSE); + } + } catch (final IOException e) { + throw new SirixIOException(e); + } + + return this; + } + + @Override + protected Reader delegate() { + return this.reader; + } +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java new file mode 100644 index 000000000..c1831bc23 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorage.java @@ -0,0 +1,48 @@ +package org.sirix.io.combined; + +import org.sirix.io.IOStorage; +import org.sirix.io.Reader; +import org.sirix.io.Writer; +import org.sirix.io.bytepipe.ByteHandler; +import org.sirix.io.cloud.ICloudStorage; + +public class CombinedStorage implements IOStorage { + + private final IOStorage localStorage; + + private final ICloudStorage remoteStorage; + + public CombinedStorage(final IOStorage localStorage, + final ICloudStorage remoteStorage) { + this.localStorage = localStorage; + this.remoteStorage = remoteStorage; + } + + @Override + public Writer createWriter() { + return new CombinedStorageWriter(localStorage.createWriter(), remoteStorage.createWriter(), localStorage.createReader()); + } + + @Override + public Reader createReader() { + return new CombinedStorageReader(localStorage.createReader(), remoteStorage.createReader()); + } + + @Override + public void close() { + localStorage.close(); + remoteStorage.close(); + } + + @Override + public boolean exists() { + if(!localStorage.exists()) return remoteStorage.exists(); + return localStorage.exists(); + } + + @Override + public ByteHandler getByteHandler() { + return localStorage.getByteHandler(); + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java new file mode 100644 index 000000000..1307acc9c --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageReader.java @@ -0,0 +1,74 @@ +package org.sirix.io.combined; + +import java.time.Instant; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.Reader; +import org.sirix.io.RevisionFileData; +import org.sirix.page.PageReference; +import org.sirix.page.RevisionRootPage; +import org.sirix.page.interfaces.Page; + +public class CombinedStorageReader implements Reader { + + + private Reader localReader, remoteReader; + + public CombinedStorageReader(Reader localReader, Reader remoteReader) { + this.localReader = localReader; + this.remoteReader = remoteReader; + } + + @Override + public PageReference readUberPageReference() { + PageReference pageReference = localReader.readUberPageReference(); + if(pageReference==null) { + pageReference = remoteReader.readUberPageReference(); + } + return pageReference; + } + + @Override + public Page read(PageReference key, @Nullable PageReadOnlyTrx pageReadTrx) { + Page page = localReader.read(key, pageReadTrx); + if(page==null) { + page = remoteReader.read(key, pageReadTrx); + } + return page; + } + + @Override + public void close() { + localReader.close(); + remoteReader.close(); + } + + @Override + public RevisionRootPage readRevisionRootPage(int revision, PageReadOnlyTrx pageReadTrx) { + RevisionRootPage revRootPage = localReader.readRevisionRootPage(revision, pageReadTrx); + if(revRootPage==null) { + revRootPage = remoteReader.readRevisionRootPage(revision, pageReadTrx); + } + return revRootPage; + } + + @Override + public Instant readRevisionRootPageCommitTimestamp(int revision) { + Instant revRootPageCommitTS = localReader.readRevisionRootPageCommitTimestamp(revision); + if(revRootPageCommitTS==null) { + revRootPageCommitTS = remoteReader.readRevisionRootPageCommitTimestamp(revision); + } + return revRootPageCommitTS; + } + + @Override + public RevisionFileData getRevisionFileData(int revision) { + RevisionFileData revFileData = localReader.getRevisionFileData(revision); + if(revFileData == null) { + revFileData = remoteReader.getRevisionFileData(revision); + } + return revFileData; + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java new file mode 100644 index 000000000..aea138543 --- /dev/null +++ b/bundles/sirix-core/src/main/java/org/sirix/io/combined/CombinedStorageWriter.java @@ -0,0 +1,72 @@ +package org.sirix.io.combined; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.sirix.api.PageReadOnlyTrx; +import org.sirix.io.AbstractForwardingReader; +import org.sirix.io.Reader; +import org.sirix.io.Writer; +import org.sirix.io.cloud.amazon.AmazonS3StorageReader; +import org.sirix.page.PageReference; +import org.sirix.utils.LogWrapper; +import org.slf4j.LoggerFactory; + +import net.openhft.chronicle.bytes.Bytes; + +public class CombinedStorageWriter extends AbstractForwardingReader implements Writer { + + private Writer localStorageWriter, remoteStorageWriter; + private Reader storageReader; + /** Logger. */ + private static final LogWrapper LOGGER = new LogWrapper(LoggerFactory.getLogger(CombinedStorageWriter.class)); + + public CombinedStorageWriter(Writer localWriter, Writer remoteWriter, Reader storageReader) { + this.localStorageWriter = localWriter; + this.remoteStorageWriter = remoteWriter; + this.storageReader = storageReader; + } + + @Override + public Writer write(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, Bytes bufferedBytes) { + Writer writer = localStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes); + CompletableFuture.supplyAsync(() -> remoteStorageWriter.write(pageReadOnlyTrx, pageReference, bufferedBytes)); + return writer; + } + + @Override + public Writer writeUberPageReference(PageReadOnlyTrx pageReadOnlyTrx, PageReference pageReference, + Bytes bufferedBytes) { + Writer writer = localStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes); + CompletableFuture.supplyAsync(() -> remoteStorageWriter.writeUberPageReference(pageReadOnlyTrx, pageReference, bufferedBytes)); + return writer; + } + + @Override + public Writer truncateTo(PageReadOnlyTrx pageReadOnlyTrx, int revision) { + Writer writer = localStorageWriter.truncateTo(pageReadOnlyTrx, revision); + CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncateTo(pageReadOnlyTrx, revision)); + return writer; + } + + @Override + public Writer truncate() { + Writer writer = localStorageWriter.truncate(); + CompletableFuture remoteWriterTask = CompletableFuture.supplyAsync(() -> remoteStorageWriter.truncate()); + return writer; + } + + @Override + public void close() { + localStorageWriter.close(); + remoteStorageWriter.close(); + + } + + @Override + protected Reader delegate() { + return storageReader; + } + +} diff --git a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java index 195cc0074..9a32ac3cc 100644 --- a/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java +++ b/bundles/sirix-core/src/main/java/org/sirix/io/filechannel/FileChannelReader.java @@ -167,4 +167,11 @@ public RevisionFileData getRevisionFileData(int revision) { public void close() { } + public ByteHandler getByteHandler() { + return this.byteHandler; + } + + public HashFunction getHashFunction() { + return this.hashFunction; + } } diff --git a/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java b/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java new file mode 100644 index 000000000..811774895 --- /dev/null +++ b/bundles/sirix-core/src/test/java/org/sirix/io/cloud/amazon/AWSS3StorageTest.java @@ -0,0 +1,76 @@ +package org.sirix.io.cloud.amazon; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.sirix.XmlTestHelper; +import org.sirix.XmlTestHelper.PATHS; +import org.sirix.access.ResourceConfiguration; +import org.sirix.api.Database; +import org.sirix.api.xml.XmlResourceSession; +import org.sirix.io.StorageType; + +import io.findify.s3mock.S3Mock; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +public class AWSS3StorageTest { + + private AmazonS3Storage awsStorage; + private S3Client s3Client; + private AmazonS3StorageWriter cloudWriter; + private AmazonS3StorageReader cloudReader; + + @Before + public void setup() { + final ResourceConfiguration.Builder resourceConfig = new ResourceConfiguration.Builder(XmlTestHelper.RESOURCE); + resourceConfig.storageType(StorageType.CLOUD); + Database xmlDatabase = XmlTestHelper.getDatabase(PATHS.PATH1.getFile()); + resourceConfig.awsStoreInfo(new ResourceConfiguration.AWSStorageInformation("default", + Region.US_EAST_1.id(), xmlDatabase.getName(), true)); + ResourceConfiguration testResources = resourceConfig.build(); + S3Mock api = S3Mock.create(8001, "."); + api.start(); + s3Client = S3Client.builder().region(Region.of(testResources.awsStoreInfo.getAwsRegion())) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .dualstackEnabled(true) + .endpointOverride(URI.create("http://127.0.0.1:8001")) + .build(); + testResources.resourcePath = PATHS.PATH1.getFile(); + + awsStorage = (AmazonS3Storage)StorageType.CLOUD.getInstance(testResources); + awsStorage.setS3Client(s3Client); + cloudWriter = (AmazonS3StorageWriter)awsStorage.createWriter(); + cloudReader = (AmazonS3StorageReader)awsStorage.createReader(); + } + + @Test + public void testS3StorageWriterNotNull() { + assertNotNull(cloudWriter); + } + + @Test + public void testS3StorageReaderNotNull() { + assertNotNull(cloudReader); + } + + @Test + public void testCreateBucket() { + awsStorage.createBucket(); + assertTrue(awsStorage.isBucketExists()); + } + + + @After + public void tearDown() { + XmlTestHelper.deleteEverything(); + s3Client.close(); + } + + +} \ No newline at end of file diff --git a/libraries.gradle b/libraries.gradle index 72dde06da..9cad91620 100644 --- a/libraries.gradle +++ b/libraries.gradle @@ -44,7 +44,9 @@ implLibraries = [ iouring : 'one.jasyncfio:jasyncfio:0.0.7:linux-amd64', roaringbitmap : 'org.roaringbitmap:RoaringBitmap:0.9.36', fastObjectPool : 'cn.danielw:fast-object-pool:2.2.1', - zeroAllocationHashing : 'net.openhft:zero-allocation-hashing:0.16' + zeroAllocationHashing : 'net.openhft:zero-allocation-hashing:0.16', + amazonS3 : 'software.amazon.awssdk:s3:2.20.62' + ] testLibraries = [ @@ -65,5 +67,6 @@ testLibraries = [ kotestAssertions : 'io.kotest:kotest-assertions-core-jvm:4.0.5', commonsCollections4 : 'org.apache.commons:commons-collections4:4.3', commonsCollections4Tests : 'org.apache.commons:commons-collections4:4.3:tests', - assertjCore : 'org.assertj:assertj-core:3.23.1' + assertjCore : 'org.assertj:assertj-core:3.23.1', + s3Mock : 'io.findify:s3mock_2.13:0.2.6' ]