Skip to content

Commit

Permalink
Miscellaneous cleanup (#6)
Browse files Browse the repository at this point in the history
Bump the versions of our dependencies, clean up the docs, and update
tests for the Concurrency-related changes in Vapor.
  • Loading branch information
gwynne authored May 19, 2024
1 parent cad57b4 commit c369658
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 81 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2020 Matthieu Barthélemy
Copyright (c) 2024 Gwynne Raskind

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
16 changes: 7 additions & 9 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ let package = Package(
.library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"),
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.10.0"),
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.48.4"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.30.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.7.1"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.1"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.5.0"),
],
targets: [
.target(
Expand Down Expand Up @@ -50,6 +50,4 @@ var swiftSettings: [SwiftSetting] { [
.enableUpcomingFeature("ForwardTrailingClosures"),
.enableUpcomingFeature("ConciseMagicFile"),
.enableUpcomingFeature("DisableOutwardActorInference"),
.enableUpcomingFeature("StrictConcurrency"),
.enableExperimentalFeature("StrictConcurrency=complete"),
] }
15 changes: 7 additions & 8 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ let package = Package(
.library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]),
],
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"),
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.10.0"),
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.48.4"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.30.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.7.1"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.1"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.5.0"),
],
targets: [
.target(
Expand Down Expand Up @@ -54,6 +54,5 @@ var swiftSettings: [SwiftSetting] { [
.enableUpcomingFeature("ExistentialAny"),
.enableUpcomingFeature("ConciseMagicFile"),
.enableUpcomingFeature("DisableOutwardActorInference"),
.enableUpcomingFeature("StrictConcurrency"),
.enableExperimentalFeature("StrictConcurrency=complete"),
] }
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database.

This package makes use of the `SKIP LOCKED` feature supported by some of the major database engines (most notably [PostgresSQL][postgres-skip-locked] and [MySQL][mysql-skip-locked]) when available to make a best-effort guarantee that a task or job won't be picked by multiple workers.

This package should be compatible with:
This package should be compatible with any SQL database supported by the various Fluent drivers. It is specifically known to work with:

- PostgreSQL 11.0+
- MySQL 8.0+
- MySQL 5.7+
- MariaDB 10.5+
- SQLite

> [!NOTE]
> [!WARNING]
> Although SQLite can be used with this package, SQLite has no support for advanced locking. It is not likely to function correctly with more than one or two queue workers.
[postgres-skip-locked]: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE
[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.3/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a
[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.4/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a

## Getting started

Expand All @@ -29,7 +30,7 @@ Add `QueuesFluentDriver` as dependency to your `Package.swift`:

```swift
dependencies: [
.package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"),
.package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.4"),
...
]
```
Expand Down Expand Up @@ -62,26 +63,25 @@ app.queues.use(.fluent())
## Options

### Using a custom Database
You can optionally create a dedicated Database, set to `isdefault: false` and with a custom `DatabaseID` and use it for your Queues.
In that case you would initialize the Queues configuration like this:

```swift
let queuesDb = DatabaseID(string: "my_queues_db")
app.databases.use(.postgres(configuration: dbConfig), as: queuesDb, isDefault: false)
app.queues.use(.fluent(queuesDb))
```
You can optionally create a dedicated non-default `Database` with a custom `DatabaseID` for use with your queues, as in the following example:

### Customizing the jobs table name
You can customize the name of the table used by this driver during the migration :
```swift
app.migrations.add(JobMetadataMigrate(schema: "my_jobs"))
extension DatabaseID {
static var queues: Self { .init(string: "my_queues_db") }
}

func configure(_ app: Application) async throws {
app.databases.use(.postgres(configuration: ...), as: .queues, isDefault: false)
app.queues.use(.fluent(.queues))
}
```

## Caveats

### Polling interval and number of workers

By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers.
By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. Additionally, when using SQLite as the underlying database it is generally inadvisable to run more than one worker at a time, as SQLite does not have the .

The polling interval can be changed using the `refreshInterval` configuration setting:

Expand Down
38 changes: 35 additions & 3 deletions Sources/QueuesFluentDriver/Documentation.docc/Documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,22 @@ A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database.
[Queues]: https://github.com/vapor/queues
[Fluent]: https://github.com/vapor/fluent

## Overview
## Compatibility

This package makes use of the `SKIP LOCKED` feature supported by some of the major database engines (most notably [PostgresSQL][postgres-skip-locked] and [MySQL][mysql-skip-locked]) when available to make a best-effort guarantee that a task or job won't be picked by multiple workers.

This package should be compatible with any SQL database supported by the various Fluent drivers. It is specifically known to work with:

- PostgreSQL 11.0+
- MySQL 5.7+
- MariaDB 10.5+
- SQLite

> [!WARNING]
> Although SQLite can be used with this package, SQLite has no support for advanced locking. It is not likely to function correctly with more than one or two queue workers.
[postgres-skip-locked]: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE
[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.4/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a

## Getting started

Expand All @@ -19,7 +34,7 @@ Add `QueuesFluentDriver` as dependency to your `Package.swift`:

```swift
dependencies: [
.package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"),
.package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.4"),
...
]
```
Expand Down Expand Up @@ -52,11 +67,28 @@ app.queues.use(.fluent())

> Warning: Always call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`!
## Options

### Using a custom Database

You can optionally create a dedicated non-default `Database` with a custom `DatabaseID` for use with your queues, as in the following example:

```swift
extension DatabaseID {
static var queues: Self { .init(string: "my_queues_db") }
}

func configure(_ app: Application) async throws {
app.databases.use(.postgres(configuration: ...), as: .queues, isDefault: false)
app.queues.use(.fluent(.queues))
}
```

## Caveats

### Polling interval and number of workers

By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers.
By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. Additionally, when using SQLite as the underlying database it is generally inadvisable to run more than one worker at a time, as SQLite does not have the necessary support for cross-connection locking.

The polling interval can be changed using the `refreshInterval` configuration setting:

Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import SQLKit
import NIOConcurrencyHelpers
import struct Foundation.Data

Expand Down
4 changes: 3 additions & 1 deletion Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ struct SQLDateValue<E: SQLExpression>: SQLExpression {
}
}

/// An alternative of `SQLLockingClause` which specifies the `SKIP LOCKED` modifier when the underlying database
/// An alternative to `SQLLockingClause` which specifies the `SKIP LOCKED` modifier when the underlying database
/// supports it. As MySQL's and PostgreSQL's manuals both note, this should not be used except in very specific
/// scenarios, such as that of this package.
///
/// It is safe to use this expression with SQLite; its dialect correctly denies support for locking expressions.
enum SQLLockingClauseWithSkipLocked: SQLExpression {
/// Request an exclusive "writer" lock, skipping rows that are already locked.
case updateSkippingLocked
Expand Down
78 changes: 36 additions & 42 deletions Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import NIOSSL

final class QueuesFluentDriverTests: XCTestCase {
var dbid: DatabaseID { .sqlite }
var app: Application!

private func useDbs(_ app: Application) throws {
app.databases.use(.sqlite(.memory), as: .sqlite)
Expand All @@ -35,103 +36,88 @@ final class QueuesFluentDriverTests: XCTestCase {
}

func testApplication() async throws {
let app = Application(.testing)
defer { app.shutdown() }

try self.useDbs(app)
app.migrations.add(JobModelMigration(), to: self.dbid)
self.app.migrations.add(JobModelMigration(), to: self.dbid)

let email = Email()
app.queues.add(email)
self.app.queues.add(email)

app.queues.use(.fluent(self.dbid))
self.app.queues.use(.fluent(self.dbid))

try await app.autoMigrate()
try await self.app.autoMigrate()

app.get("send-email") { req in
self.app.get("send-email") { req in
req.queue.dispatch(Email.self, .init(to: "[email protected]"))
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "send-email") { res in
try await self.app.testable().test(.GET, "send-email") { res async in
XCTAssertEqual(res.status, .ok)
}

XCTAssertEqual(email.sent, [])
try await app.queues.queue.worker.run().get()
try await self.app.queues.queue.worker.run().get()
XCTAssertEqual(email.sent, [.init(to: "[email protected]")])

try await app.autoRevert()
try await self.app.autoRevert()
}

func testFailedJobLoss() async throws {
let app = Application(.testing)
defer { app.shutdown() }

try self.useDbs(app)
app.queues.add(FailingJob())
app.queues.use(.fluent(self.dbid))
app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()
self.app.queues.add(FailingJob())
self.app.queues.use(.fluent(self.dbid))
self.app.migrations.add(JobModelMigration(), to: self.dbid)
try await self.app.autoMigrate()

let jobId = JobIdentifier()
app.get("test") { req in
self.app.get("test") { req in
req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobId)
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "test") { res in
try await self.app.testable().test(.GET, "test") { res async in
XCTAssertEqual(res.status, .ok)
}

await XCTAssertThrowsErrorAsync(try await app.queues.queue.worker.run().get()) {
await XCTAssertThrowsErrorAsync(try await self.app.queues.queue.worker.run().get()) {
XCTAssert($0 is FailingJob.Failure)
}

await XCTAssertNotNilAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertNotNilAsync(try await (self.app.databases.database(self.dbid, logger: .init(label: ""), on: self.app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first())

try await app.autoRevert()
try await self.app.autoRevert()
}

func testDelayedJobIsRemovedFromProcessingQueue() async throws {
let app = Application(.testing)
defer { app.shutdown() }

try self.useDbs(app)

app.queues.add(DelayedJob())
self.app.queues.add(DelayedJob())

app.queues.use(.fluent(self.dbid))
self.app.queues.use(.fluent(self.dbid))

app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()
self.app.migrations.add(JobModelMigration(), to: self.dbid)
try await self.app.autoMigrate()

let jobId = JobIdentifier()
app.get("delay-job") { req in
self.app.get("delay-job") { req in
req.queue.dispatch(DelayedJob.self, .init(name: "vapor"),
delayUntil: Date().addingTimeInterval(3600),
id: jobId)
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "delay-job") { res in
try await self.app.testable().test(.GET, "delay-job") { res async in
XCTAssertEqual(res.status, .ok)
}

await XCTAssertEqualAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertEqualAsync(try await (self.app.databases.database(self.dbid, logger: .init(label: ""), on: self.app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)

try await app.autoRevert()
try await self.app.autoRevert()
}

func testCoverageForFailingQueue() {
let app = Application(.testing)
defer { app.shutdown() }
let queue = FailingQueue(
failure: QueuesFluentError.unsupportedDatabase,
context: .init(queueName: .init(string: ""), configuration: .init(), application: app, logger: .init(label: ""), on: app.eventLoopGroup.any())
context: .init(queueName: .init(string: ""), configuration: .init(), application: self.app, logger: .init(label: ""), on: self.app.eventLoopGroup.any())
)
XCTAssertThrowsError(try queue.get(.init()).wait())
XCTAssertThrowsError(try queue.set(.init(), to: JobData(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: .init())).wait())
Expand All @@ -140,8 +126,16 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertThrowsError(try queue.pop().wait())
}

override func setUp() {
override func setUp() async throws {
XCTAssert(isLoggingConfigured)

self.app = try await Application.make(.testing)
try self.useDbs(self.app)
}

override func tearDown() async throws {
try await self.app.asyncShutdown()
self.app = nil
}
}

Expand Down

0 comments on commit c369658

Please sign in to comment.