Skip to content

Commit

Permalink
Fix MySQL and PostgreSQL (again) (#4)
Browse files Browse the repository at this point in the history
Fixes (with rather ugly workarounds) issues with enums and the blob column. Adds the ability to test against all the drivers more easily.
  • Loading branch information
gwynne authored May 6, 2024
1 parent 5b76c5f commit cad57b4
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ let package = Package(
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
],
targets: [
.target(
Expand All @@ -35,6 +37,8 @@ let package = Package(
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"),
.product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"),
.target(name: "QueuesFluentDriver"),
],
swiftSettings: swiftSettings
Expand Down
4 changes: 4 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ let package = Package(
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.29.2"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
.package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.0"),
.package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"),
],
targets: [
.target(
Expand All @@ -38,6 +40,8 @@ let package = Package(
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"),
.product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"),
.target(name: "QueuesFluentDriver"),
],
swiftSettings: swiftSettings
Expand Down
37 changes: 25 additions & 12 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers
import struct Foundation.Data

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
Expand All @@ -14,24 +15,36 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
self.sqlDb.select()
.columns("payload", "max_retry_count", "job_name", "delay_until", "queued_at", "attempts")
.columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at")
.from(JobModel.schema)
.where("id", .equal, id.string)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing {
try $0.decode(model: JobData.self, keyDecodingStrategy: .convertFromSnakeCase)
try $0.decode(model: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)
}.map {
.init(payload: .init($0.payload), maxRetryCount: $0.maxRetryCount, jobName: $0.jobName, delayUntil: $0.delayUntil, queuedAt: $0.queuedAt)
}
}

// See `Queue.set(_:to:)`.
public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
self.sqlDb.eventLoop.makeFutureWithTask {
try await self.sqlDb.insert(into: JobModel.schema)
.model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
.onConflict { try $0
.set(excludedContentOf: JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
}
.columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
.values(
SQLBind(id.string),
SQLBind(self.queueName.string),
SQLBind(jobStorage.jobName),
SQLBind(jobStorage.queuedAt),
SQLBind(jobStorage.delayUntil),
SQLLiteral.string(StoredJobState.pending.rawValue),
SQLBind(jobStorage.maxRetryCount),
SQLBind(jobStorage.attempts),
SQLBind(Data(jobStorage.payload)),
.now()
)
// .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) // because enums!
.run()
}
}
Expand All @@ -41,7 +54,7 @@ public struct FluentQueue: Queue, Sendable {
self.get(id).flatMap { _ in
self.sqlDb.delete(from: JobModel.schema)
.where("id", .equal, id.string)
.where("state", .notEqual, StoredJobState.completed)
.where("state", .notEqual, SQLLiteral.string(StoredJobState.completed.rawValue))
.run()
}
}
Expand All @@ -50,7 +63,7 @@ public struct FluentQueue: Queue, Sendable {
public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.sqlDb
.update(JobModel.schema)
.set("state", to: StoredJobState.pending)
.set("state", to: SQLLiteral.string(StoredJobState.pending.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id.string)
.run()
Expand Down Expand Up @@ -84,7 +97,7 @@ public struct FluentQueue: Queue, Sendable {
.select()
.column("id")
.from(JobModel.schema)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.where("queue_name", .equal, self.queueName.string)
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
Expand All @@ -93,7 +106,7 @@ public struct FluentQueue: Queue, Sendable {

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, .group(select.query))
.returning("id")
Expand All @@ -109,10 +122,10 @@ public struct FluentQueue: Queue, Sendable {

try await transaction
.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.run()

return JobIdentifier(string: id)
Expand Down
5 changes: 3 additions & 2 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import struct Foundation.Date
import struct Foundation.Data
import struct Queues.JobData
import struct Queues.JobIdentifier
import struct Queues.QueueName
Expand Down Expand Up @@ -45,7 +46,7 @@ struct JobModel: Codable, Sendable {
let attempts: Int

/// The job's payload.
let payload: [UInt8]
let payload: Data

/// The standard automatic update tracking timestamp.
let updatedAt: Date
Expand All @@ -59,7 +60,7 @@ struct JobModel: Codable, Sendable {
self.state = .pending
self.maxRetryCount = jobData.maxRetryCount
self.attempts = jobData.attempts ?? 0
self.payload = jobData.payload
self.payload = Data(jobData.payload)
self.updatedAt = .init()
}
}
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public struct JobModelMigration: AsyncSQLMigration {
.value("completed")
.run()
case .inline:
stateEnumType = "enum(\(StoredJobState.allCases.map(\.rawValue).joined(separator: ",")))"
stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')"
default:
stateEnumType = "varchar(16)"
}
Expand Down
2 changes: 2 additions & 0 deletions Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ struct SQLNow: SQLExpression {
switch serializer.dialect.name {
case "sqlite": // For SQLite, write out the literal string 'now' (see below)
SQLLiteral.string("now").serialize(to: &serializer)
case "postgresql": // For Postgres, "current_timestamp" is a keyword, not a function, so use "now()" instead.
SQLFunction("now").serialize(to: &serializer)
default: // Everywhere else, just call the SQL standard function.
SQLFunction("current_timestamp").serialize(to: &serializer)
}
Expand Down
49 changes: 38 additions & 11 deletions Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,46 @@ import Logging
@testable import QueuesFluentDriver
@preconcurrency import Queues
import FluentSQLiteDriver
import FluentPostgresDriver
import FluentMySQLDriver
import NIOSSL

final class QueuesFluentDriverTests: XCTestCase {
var dbid: DatabaseID { .sqlite }

private func useDbs(_ app: Application) throws {
app.databases.use(.sqlite(.memory), as: .sqlite)
app.databases.use(DatabaseConfigurationFactory.postgres(configuration: .init(
hostname: Environment.get("DATABASE_HOST") ?? "localhost",
port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? SQLPostgresConfiguration.ianaPortNumber,
username: Environment.get("DATABASE_USERNAME") ?? "test_username",
password: Environment.get("DATABASE_PASSWORD") ?? "test_password",
database: Environment.get("DATABASE_NAME") ?? "test_database",
tls: .prefer(try .init(configuration: .clientDefault)))
), as: .psql)
var config = TLSConfiguration.clientDefault
config.certificateVerification = .none
app.databases.use(DatabaseConfigurationFactory.mysql(configuration: .init(
hostname: Environment.get("DATABASE_HOST") ?? "localhost",
port: Environment.get("DATABASE_PORT").flatMap(Int.init(_:)) ?? MySQLConfiguration.ianaPortNumber,
username: Environment.get("DATABASE_USERNAME") ?? "test_username",
password: Environment.get("DATABASE_PASSWORD") ?? "test_password",
database: Environment.get("DATABASE_NAME") ?? "test_database",
tlsConfiguration: config
)), as: .mysql)
}

func testApplication() async throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.migrations.add(JobModelMigration())
try self.useDbs(app)
app.migrations.add(JobModelMigration(), to: self.dbid)

let email = Email()
app.queues.add(email)

app.queues.use(.fluent())
app.queues.use(.fluent(self.dbid))

try await app.autoMigrate()

Expand All @@ -41,10 +68,10 @@ final class QueuesFluentDriverTests: XCTestCase {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
try self.useDbs(app)
app.queues.add(FailingJob())
app.queues.use(.fluent())
app.migrations.add(JobModelMigration())
app.queues.use(.fluent(self.dbid))
app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()

let jobId = JobIdentifier()
Expand All @@ -61,7 +88,7 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssert($0 is FailingJob.Failure)
}

await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertNotNilAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first())

try await app.autoRevert()
Expand All @@ -71,13 +98,13 @@ final class QueuesFluentDriverTests: XCTestCase {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
try self.useDbs(app)

app.queues.add(DelayedJob())

app.queues.use(.fluent())
app.queues.use(.fluent(self.dbid))

app.migrations.add(JobModelMigration())
app.migrations.add(JobModelMigration(), to: self.dbid)
try await app.autoMigrate()

let jobId = JobIdentifier()
Expand All @@ -92,7 +119,7 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertEqualAsync(try await (app.databases.database(self.dbid, logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)

Expand Down

0 comments on commit cad57b4

Please sign in to comment.