From 029c72776643c1f5bcf25ea03805b074f53326b3 Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Wed, 14 Jun 2023 10:00:14 +0200 Subject: [PATCH] Stop large archive downloads timing out (#3925) * don't timeout when making archive of many large remote files * re-implement sorting for archives * ignore long-running test * scalafmt * don't use lazy source * renames / tidy up * log error message * scalafmt * remove println * fix compile error * increase patience on TarDownloadSpec * scalafmt --- .../plugins/archive/ArchiveDownload.scala | 97 ++++++++++++++----- .../delta/plugins/archive/Archives.scala | 3 +- .../plugins/archive/ArchiveDownloadSpec.scala | 3 +- .../delta/plugins/archive/ArchivesSpec.scala | 6 +- .../plugins/archive/TarDownloadSpec.scala | 6 +- .../delta/plugins/storage/files/Files.scala | 5 +- .../plugins/storage/files/FilesSpec.scala | 23 +++-- .../delta/sdk/directives/FileResponse.scala | 45 +++++++-- .../sdk/directives/ResponseToJsonLd.scala | 20 ++-- .../kg/archives/archive-many-large-files.json | 79 +++++++++++++++ .../nexus/tests/kg/RemoteStorageSpec.scala | 62 +++++++++++- 11 files changed, 291 insertions(+), 58 deletions(-) create mode 100644 tests/src/test/resources/kg/archives/archive-many-large-files.json diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala index f68c037cbc..20967698dc 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownload.scala @@ -14,18 +14,23 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering -import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource +import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue} import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse +import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.Complete +import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources +import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} import com.typesafe.scalalogging.Logger +import fs2.Stream import io.circe.{Json, Printer} -import monix.bio.{IO, UIO} +import monix.bio.{IO, Task, UIO} +import monix.execution.Scheduler import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -54,7 +59,7 @@ trait ArchiveDownload { project: ProjectRef, format: ArchiveFormat[M], ignoreNotFound: Boolean - )(implicit caller: Caller): IO[ArchiveRejection, AkkaSource] + )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] } @@ -62,6 +67,12 @@ object ArchiveDownload { implicit private val logger: Logger = Logger[ArchiveDownload] + case class ArchiveDownloadError(filename: String, 
response: Complete[JsonLdValue]) extends SDKError { + override def getMessage: String = { + s"Error streaming file '$filename' for archive: ${response.value.value}" + } + } + /** * The default [[ArchiveDownload]] implementation. * @@ -88,17 +99,43 @@ object ArchiveDownload { project: ProjectRef, format: ArchiveFormat[M], ignoreNotFound: Boolean - )(implicit caller: Caller): IO[ArchiveRejection, AkkaSource] = { - implicit val entryOrdering: Ordering[M] = format.ordering - val referenceList = value.resources.toList + )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = { + val references = value.resources.toList for { - _ <- checkResourcePermissions(referenceList, project) - entries <- referenceList.traverseFilter { - case ref: ResourceReference => resourceEntry(ref, project, format, ignoreNotFound) - case ref: FileReference => fileEntry(ref, project, format, ignoreNotFound) - } - sorted = entries.sortBy { case (entry, _) => entry } - } yield Source(sorted).via(format.writeFlow) + _ <- checkResourcePermissions(references, project) + contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound, format) + } yield { + Source.fromGraph(StreamConverter(contentStream)).via(format.writeFlow) + } + } + + private def resolveReferencesAsStream[M]( + references: List[ArchiveReference], + project: ProjectRef, + ignoreNotFound: Boolean, + format: ArchiveFormat[M] + )(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (M, AkkaSource)]] = { + references + .traverseFilter { + case ref: FileReference => fileEntry(ref, project, format, ignoreNotFound) + case ref: ResourceReference => resourceEntry(ref, project, format, ignoreNotFound) + } + .map(sortWith(format)) + .map(asStream) + } + + private def sortWith[M]( + format: ArchiveFormat[M] + )(list: List[(M, Task[AkkaSource])]): List[(M, Task[AkkaSource])] = { + list.sortBy { case (entry, _) => + entry + }(format.ordering) + } + + private def asStream[M](list: List[(M, Task[AkkaSource])]) = { + Stream.iterable(list).evalMap[Task, (M, AkkaSource)] { case (metadata, source) => + source.map(metadata -> _) + } } private def checkResourcePermissions( @@ -121,10 +158,10 @@ object ArchiveDownload { ignoreNotFound: Boolean )(implicit caller: Caller - ): IO[ArchiveRejection, Option[(Metadata, AkkaSource)]] = { + ): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = { val refProject = ref.project.getOrElse(project) // the required permissions are checked for each file content fetch - val tarEntryIO = fetchFileContent(ref.ref, refProject, caller) + val entry = fetchFileContent(ref.ref, refProject, caller) .mapError { case _: FileRejection.FileNotFound => ResourceNotFound(ref.ref, project) case _: FileRejection.TagNotFound => ResourceNotFound(ref.ref, project) @@ -132,16 +169,24 @@ object ArchiveDownload { case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm) case other => WrappedFileRejection(other) } - .flatMap { fileResponse => + .flatMap { case FileResponse(fileMetadata, content) => IO.fromEither( - pathOf(ref, project, format, fileResponse.filename).map { path => - val metadata = format.metadata(path, fileResponse.bytes) - Some((metadata, fileResponse.content)) + pathOf(ref, project, format, fileMetadata.filename).map { path => + val archiveMetadata = format.metadata(path, fileMetadata.bytes) + val contentTask: Task[AkkaSource] = content + .tapError(response => + UIO.delay( + logger + .error(s"Error streaming file '${fileMetadata.filename}' for archive: 
${response.value.value}") + ) + ) + .mapError(response => ArchiveDownloadError(fileMetadata.filename, response)) + Some((archiveMetadata, contentTask)) } ) } - if (ignoreNotFound) tarEntryIO.onErrorRecover { case _: ResourceNotFound => None } - else tarEntryIO + if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None } + else entry } private def pathOf( @@ -164,14 +209,14 @@ object ArchiveDownload { project: ProjectRef, format: ArchiveFormat[Metadata], ignoreNotFound: Boolean - ): IO[ArchiveRejection, Option[(Metadata, AkkaSource)]] = { - val tarEntryIO = resourceRefToByteString(ref, project).map { content => + ): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = { + val archiveEntry = resourceRefToByteString(ref, project).map { content => val path = pathOf(ref, project) val metadata = format.metadata(path, content.length.toLong) - Some((metadata, Source.single(content))) + Some((metadata, Task.pure(Source.single(content)))) } - if (ignoreNotFound) tarEntryIO.onErrorHandle { _: ResourceNotFound => None } - else tarEntryIO + if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None } + else archiveEntry } private def resourceRefToByteString( diff --git a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala index abbedb779d..aa7adc25d0 100644 --- a/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala +++ b/delta/plugins/archive/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/Archives.scala @@ -25,6 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.{EphemeralDefinition, EphemeralLog} import io.circe.Json import monix.bio.{IO, UIO} +import monix.execution.Scheduler /** * Archives module. 
@@ -168,7 +169,7 @@ class Archives( project: ProjectRef, format: ArchiveFormat[_], ignoreNotFound: Boolean - )(implicit caller: Caller): IO[ArchiveRejection, AkkaSource] = + )(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = (for { resource <- fetch(id, project) value = resource.value diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala index c4efcc1731..8b5bea0b85 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchiveDownloadSpec.scala @@ -43,6 +43,7 @@ import monix.bio.{IO, UIO} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.{Inspectors, OptionValues} +import monix.execution.Scheduler.Implicits.global import java.util.UUID import scala.concurrent.ExecutionContext @@ -146,7 +147,7 @@ abstract class ArchiveDownloadSpec def rejectedAccess(value: ArchiveValue) = { archiveDownload - .apply(value, project.ref, format, ignoreNotFound = true)(Caller.Anonymous) + .apply(value, project.ref, format, ignoreNotFound = true)(Caller.Anonymous, global) .rejectedWith[AuthorizationFailed] } diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala index 59df61a554..4b7dddf1fb 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/ArchivesSpec.scala @@ -62,8 +62,10 @@ class ArchivesSpec private val cfg = ArchivePluginConfig(1, EphemeralLogConfig(5.seconds, 5.hours)) private val download = new ArchiveDownload { - override def apply[F](value: ArchiveValue, project: ProjectRef, format: ArchiveFormat[F], ignoreNotFound: Boolean)( - implicit caller: Caller + override def apply[M](value: ArchiveValue, project: ProjectRef, format: ArchiveFormat[M], ignoreNotFound: Boolean)( + implicit + caller: Caller, + scheduler: Scheduler ): IO[ArchiveRejection, AkkaSource] = IO.pure(Source.empty) } diff --git a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/TarDownloadSpec.scala b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/TarDownloadSpec.scala index 4873058a0d..456b4bc266 100644 --- a/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/TarDownloadSpec.scala +++ b/delta/plugins/archive/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/archive/TarDownloadSpec.scala @@ -2,8 +2,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveFormat import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource +import scala.concurrent.duration.DurationInt + class TarDownloadSpec extends ArchiveDownloadSpec { - override def format: ArchiveFormat[_] = ArchiveFormat.Tar + + implicit override def patienceConfig: PatienceConfig = PatienceConfig(3.seconds, 10.millis) + override def format: ArchiveFormat[_] = ArchiveFormat.Tar override def sourceToMap(source: AkkaSource): Map[String, String] = fromTar(source).map { case (k, v) => k -> 
v.utf8String } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index ef0eb945e4..05413eba06 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -345,7 +345,10 @@ final class Files( storage <- storages.fetch(file.value.storage, project) permission = storage.value.storageValue.readPermission _ <- aclCheck.authorizeForOr(project, permission)(AuthorizationFailed(project, permission)) - s <- FetchFile(storage.value).apply(attributes).mapError(FetchRejection(file.id, storage.id, _)) + s = FetchFile(storage.value) + .apply(attributes) + .mapError(FetchRejection(file.id, storage.id, _)) + .leftWiden[FileRejection] mediaType = attributes.mediaType.getOrElse(`application/octet-stream`) } yield FileResponse(attributes.filename, mediaType, attributes.bytes, s) }.span("fetchFileContent") diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala index 49168a266c..7e2062143f 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/FilesSpec.scala @@ -20,6 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress +import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount} import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ @@ -526,27 +527,31 @@ class FilesSpec(docker: RemoteStorageDocker) } + def consumeContent(response: FileResponse): String = { + consume(response.content.accepted) + } + "fetching a file content" should { "succeed" in { val response = files.fetchContent(file1, projectRef).accepted - consume(response.content) shouldEqual content - response.filename shouldEqual "file.txt" - response.contentType shouldEqual `text/plain(UTF-8)` + consumeContent(response) shouldEqual content + response.metadata.filename shouldEqual "file.txt" + response.metadata.contentType shouldEqual `text/plain(UTF-8)` } "succeed by tag" in { val response = files.fetchContent(IdSegmentRef(file1, tag), projectRef).accepted - consume(response.content) shouldEqual content - response.filename shouldEqual "file.txt" - response.contentType shouldEqual `text/plain(UTF-8)` + consumeContent(response) shouldEqual content + response.metadata.filename shouldEqual "file.txt" + response.metadata.contentType shouldEqual `text/plain(UTF-8)` } "succeed by rev" in { val response = files.fetchContent(IdSegmentRef(file1, 1), projectRef).accepted - consume(response.content) shouldEqual content - response.filename shouldEqual "myfile.txt" - response.contentType shouldEqual `text/plain(UTF-8)` + consumeContent(response) shouldEqual content + response.metadata.filename shouldEqual "myfile.txt" + response.metadata.contentType shouldEqual `text/plain(UTF-8)` } "reject 
if tag does not exist" in { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala index 981ef0266d..c553fcef08 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/FileResponse.scala @@ -1,18 +1,47 @@ package ch.epfl.bluebrain.nexus.delta.sdk.directives import akka.http.scaladsl.model.ContentType -import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder +import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse.{Content, Metadata} +import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue} +import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.Complete +import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields +import monix.bio.IO /** * A file response content * - * @param filename - * the filename - * @param contentType - * the file content type - * @param bytes - * the file size + * @param metadata + * the file metadata * @param content * the file content */ -final case class FileResponse(filename: String, contentType: ContentType, bytes: Long, content: AkkaSource) +final case class FileResponse(metadata: Metadata, content: Content) + +object FileResponse { + + type Content = IO[Complete[JsonLdValue], AkkaSource] + + /** + * Metadata for the file response + * + * @param filename + * the filename + * @param contentType + * the file content type + * @param bytes + * the file size + */ + final case class Metadata(filename: String, contentType: ContentType, bytes: Long) + + def apply[E: JsonLdEncoder: HttpResponseFields]( + filename: String, + contentType: ContentType, + bytes: Long, + io: IO[E, AkkaSource] + ) = + new FileResponse(Metadata(filename, contentType, bytes), io.mapError { e => Complete(e).map(JsonLdValue(_)) }) + + def apply(filename: String, contentType: ContentType, bytes: Long, source: AkkaSource): FileResponse = + new FileResponse(Metadata(filename, contentType, bytes), IO.pure(source)) +} diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLd.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLd.scala index f5f1dc8fc8..6dfde427e9 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLd.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/directives/ResponseToJsonLd.scala @@ -103,21 +103,27 @@ object ResponseToJsonLd extends FileBytesInstances { s"=?UTF-8?B?$encodedFilename?=" } - override def apply(statusOverride: Option[StatusCode]): Route = - onSuccess(io.attempt.runToFuture) { + override def apply(statusOverride: Option[StatusCode]): Route = { + val flattened = io.flatMap { fr => fr.content.attempt.map(_.map { s => fr.metadata -> s }) }.attempt + onSuccess(flattened.runToFuture) { case Left(complete: Complete[E]) => emit(complete) case Left(reject: Reject[E]) => emit(reject) - case Right(response) => + case Right(Left(c)) => + implicit val valueEncoder = c.value.encoder + emit(c.value.value) + + case Right(Right((metadata, content))) => headerValueByType(Accept) { accept => - if (accept.mediaRanges.exists(_.matches(response.contentType.mediaType))) { - val encodedFilename = attachmentString(response.filename) + if 
(accept.mediaRanges.exists(_.matches(metadata.contentType.mediaType))) { + val encodedFilename = attachmentString(metadata.filename) respondWithHeaders(RawHeader("Content-Disposition", s"""attachment; filename="$encodedFilename"""")) { - complete(statusOverride.getOrElse(OK), HttpEntity(response.contentType, response.content)) + complete(statusOverride.getOrElse(OK), HttpEntity(metadata.contentType, content)) } } else - reject(unacceptedMediaTypeRejection(Seq(response.contentType.mediaType))) + reject(unacceptedMediaTypeRejection(Seq(metadata.contentType.mediaType))) } } + } } } diff --git a/tests/src/test/resources/kg/archives/archive-many-large-files.json b/tests/src/test/resources/kg/archives/archive-many-large-files.json new file mode 100644 index 0000000000..99dcee6464 --- /dev/null +++ b/tests/src/test/resources/kg/archives/archive-many-large-files.json @@ -0,0 +1,79 @@ +{ + "resources": [ + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile1.txt", + "path": "/some/other/largefile1.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile2.txt", + "path": "/some/other/largefile2.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile3.txt", + "path": "/some/other/largefile3.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile4.txt", + "path": "/some/other/largefile4.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile5.txt", + "path": "/some/other/largefile5.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile6.txt", + "path": "/some/other/largefile6.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile7.txt", + "path": "/some/other/largefile7.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile8.txt", + "path": "/some/other/largefile8.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile9.txt", + "path": "/some/other/largefile9.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile10.txt", + "path": "/some/other/largefile10.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile11.txt", + "path": "/some/other/largefile11.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile12.txt", + "path": "/some/other/largefile12.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile13.txt", + "path": "/some/other/largefile13.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile14.txt", + "path": "/some/other/largefile14.txt" + }, + { + "@type": "File", + "resourceId": "https://dev.nexus.test.com/simplified-resource/largefile15.txt", + "path": "/some/other/largefile15.txt" + } + ] +} \ No newline at end of file diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/RemoteStorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/RemoteStorageSpec.scala index ed3bfd5dd5..fca845e76d 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/RemoteStorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/RemoteStorageSpec.scala @@ 
-1,6 +1,7 @@ package ch.epfl.bluebrain.nexus.tests.kg -import akka.http.scaladsl.model.{ContentTypes, StatusCodes} +import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes, StatusCodes} +import akka.util.ByteString import ch.epfl.bluebrain.nexus.tests.HttpClient._ import ch.epfl.bluebrain.nexus.tests.Identity import ch.epfl.bluebrain.nexus.tests.Identity.storages.Coyote @@ -10,10 +11,11 @@ import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.Supervision import io.circe.generic.semiauto.deriveDecoder import io.circe.{Decoder, Json} import monix.bio.Task +import org.scalactic.source.Position import org.scalatest.Assertion import scala.annotation.nowarn -import sys.process._ +import scala.sys.process._ class RemoteStorageSpec extends StorageSpec { @@ -125,6 +127,62 @@ class RemoteStorageSpec extends StorageSpec { } yield succeed } + def putFile(name: String, content: String, storageId: String)(implicit position: Position) = { + deltaClient.putAttachment[Json]( + s"/files/$fullId/test-resource:$name?storage=nxv:${storageId}", + content, + MediaTypes.`text/plain`.toContentType(HttpCharsets.`UTF-8`), + name, + Coyote + ) { (_, response) => + response.status shouldEqual StatusCodes.Created + } + } + + def randomString(length: Int) = { + val r = new scala.util.Random + val sb = new StringBuilder + for (_ <- 1 to length) { + sb.append(r.nextPrintableChar()) + } + sb.toString + } + + "succeed many large files are in the archive, going over the time limit" ignore { + val content = randomString(130000000) + val payload = jsonContentOf("/kg/archives/archive-many-large-files.json") + var before = 0L + for { + _ <- putFile("largefile1.txt", content, s"${storageId}2") + _ <- putFile("largefile2.txt", content, s"${storageId}2") + _ <- putFile("largefile3.txt", content, s"${storageId}2") + _ <- putFile("largefile4.txt", content, s"${storageId}2") + _ <- putFile("largefile5.txt", content, s"${storageId}2") + _ <- putFile("largefile6.txt", content, s"${storageId}2") + _ <- putFile("largefile7.txt", content, s"${storageId}2") + _ <- putFile("largefile8.txt", content, s"${storageId}2") + _ <- putFile("largefile9.txt", content, s"${storageId}2") + _ <- putFile("largefile10.txt", content, s"${storageId}2") + _ <- putFile("largefile11.txt", content, s"${storageId}2") + _ <- putFile("largefile12.txt", content, s"${storageId}2") + _ <- putFile("largefile13.txt", content, s"${storageId}2") + _ <- putFile("largefile14.txt", content, s"${storageId}2") + _ <- putFile("largefile15.txt", content, s"${storageId}2") + _ <- deltaClient.put[ByteString](s"/archives/$fullId/nxv:very-large-archive", payload, Coyote) { (_, response) => + before = System.currentTimeMillis() + response.status shouldEqual StatusCodes.Created + } + _ <- + deltaClient.get[ByteString](s"/archives/$fullId/nxv:very-large-archive", Coyote, acceptAll) { (_, response) => + println(s"time taken to download archive: ${System.currentTimeMillis() - before}ms") + response.status shouldEqual StatusCodes.OK + contentType(response) shouldEqual MediaTypes.`application/x-tar`.toContentType + } + } yield { + succeed + } + } + "creating a remote storage" should { "fail creating a RemoteDiskStorage without folder" in { val payload = jsonContentOf(
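
The core of this change: each archive entry now carries a *deferred* content fetch (a `Task[AkkaSource]`) instead of an already-materialized source, and those fetches only run one at a time while the archive is already streaming back to the client, so the response no longer blocks (and times out) while every remote file is fetched up front. Below is a minimal sketch of that pattern, not the project's code: it assumes fs2 and monix-bio on the classpath, and `EntryMetadata` is a hypothetical stand-in for the format-specific metadata type `M` used above.

```scala
import akka.stream.scaladsl.Source
import akka.util.ByteString
import fs2.Stream
import monix.bio.Task

object LazyArchiveEntriesSketch {

  // Same alias the codebase uses for streamed file content.
  type AkkaSource = Source[ByteString, Any]

  // Hypothetical stand-in for the format-specific entry metadata type `M`.
  final case class EntryMetadata(path: String, bytes: Long)

  /**
    * Turns already-sorted entries whose content is still deferred into an fs2 stream.
    * Nothing is fetched here; each Task runs only when its element is pulled, i.e.
    * while the archive is already being written out to the client.
    */
  def asLazyStream(
      entries: List[(EntryMetadata, Task[AkkaSource])]
  ): Stream[Task, (EntryMetadata, AkkaSource)] =
    Stream
      .emits(entries)
      .evalMap[Task, (EntryMetadata, AkkaSource)] { case (metadata, deferredContent) =>
        deferredContent.map(source => metadata -> source)
      }
}
```

In the patch itself this stream is then bridged back into Akka with `Source.fromGraph(StreamConverter(contentStream)).via(format.writeFlow)`, so tar/zip bytes start flowing immediately and each file is fetched just in time as the archive is written.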