Skip to content

Commit

Permalink
Stop large archive downloads timing out (#3925)
Browse files Browse the repository at this point in the history
* don't timeout when making archive of many large remote files

* re-implement sorting for archives

* ignore long-running test

* scalafmt

* don't use lazy source

* renames / tidy up

* log error message

* scalafmt

* remove println

* fix compile error

* increase patience on TarDownloadSpec

* scalafmt
  • Loading branch information
shinyhappydan authored Jun 14, 2023
1 parent cbabe96 commit 029c727
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,23 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue}
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse
import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.Complete
import ch.epfl.bluebrain.nexus.delta.sdk.error.SDKError
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import io.circe.{Json, Printer}
import monix.bio.{IO, UIO}
import monix.bio.{IO, Task, UIO}
import monix.execution.Scheduler

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -54,14 +59,20 @@ trait ArchiveDownload {
project: ProjectRef,
format: ArchiveFormat[M],
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, AkkaSource]
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource]

}

object ArchiveDownload {

implicit private val logger: Logger = Logger[ArchiveDownload]

case class ArchiveDownloadError(filename: String, response: Complete[JsonLdValue]) extends SDKError {
override def getMessage: String = {
s"Error streaming file '$filename' for archive: ${response.value.value}"
}
}

/**
* The default [[ArchiveDownload]] implementation.
*
Expand All @@ -88,17 +99,43 @@ object ArchiveDownload {
project: ProjectRef,
format: ArchiveFormat[M],
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, AkkaSource] = {
implicit val entryOrdering: Ordering[M] = format.ordering
val referenceList = value.resources.toList
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] = {
val references = value.resources.toList
for {
_ <- checkResourcePermissions(referenceList, project)
entries <- referenceList.traverseFilter {
case ref: ResourceReference => resourceEntry(ref, project, format, ignoreNotFound)
case ref: FileReference => fileEntry(ref, project, format, ignoreNotFound)
}
sorted = entries.sortBy { case (entry, _) => entry }
} yield Source(sorted).via(format.writeFlow)
_ <- checkResourcePermissions(references, project)
contentStream <- resolveReferencesAsStream(references, project, ignoreNotFound, format)
} yield {
Source.fromGraph(StreamConverter(contentStream)).via(format.writeFlow)
}
}

private def resolveReferencesAsStream[M](
references: List[ArchiveReference],
project: ProjectRef,
ignoreNotFound: Boolean,
format: ArchiveFormat[M]
)(implicit caller: Caller): IO[ArchiveRejection, Stream[Task, (M, AkkaSource)]] = {
references
.traverseFilter {
case ref: FileReference => fileEntry(ref, project, format, ignoreNotFound)
case ref: ResourceReference => resourceEntry(ref, project, format, ignoreNotFound)
}
.map(sortWith(format))
.map(asStream)
}

private def sortWith[M](
format: ArchiveFormat[M]
)(list: List[(M, Task[AkkaSource])]): List[(M, Task[AkkaSource])] = {
list.sortBy { case (entry, _) =>
entry
}(format.ordering)
}

private def asStream[M](list: List[(M, Task[AkkaSource])]) = {
Stream.iterable(list).evalMap[Task, (M, AkkaSource)] { case (metadata, source) =>
source.map(metadata -> _)
}
}

private def checkResourcePermissions(
Expand All @@ -121,27 +158,35 @@ object ArchiveDownload {
ignoreNotFound: Boolean
)(implicit
caller: Caller
): IO[ArchiveRejection, Option[(Metadata, AkkaSource)]] = {
): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = {
val refProject = ref.project.getOrElse(project)
// the required permissions are checked for each file content fetch
val tarEntryIO = fetchFileContent(ref.ref, refProject, caller)
val entry = fetchFileContent(ref.ref, refProject, caller)
.mapError {
case _: FileRejection.FileNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.TagNotFound => ResourceNotFound(ref.ref, project)
case _: FileRejection.RevisionNotFound => ResourceNotFound(ref.ref, project)
case FileRejection.AuthorizationFailed(addr, perm) => AuthorizationFailed(addr, perm)
case other => WrappedFileRejection(other)
}
.flatMap { fileResponse =>
.flatMap { case FileResponse(fileMetadata, content) =>
IO.fromEither(
pathOf(ref, project, format, fileResponse.filename).map { path =>
val metadata = format.metadata(path, fileResponse.bytes)
Some((metadata, fileResponse.content))
pathOf(ref, project, format, fileMetadata.filename).map { path =>
val archiveMetadata = format.metadata(path, fileMetadata.bytes)
val contentTask: Task[AkkaSource] = content
.tapError(response =>
UIO.delay(
logger
.error(s"Error streaming file '${fileMetadata.filename}' for archive: ${response.value.value}")
)
)
.mapError(response => ArchiveDownloadError(fileMetadata.filename, response))
Some((archiveMetadata, contentTask))
}
)
}
if (ignoreNotFound) tarEntryIO.onErrorRecover { case _: ResourceNotFound => None }
else tarEntryIO
if (ignoreNotFound) entry.onErrorRecover { case _: ResourceNotFound => None }
else entry
}

private def pathOf(
Expand All @@ -164,14 +209,14 @@ object ArchiveDownload {
project: ProjectRef,
format: ArchiveFormat[Metadata],
ignoreNotFound: Boolean
): IO[ArchiveRejection, Option[(Metadata, AkkaSource)]] = {
val tarEntryIO = resourceRefToByteString(ref, project).map { content =>
): IO[ArchiveRejection, Option[(Metadata, Task[AkkaSource])]] = {
val archiveEntry = resourceRefToByteString(ref, project).map { content =>
val path = pathOf(ref, project)
val metadata = format.metadata(path, content.length.toLong)
Some((metadata, Source.single(content)))
Some((metadata, Task.pure(Source.single(content))))
}
if (ignoreNotFound) tarEntryIO.onErrorHandle { _: ResourceNotFound => None }
else tarEntryIO
if (ignoreNotFound) archiveEntry.onErrorHandle { _: ResourceNotFound => None }
else archiveEntry
}

private def resourceRefToByteString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.{EphemeralDefinition, EphemeralLog}
import io.circe.Json
import monix.bio.{IO, UIO}
import monix.execution.Scheduler

/**
* Archives module.
Expand Down Expand Up @@ -168,7 +169,7 @@ class Archives(
project: ProjectRef,
format: ArchiveFormat[_],
ignoreNotFound: Boolean
)(implicit caller: Caller): IO[ArchiveRejection, AkkaSource] =
)(implicit caller: Caller, scheduler: Scheduler): IO[ArchiveRejection, AkkaSource] =
(for {
resource <- fetch(id, project)
value = resource.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import monix.bio.{IO, UIO}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{Inspectors, OptionValues}
import monix.execution.Scheduler.Implicits.global

import java.util.UUID
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -146,7 +147,7 @@ abstract class ArchiveDownloadSpec

def rejectedAccess(value: ArchiveValue) = {
archiveDownload
.apply(value, project.ref, format, ignoreNotFound = true)(Caller.Anonymous)
.apply(value, project.ref, format, ignoreNotFound = true)(Caller.Anonymous, global)
.rejectedWith[AuthorizationFailed]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class ArchivesSpec

private val cfg = ArchivePluginConfig(1, EphemeralLogConfig(5.seconds, 5.hours))
private val download = new ArchiveDownload {
override def apply[F](value: ArchiveValue, project: ProjectRef, format: ArchiveFormat[F], ignoreNotFound: Boolean)(
implicit caller: Caller
override def apply[M](value: ArchiveValue, project: ProjectRef, format: ArchiveFormat[M], ignoreNotFound: Boolean)(
implicit
caller: Caller,
scheduler: Scheduler
): IO[ArchiveRejection, AkkaSource] =
IO.pure(Source.empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package ch.epfl.bluebrain.nexus.delta.plugins.archive
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveFormat
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource

import scala.concurrent.duration.DurationInt

class TarDownloadSpec extends ArchiveDownloadSpec {
override def format: ArchiveFormat[_] = ArchiveFormat.Tar

implicit override def patienceConfig: PatienceConfig = PatienceConfig(3.seconds, 10.millis)
override def format: ArchiveFormat[_] = ArchiveFormat.Tar

override def sourceToMap(source: AkkaSource): Map[String, String] =
fromTar(source).map { case (k, v) => k -> v.utf8String }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ final class Files(
storage <- storages.fetch(file.value.storage, project)
permission = storage.value.storageValue.readPermission
_ <- aclCheck.authorizeForOr(project, permission)(AuthorizationFailed(project, permission))
s <- FetchFile(storage.value).apply(attributes).mapError(FetchRejection(file.id, storage.id, _))
s = FetchFile(storage.value)
.apply(attributes)
.mapError(FetchRejection(file.id, storage.id, _))
.leftWiden[FileRejection]
mediaType = attributes.mediaType.getOrElse(`application/octet-stream`)
} yield FileResponse(attributes.filename, mediaType, attributes.bytes, s)
}.span("fetchFileContent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.ConfigFixtures
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.{Caller, ServiceAccount}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
Expand Down Expand Up @@ -526,27 +527,31 @@ class FilesSpec(docker: RemoteStorageDocker)

}

def consumeContent(response: FileResponse): String = {
consume(response.content.accepted)
}

"fetching a file content" should {

"succeed" in {
val response = files.fetchContent(file1, projectRef).accepted
consume(response.content) shouldEqual content
response.filename shouldEqual "file.txt"
response.contentType shouldEqual `text/plain(UTF-8)`
consumeContent(response) shouldEqual content
response.metadata.filename shouldEqual "file.txt"
response.metadata.contentType shouldEqual `text/plain(UTF-8)`
}

"succeed by tag" in {
val response = files.fetchContent(IdSegmentRef(file1, tag), projectRef).accepted
consume(response.content) shouldEqual content
response.filename shouldEqual "file.txt"
response.contentType shouldEqual `text/plain(UTF-8)`
consumeContent(response) shouldEqual content
response.metadata.filename shouldEqual "file.txt"
response.metadata.contentType shouldEqual `text/plain(UTF-8)`
}

"succeed by rev" in {
val response = files.fetchContent(IdSegmentRef(file1, 1), projectRef).accepted
consume(response.content) shouldEqual content
response.filename shouldEqual "myfile.txt"
response.contentType shouldEqual `text/plain(UTF-8)`
consumeContent(response) shouldEqual content
response.metadata.filename shouldEqual "myfile.txt"
response.metadata.contentType shouldEqual `text/plain(UTF-8)`
}

"reject if tag does not exist" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,47 @@
package ch.epfl.bluebrain.nexus.delta.sdk.directives

import akka.http.scaladsl.model.ContentType
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse.{Content, Metadata}
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue}
import ch.epfl.bluebrain.nexus.delta.sdk.directives.Response.Complete
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields
import monix.bio.IO

/**
* A file response content
*
* @param filename
* the filename
* @param contentType
* the file content type
* @param bytes
* the file size
* @param metadata
* the file metadata
* @param content
* the file content
*/
final case class FileResponse(filename: String, contentType: ContentType, bytes: Long, content: AkkaSource)
final case class FileResponse(metadata: Metadata, content: Content)

object FileResponse {

type Content = IO[Complete[JsonLdValue], AkkaSource]

/**
* Metadata for the file response
*
* @param filename
* the filename
* @param contentType
* the file content type
* @param bytes
* the file size
*/
final case class Metadata(filename: String, contentType: ContentType, bytes: Long)

def apply[E: JsonLdEncoder: HttpResponseFields](
filename: String,
contentType: ContentType,
bytes: Long,
io: IO[E, AkkaSource]
) =
new FileResponse(Metadata(filename, contentType, bytes), io.mapError { e => Complete(e).map(JsonLdValue(_)) })

def apply(filename: String, contentType: ContentType, bytes: Long, source: AkkaSource): FileResponse =
new FileResponse(Metadata(filename, contentType, bytes), IO.pure(source))
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,27 @@ object ResponseToJsonLd extends FileBytesInstances {
s"=?UTF-8?B?$encodedFilename?="
}

override def apply(statusOverride: Option[StatusCode]): Route =
onSuccess(io.attempt.runToFuture) {
override def apply(statusOverride: Option[StatusCode]): Route = {
val flattened = io.flatMap { fr => fr.content.attempt.map(_.map { s => fr.metadata -> s }) }.attempt
onSuccess(flattened.runToFuture) {
case Left(complete: Complete[E]) => emit(complete)
case Left(reject: Reject[E]) => emit(reject)
case Right(response) =>
case Right(Left(c)) =>
implicit val valueEncoder = c.value.encoder
emit(c.value.value)

case Right(Right((metadata, content))) =>
headerValueByType(Accept) { accept =>
if (accept.mediaRanges.exists(_.matches(response.contentType.mediaType))) {
val encodedFilename = attachmentString(response.filename)
if (accept.mediaRanges.exists(_.matches(metadata.contentType.mediaType))) {
val encodedFilename = attachmentString(metadata.filename)
respondWithHeaders(RawHeader("Content-Disposition", s"""attachment; filename="$encodedFilename"""")) {
complete(statusOverride.getOrElse(OK), HttpEntity(response.contentType, response.content))
complete(statusOverride.getOrElse(OK), HttpEntity(metadata.contentType, content))
}
} else
reject(unacceptedMediaTypeRejection(Seq(response.contentType.mediaType)))
reject(unacceptedMediaTypeRejection(Seq(metadata.contentType.mediaType)))
}
}
}
}
}

Expand Down
Loading

0 comments on commit 029c727

Please sign in to comment.