From c369658db6d1d2da7f9db563f134f1c12f4d23da Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Sun, 19 May 2024 13:34:07 -0500 Subject: [PATCH] Miscellaneous cleanup (#6) Bump the versions of our dependencies, clean up the docs, and update tests for the Concurrency-related changes in Vapor. --- LICENSE | 2 +- Package.swift | 16 ++-- Package@swift-5.9.swift | 15 ++-- README.md | 32 ++++---- .../Documentation.docc/Documentation.md | 38 ++++++++- Sources/QueuesFluentDriver/FluentQueue.swift | 2 +- .../SQLKit+Convenience.swift | 4 +- .../QueuesFluentDriverTests.swift | 78 +++++++++---------- 8 files changed, 106 insertions(+), 81 deletions(-) diff --git a/LICENSE b/LICENSE index 8c0ad77..73154ae 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2020 Matthieu Barthélemy +Copyright (c) 2024 Gwynne Raskind Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Package.swift b/Package.swift index 3ec78ca..6dcf815 100644 --- a/Package.swift +++ b/Package.swift @@ -10,14 +10,14 @@ let package = Package( .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"), - .package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"), - .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"), - .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"), + .package(url: "https://github.com/vapor/fluent.git", from: "4.10.0"), + .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.48.4"), + .package(url: "https://github.com/vapor/sql-kit.git", from: "3.30.0"), .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), - .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), - .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"), - .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"), + .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.7.1"), + .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.1"), + .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.5.0"), ], targets: [ .target( @@ -50,6 +50,4 @@ var swiftSettings: [SwiftSetting] { [ .enableUpcomingFeature("ForwardTrailingClosures"), .enableUpcomingFeature("ConciseMagicFile"), .enableUpcomingFeature("DisableOutwardActorInference"), - .enableUpcomingFeature("StrictConcurrency"), - .enableExperimentalFeature("StrictConcurrency=complete"), ] } diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index 9e9eb8e..3cc4c19 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -13,14 +13,14 @@ let package = Package( .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.92.1"), - .package(url: "https://github.com/vapor/fluent.git", from: "4.9.0"), - .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"), - .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"), + .package(url: "https://github.com/vapor/fluent.git", from: "4.10.0"), + .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.48.4"), + .package(url: "https://github.com/vapor/sql-kit.git", from: "3.30.0"), .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), - .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), - .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"), - .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"), + .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.7.1"), + .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.1"), + .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.5.0"), ], targets: [ .target( @@ -54,6 +54,5 @@ var swiftSettings: [SwiftSetting] { [ .enableUpcomingFeature("ExistentialAny"), .enableUpcomingFeature("ConciseMagicFile"), .enableUpcomingFeature("DisableOutwardActorInference"), - .enableUpcomingFeature("StrictConcurrency"), .enableExperimentalFeature("StrictConcurrency=complete"), ] } diff --git a/README.md b/README.md index 397b388..755d391 100644 --- a/README.md +++ b/README.md @@ -9,17 +9,18 @@ A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database. This package makes use of the `SKIP LOCKED` feature supported by some of the major database engines (most notably [PostgresSQL][postgres-skip-locked] and [MySQL][mysql-skip-locked]) when available to make a best-effort guarantee that a task or job won't be picked by multiple workers. -This package should be compatible with: +This package should be compatible with any SQL database supported by the various Fluent drivers. It is specifically known to work with: - PostgreSQL 11.0+ -- MySQL 8.0+ +- MySQL 5.7+ - MariaDB 10.5+ +- SQLite -> [!NOTE] +> [!WARNING] > Although SQLite can be used with this package, SQLite has no support for advanced locking. It is not likely to function correctly with more than one or two queue workers. [postgres-skip-locked]: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE -[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.3/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a +[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.4/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a ## Getting started @@ -29,7 +30,7 @@ Add `QueuesFluentDriver` as dependency to your `Package.swift`: ```swift dependencies: [ - .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"), + .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.4"), ... ] ``` @@ -62,26 +63,25 @@ app.queues.use(.fluent()) ## Options ### Using a custom Database -You can optionally create a dedicated Database, set to `isdefault: false` and with a custom `DatabaseID` and use it for your Queues. -In that case you would initialize the Queues configuration like this: -```swift -let queuesDb = DatabaseID(string: "my_queues_db") -app.databases.use(.postgres(configuration: dbConfig), as: queuesDb, isDefault: false) -app.queues.use(.fluent(queuesDb)) -``` +You can optionally create a dedicated non-default `Database` with a custom `DatabaseID` for use with your queues, as in the following example: -### Customizing the jobs table name -You can customize the name of the table used by this driver during the migration : ```swift -app.migrations.add(JobMetadataMigrate(schema: "my_jobs")) +extension DatabaseID { + static var queues: Self { .init(string: "my_queues_db") } +} + +func configure(_ app: Application) async throws { + app.databases.use(.postgres(configuration: ...), as: .queues, isDefault: false) + app.queues.use(.fluent(.queues)) +} ``` ## Caveats ### Polling interval and number of workers -By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. +By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. Additionally, when using SQLite as the underlying database it is generally inadvisable to run more than one worker at a time, as SQLite does not have the . The polling interval can be changed using the `refreshInterval` configuration setting: diff --git a/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md b/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md index 8590d54..f5ec9bf 100644 --- a/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md +++ b/Sources/QueuesFluentDriver/Documentation.docc/Documentation.md @@ -9,7 +9,22 @@ A driver for [Queues]. Uses [Fluent] to store job metadata in an SQL database. [Queues]: https://github.com/vapor/queues [Fluent]: https://github.com/vapor/fluent -## Overview +## Compatibility + +This package makes use of the `SKIP LOCKED` feature supported by some of the major database engines (most notably [PostgresSQL][postgres-skip-locked] and [MySQL][mysql-skip-locked]) when available to make a best-effort guarantee that a task or job won't be picked by multiple workers. + +This package should be compatible with any SQL database supported by the various Fluent drivers. It is specifically known to work with: + +- PostgreSQL 11.0+ +- MySQL 5.7+ +- MariaDB 10.5+ +- SQLite + +> [!WARNING] +> Although SQLite can be used with this package, SQLite has no support for advanced locking. It is not likely to function correctly with more than one or two queue workers. + +[postgres-skip-locked]: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE +[mysql-skip-locked]: https://dev.mysql.com/doc/refman/8.4/en/select.html#:~:text=SKIP%20LOCKED%20causes%20a ## Getting started @@ -19,7 +34,7 @@ Add `QueuesFluentDriver` as dependency to your `Package.swift`: ```swift dependencies: [ - .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.2"), + .package(url: "https://github.com/vapor-community/vapor-queues-fluent-driver.git", from: "3.0.0-beta.4"), ... ] ``` @@ -52,11 +67,28 @@ app.queues.use(.fluent()) > Warning: Always call `app.databases.use(...)` **before** calling `app.queues.use(.fluent())`! +## Options + +### Using a custom Database + +You can optionally create a dedicated non-default `Database` with a custom `DatabaseID` for use with your queues, as in the following example: + +```swift +extension DatabaseID { + static var queues: Self { .init(string: "my_queues_db") } +} + +func configure(_ app: Application) async throws { + app.databases.use(.postgres(configuration: ...), as: .queues, isDefault: false) + app.queues.use(.fluent(.queues)) +} +``` + ## Caveats ### Polling interval and number of workers -By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. +By default, the Vapor Queues system starts 2 workers per available CPU core, with each worker would polling the database once per second. On a 4-core system, this would results in 8 workers querying the database every second. Most configurations do not need this many workers. Additionally, when using SQLite as the underlying database it is generally inadvisable to run more than one worker at a time, as SQLite does not have the necessary support for cross-connection locking. The polling interval can be changed using the `refreshInterval` configuration setting: diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 48c7d83..9d49428 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -1,5 +1,5 @@ @preconcurrency import Queues -@preconcurrency import SQLKit +import SQLKit import NIOConcurrencyHelpers import struct Foundation.Data diff --git a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift index e90bb34..7b5ffc8 100644 --- a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift +++ b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift @@ -36,9 +36,11 @@ struct SQLDateValue: SQLExpression { } } -/// An alternative of `SQLLockingClause` which specifies the `SKIP LOCKED` modifier when the underlying database +/// An alternative to `SQLLockingClause` which specifies the `SKIP LOCKED` modifier when the underlying database /// supports it. As MySQL's and PostgreSQL's manuals both note, this should not be used except in very specific /// scenarios, such as that of this package. +/// +/// It is safe to use this expression with SQLite; its dialect correctly denies support for locking expressions. enum SQLLockingClauseWithSkipLocked: SQLExpression { /// Request an exclusive "writer" lock, skipping rows that are already locked. case updateSkippingLocked diff --git a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift index 2caeeb8..726cc0c 100644 --- a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift +++ b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift @@ -11,6 +11,7 @@ import NIOSSL final class QueuesFluentDriverTests: XCTestCase { var dbid: DatabaseID { .sqlite } + var app: Application! private func useDbs(_ app: Application) throws { app.databases.use(.sqlite(.memory), as: .sqlite) @@ -35,103 +36,88 @@ final class QueuesFluentDriverTests: XCTestCase { } func testApplication() async throws { - let app = Application(.testing) - defer { app.shutdown() } - - try self.useDbs(app) - app.migrations.add(JobModelMigration(), to: self.dbid) + self.app.migrations.add(JobModelMigration(), to: self.dbid) let email = Email() - app.queues.add(email) + self.app.queues.add(email) - app.queues.use(.fluent(self.dbid)) + self.app.queues.use(.fluent(self.dbid)) - try await app.autoMigrate() + try await self.app.autoMigrate() - app.get("send-email") { req in + self.app.get("send-email") { req in req.queue.dispatch(Email.self, .init(to: "tanner@vapor.codes")) .map { HTTPStatus.ok } } - try app.testable().test(.GET, "send-email") { res in + try await self.app.testable().test(.GET, "send-email") { res async in XCTAssertEqual(res.status, .ok) } XCTAssertEqual(email.sent, []) - try await app.queues.queue.worker.run().get() + try await self.app.queues.queue.worker.run().get() XCTAssertEqual(email.sent, [.init(to: "tanner@vapor.codes")]) - try await app.autoRevert() + try await self.app.autoRevert() } func testFailedJobLoss() async throws { - let app = Application(.testing) - defer { app.shutdown() } - - try self.useDbs(app) - app.queues.add(FailingJob()) - app.queues.use(.fluent(self.dbid)) - app.migrations.add(JobModelMigration(), to: self.dbid) - try await app.autoMigrate() + self.app.queues.add(FailingJob()) + self.app.queues.use(.fluent(self.dbid)) + self.app.migrations.add(JobModelMigration(), to: self.dbid) + try await self.app.autoMigrate() let jobId = JobIdentifier() - app.get("test") { req in + self.app.get("test") { req in req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobId) .map { HTTPStatus.ok } } - try app.testable().test(.GET, "test") { res in + try await self.app.testable().test(.GET, "test") { res async in XCTAssertEqual(res.status, .ok) } - await XCTAssertThrowsErrorAsync(try await app.queues.queue.worker.run().get()) { + await XCTAssertThrowsErrorAsync(try await self.app.queues.queue.worker.run().get()) { XCTAssert($0 is FailingJob.Failure) } - await XCTAssertNotNilAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + await XCTAssertNotNilAsync(try await (self.app.databases.database(self.dbid, logger: .init(label: ""), on: self.app.eventLoopGroup.any())! as! any SQLDatabase) .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first()) - try await app.autoRevert() + try await self.app.autoRevert() } func testDelayedJobIsRemovedFromProcessingQueue() async throws { - let app = Application(.testing) - defer { app.shutdown() } - - try self.useDbs(app) - - app.queues.add(DelayedJob()) + self.app.queues.add(DelayedJob()) - app.queues.use(.fluent(self.dbid)) + self.app.queues.use(.fluent(self.dbid)) - app.migrations.add(JobModelMigration(), to: self.dbid) - try await app.autoMigrate() + self.app.migrations.add(JobModelMigration(), to: self.dbid) + try await self.app.autoMigrate() let jobId = JobIdentifier() - app.get("delay-job") { req in + self.app.get("delay-job") { req in req.queue.dispatch(DelayedJob.self, .init(name: "vapor"), delayUntil: Date().addingTimeInterval(3600), id: jobId) .map { HTTPStatus.ok } } - try app.testable().test(.GET, "delay-job") { res in + try await self.app.testable().test(.GET, "delay-job") { res async in XCTAssertEqual(res.status, .ok) } - await XCTAssertEqualAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + await XCTAssertEqualAsync(try await (self.app.databases.database(self.dbid, logger: .init(label: ""), on: self.app.eventLoopGroup.any())! as! any SQLDatabase) .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string) .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending) - try await app.autoRevert() + try await self.app.autoRevert() } func testCoverageForFailingQueue() { - let app = Application(.testing) - defer { app.shutdown() } let queue = FailingQueue( failure: QueuesFluentError.unsupportedDatabase, - context: .init(queueName: .init(string: ""), configuration: .init(), application: app, logger: .init(label: ""), on: app.eventLoopGroup.any()) + context: .init(queueName: .init(string: ""), configuration: .init(), application: self.app, logger: .init(label: ""), on: self.app.eventLoopGroup.any()) ) XCTAssertThrowsError(try queue.get(.init()).wait()) XCTAssertThrowsError(try queue.set(.init(), to: JobData(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: .init())).wait()) @@ -140,8 +126,16 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssertThrowsError(try queue.pop().wait()) } - override func setUp() { + override func setUp() async throws { XCTAssert(isLoggingConfigured) + + self.app = try await Application.make(.testing) + try self.useDbs(self.app) + } + + override func tearDown() async throws { + try await self.app.asyncShutdown() + self.app = nil } }