From cad57b4d64114d1ccfb1b9801149ecfcd68b3598 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Mon, 6 May 2024 14:46:15 -0500 Subject: [PATCH] Fix MySQL and PostgreSQL (again) (#4) Fixes (with rather ugly workarounds) issues with enums and the blob column. Adds the ability to test against all the drivers more easily. --- Package.swift | 4 ++ Package@swift-5.9.swift | 4 ++ Sources/QueuesFluentDriver/FluentQueue.swift | 37 +++++++++----- Sources/QueuesFluentDriver/JobModel.swift | 5 +- .../QueuesFluentDriver/JobModelMigrate.swift | 2 +- .../SQLKit+Convenience.swift | 2 + .../QueuesFluentDriverTests.swift | 49 ++++++++++++++----- 7 files changed, 77 insertions(+), 26 deletions(-) diff --git a/Package.swift b/Package.swift index 49c4223..3ec78ca 100644 --- a/Package.swift +++ b/Package.swift @@ -16,6 +16,8 @@ let package = Package( .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), + .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"), + .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"), ], targets: [ .target( @@ -35,6 +37,8 @@ let package = Package( dependencies: [ .product(name: "XCTVapor", package: "vapor"), .product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"), + .product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"), + .product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"), .target(name: "QueuesFluentDriver"), ], swiftSettings: swiftSettings diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift index f2d7c78..9e9eb8e 100644 --- a/Package@swift-5.9.swift +++ b/Package@swift-5.9.swift @@ -19,6 +19,8 @@ let package = Package( .package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"), .package(url: "https://github.com/vapor/queues.git", from: "1.13.0"), .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"), + .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"), + .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"), ], targets: [ .target( @@ -38,6 +40,8 @@ let package = Package( dependencies: [ .product(name: "XCTVapor", package: "vapor"), .product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"), + .product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"), + .product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"), .target(name: "QueuesFluentDriver"), ], swiftSettings: swiftSettings diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 58b74ee..48c7d83 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -1,6 +1,7 @@ @preconcurrency import Queues @preconcurrency import SQLKit import NIOConcurrencyHelpers +import struct Foundation.Data /// An implementation of `Queue` which stores job data and metadata in a Fluent database. public struct FluentQueue: Queue, Sendable { @@ -14,13 +15,15 @@ public struct FluentQueue: Queue, Sendable { // See `Queue.get(_:)`. public func get(_ id: JobIdentifier) -> EventLoopFuture { self.sqlDb.select() - .columns("payload", "max_retry_count", "job_name", "delay_until", "queued_at", "attempts") + .columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at") .from(JobModel.schema) .where("id", .equal, id.string) .first() .unwrap(or: QueuesFluentError.missingJob(id)) .flatMapThrowing { - try $0.decode(model: JobData.self, keyDecodingStrategy: .convertFromSnakeCase) + try $0.decode(model: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase) + }.map { + .init(payload: .init($0.payload), maxRetryCount: $0.maxRetryCount, jobName: $0.jobName, delayUntil: $0.delayUntil, queuedAt: $0.queuedAt) } } @@ -28,10 +31,20 @@ public struct FluentQueue: Queue, Sendable { public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture { self.sqlDb.eventLoop.makeFutureWithTask { try await self.sqlDb.insert(into: JobModel.schema) - .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) - .onConflict { try $0 - .set(excludedContentOf: JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) - } + .columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at") + .values( + SQLBind(id.string), + SQLBind(self.queueName.string), + SQLBind(jobStorage.jobName), + SQLBind(jobStorage.queuedAt), + SQLBind(jobStorage.delayUntil), + SQLLiteral.string(StoredJobState.pending.rawValue), + SQLBind(jobStorage.maxRetryCount), + SQLBind(jobStorage.attempts), + SQLBind(Data(jobStorage.payload)), + .now() + ) + // .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) // because enums! .run() } } @@ -41,7 +54,7 @@ public struct FluentQueue: Queue, Sendable { self.get(id).flatMap { _ in self.sqlDb.delete(from: JobModel.schema) .where("id", .equal, id.string) - .where("state", .notEqual, StoredJobState.completed) + .where("state", .notEqual, SQLLiteral.string(StoredJobState.completed.rawValue)) .run() } } @@ -50,7 +63,7 @@ public struct FluentQueue: Queue, Sendable { public func push(_ id: JobIdentifier) -> EventLoopFuture { self.sqlDb .update(JobModel.schema) - .set("state", to: StoredJobState.pending) + .set("state", to: SQLLiteral.string(StoredJobState.pending.rawValue)) .set("updated_at", to: .now()) .where("id", .equal, id.string) .run() @@ -84,7 +97,7 @@ public struct FluentQueue: Queue, Sendable { .select() .column("id") .from(JobModel.schema) - .where("state", .equal, StoredJobState.pending) + .where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue)) .where("queue_name", .equal, self.queueName.string) .where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now()) .orderBy("delay_until") @@ -93,7 +106,7 @@ public struct FluentQueue: Queue, Sendable { if self.sqlDb.dialect.supportsReturning { return try await self.sqlDb.update(JobModel.schema) - .set("state", to: StoredJobState.processing) + .set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue)) .set("updated_at", to: .now()) .where("id", .equal, .group(select.query)) .returning("id") @@ -109,10 +122,10 @@ public struct FluentQueue: Queue, Sendable { try await transaction .update(JobModel.schema) - .set("state", to: StoredJobState.processing) + .set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue)) .set("updated_at", to: .now()) .where("id", .equal, id) - .where("state", .equal, StoredJobState.pending) + .where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue)) .run() return JobIdentifier(string: id) diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index e01566f..373660d 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -1,4 +1,5 @@ import struct Foundation.Date +import struct Foundation.Data import struct Queues.JobData import struct Queues.JobIdentifier import struct Queues.QueueName @@ -45,7 +46,7 @@ struct JobModel: Codable, Sendable { let attempts: Int /// The job's payload. - let payload: [UInt8] + let payload: Data /// The standard automatic update tracking timestamp. let updatedAt: Date @@ -59,7 +60,7 @@ struct JobModel: Codable, Sendable { self.state = .pending self.maxRetryCount = jobData.maxRetryCount self.attempts = jobData.attempts ?? 0 - self.payload = jobData.payload + self.payload = Data(jobData.payload) self.updatedAt = .init() } } diff --git a/Sources/QueuesFluentDriver/JobModelMigrate.swift b/Sources/QueuesFluentDriver/JobModelMigrate.swift index 690c1da..70565a4 100644 --- a/Sources/QueuesFluentDriver/JobModelMigrate.swift +++ b/Sources/QueuesFluentDriver/JobModelMigrate.swift @@ -18,7 +18,7 @@ public struct JobModelMigration: AsyncSQLMigration { .value("completed") .run() case .inline: - stateEnumType = "enum(\(StoredJobState.allCases.map(\.rawValue).joined(separator: ",")))" + stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')" default: stateEnumType = "varchar(16)" } diff --git a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift index 42ed2bc..e90bb34 100644 --- a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift +++ b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift @@ -9,6 +9,8 @@ struct SQLNow: SQLExpression { switch serializer.dialect.name { case "sqlite": // For SQLite, write out the literal string 'now' (see below) SQLLiteral.string("now").serialize(to: &serializer) + case "postgresql": // For Postgres, "current_timestamp" is a keyword, not a function, so use "now()" instead. + SQLFunction("now").serialize(to: &serializer) default: // Everywhere else, just call the SQL standard function. SQLFunction("current_timestamp").serialize(to: &serializer) } diff --git a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift index b469b6b..2caeeb8 100644 --- a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift +++ b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift @@ -5,19 +5,46 @@ import Logging @testable import QueuesFluentDriver @preconcurrency import Queues import FluentSQLiteDriver +import FluentPostgresDriver +import FluentMySQLDriver +import NIOSSL final class QueuesFluentDriverTests: XCTestCase { + var dbid: DatabaseID { .sqlite } + + private func useDbs(_ app: Application) throws { + app.databases.use(.sqlite(.memory), as: .sqlite) + app.databases.use(DatabaseConfigurationFactory.postgres(configuration: .init( + hostname: Environment.get("DATABASE_HOST") ?? "localhost", + port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? SQLPostgresConfiguration.ianaPortNumber, + username: Environment.get("DATABASE_USERNAME") ?? "test_username", + password: Environment.get("DATABASE_PASSWORD") ?? "test_password", + database: Environment.get("DATABASE_NAME") ?? "test_database", + tls: .prefer(try .init(configuration: .clientDefault))) + ), as: .psql) + var config = TLSConfiguration.clientDefault + config.certificateVerification = .none + app.databases.use(DatabaseConfigurationFactory.mysql(configuration: .init( + hostname: Environment.get("DATABASE_HOST") ?? "localhost", + port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? MySQLConfiguration.ianaPortNumber, + username: Environment.get("DATABASE_USERNAME") ?? "test_username", + password: Environment.get("DATABASE_PASSWORD") ?? "test_password", + database: Environment.get("DATABASE_NAME") ?? "test_database", + tlsConfiguration: config + )), as: .mysql) + } + func testApplication() async throws { let app = Application(.testing) defer { app.shutdown() } - app.databases.use(.sqlite(.memory), as: .sqlite) - app.migrations.add(JobModelMigration()) + try self.useDbs(app) + app.migrations.add(JobModelMigration(), to: self.dbid) let email = Email() app.queues.add(email) - app.queues.use(.fluent()) + app.queues.use(.fluent(self.dbid)) try await app.autoMigrate() @@ -41,10 +68,10 @@ final class QueuesFluentDriverTests: XCTestCase { let app = Application(.testing) defer { app.shutdown() } - app.databases.use(.sqlite(.memory), as: .sqlite) + try self.useDbs(app) app.queues.add(FailingJob()) - app.queues.use(.fluent()) - app.migrations.add(JobModelMigration()) + app.queues.use(.fluent(self.dbid)) + app.migrations.add(JobModelMigration(), to: self.dbid) try await app.autoMigrate() let jobId = JobIdentifier() @@ -61,7 +88,7 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssert($0 is FailingJob.Failure) } - await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + await XCTAssertNotNilAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first()) try await app.autoRevert() @@ -71,13 +98,13 @@ final class QueuesFluentDriverTests: XCTestCase { let app = Application(.testing) defer { app.shutdown() } - app.databases.use(.sqlite(.memory), as: .sqlite) + try self.useDbs(app) app.queues.add(DelayedJob()) - app.queues.use(.fluent()) + app.queues.use(.fluent(self.dbid)) - app.migrations.add(JobModelMigration()) + app.migrations.add(JobModelMigration(), to: self.dbid) try await app.autoMigrate() let jobId = JobIdentifier() @@ -92,7 +119,7 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssertEqual(res.status, .ok) } - await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) + await XCTAssertEqualAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase) .select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string) .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)