Skip to content

Commit

Permalink
Support MySQL 5.7 (#3)
Browse files Browse the repository at this point in the history
MySQL before 8.0 (why is anyone still using 5.7? 😭) didn't support `SKIP
LOCKED`. Also has a temporary fix for a bug in PostgresKit.
  • Loading branch information
gwynne authored May 2, 2024
1 parent 199bd0e commit 5b76c5f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
26 changes: 25 additions & 1 deletion Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
// See `Queue.context`.
public let context: QueueContext

let sqlDb: any SQLDatabase

let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable`

// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
Expand Down Expand Up @@ -56,6 +59,27 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.pop()`.
public func pop() -> EventLoopFuture<JobIdentifier?> {
self.sqlDb.eventLoop.makeFutureWithTask {
// Special case: For MySQL < 8.0, we can't use `SKIP LOCKED`. This is a really hackneyed solution,
// but we need to execute a database query to get the version information, `makeQueue(with:)`
// is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time
// of this writing.
if self._sqlLockingClause.withLockedValue({ $0 }) == nil {
switch self.sqlDb.dialect.name {
case "mysql":
let version = try await self.sqlDb.select()
.column(SQLFunction("version"), as: "version")
.first(decodingColumn: "version", as: String.self) ?? "" // always returns one row
// This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x.
if version.starts(with: "5.") || !(version.first?.isNumber ?? false) {
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClause.update }
} else {
fallthrough
}
default:
self._sqlLockingClause.withLockedValue { $0 = SQLLockingClauseWithSkipLocked.updateSkippingLocked }
}
}

let select = self.sqlDb
.select()
.column("id")
Expand All @@ -65,7 +89,7 @@ public struct FluentQueue: Queue, Sendable {
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
.limit(1)
.lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked)
.lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public struct FluentQueuesDriver: QueuesDriver {
public func shutdown() {}
}

private struct FailingQueue: Queue {
/*private*/ struct FailingQueue: Queue {
let failure: any Error
let context: QueueContext

Expand Down
2 changes: 0 additions & 2 deletions Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ extension SQLExpression {

static func now() -> Self where Self == SQLDateValue<SQLNow> { .now() }

static func bind(_ value: some Encodable & Sendable) -> Self where Self == SQLBind { .init(value) }

static func function(_ name: String, _ args: any SQLExpression...) -> Self where Self == SQLFunction { .init(name, args: args) }

static func group(_ expr: some SQLExpression) -> Self where Self == SQLGroupExpression { .init(expr) }
Expand Down
85 changes: 73 additions & 12 deletions Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Logging
import FluentSQLiteDriver

final class QueuesFluentDriverTests: XCTestCase {
func testApplication() throws {
func testApplication() async throws {
let app = Application(.testing)
defer { app.shutdown() }

Expand All @@ -19,7 +19,7 @@ final class QueuesFluentDriverTests: XCTestCase {

app.queues.use(.fluent())

try app.autoMigrate().wait()
try await app.autoMigrate()

app.get("send-email") { req in
req.queue.dispatch(Email.self, .init(to: "[email protected]"))
Expand All @@ -31,19 +31,21 @@ final class QueuesFluentDriverTests: XCTestCase {
}

XCTAssertEqual(email.sent, [])
try app.queues.queue.worker.run().wait()
try await app.queues.queue.worker.run().get()
XCTAssertEqual(email.sent, [.init(to: "[email protected]")])

try await app.autoRevert()
}

func testFailedJobLoss() throws {
func testFailedJobLoss() async throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.queues.add(FailingJob())
app.queues.use(.fluent())
app.migrations.add(JobModelMigration())
try app.autoMigrate().wait()
try await app.autoMigrate()

let jobId = JobIdentifier()
app.get("test") { req in
Expand All @@ -55,15 +57,17 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

XCTAssertThrowsError(try app.queues.queue.worker.run().wait()) {
await XCTAssertThrowsErrorAsync(try await app.queues.queue.worker.run().get()) {
XCTAssert($0 is FailingJob.Failure)
}

XCTAssertNotNil(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first().wait())
await XCTAssertNotNilAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string).first())

try await app.autoRevert()
}

func testDelayedJobIsRemovedFromProcessingQueue() throws {
func testDelayedJobIsRemovedFromProcessingQueue() async throws {
let app = Application(.testing)
defer { app.shutdown() }

Expand All @@ -74,7 +78,7 @@ final class QueuesFluentDriverTests: XCTestCase {
app.queues.use(.fluent())

app.migrations.add(JobModelMigration())
try app.autoMigrate().wait()
try await app.autoMigrate()

let jobId = JobIdentifier()
app.get("delay-job") { req in
Expand All @@ -88,9 +92,25 @@ final class QueuesFluentDriverTests: XCTestCase {
XCTAssertEqual(res.status, .ok)
}

XCTAssertEqual(try (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
await XCTAssertEqualAsync(try await (app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())! as! any SQLDatabase)
.select().columns("*").from(JobModel.schema).where("id", .equal, jobId.string)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase).wait()?.state, .pending)
.first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending)

try await app.autoRevert()
}

func testCoverageForFailingQueue() {
let app = Application(.testing)
defer { app.shutdown() }
let queue = FailingQueue(
failure: QueuesFluentError.unsupportedDatabase,
context: .init(queueName: .init(string: ""), configuration: .init(), application: app, logger: .init(label: ""), on: app.eventLoopGroup.any())
)
XCTAssertThrowsError(try queue.get(.init()).wait())
XCTAssertThrowsError(try queue.set(.init(), to: JobData(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: .init())).wait())
XCTAssertThrowsError(try queue.clear(.init()).wait())
XCTAssertThrowsError(try queue.push(.init()).wait())
XCTAssertThrowsError(try queue.pop().wait())
}

override func setUp() {
Expand Down Expand Up @@ -135,6 +155,47 @@ struct FailingJob: Job {
}
}

func XCTAssertEqualAsync<T>(
_ expression1: @autoclosure () async throws -> T,
_ expression2: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line
) async where T: Equatable {
do {
let expr1 = try await expression1(), expr2 = try await expression2()
return XCTAssertEqual(expr1, expr2, message(), file: file, line: line)
} catch {
return XCTAssertEqual(try { () -> Bool in throw error }(), false, message(), file: file, line: line)
}
}

func XCTAssertThrowsErrorAsync<T>(
_ expression: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line,
_ callback: (any Error) -> Void = { _ in }
) async {
do {
_ = try await expression()
XCTAssertThrowsError({}(), message(), file: file, line: line, callback)
} catch {
XCTAssertThrowsError(try { throw error }(), message(), file: file, line: line, callback)
}
}

func XCTAssertNotNilAsync(
_ expression: @autoclosure () async throws -> Any?,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath, line: UInt = #line
) async {
do {
let result = try await expression()
XCTAssertNotNil(result, message(), file: file, line: line)
} catch {
return XCTAssertNotNil(try { throw error }(), message(), file: file, line: line)
}
}

func env(_ name: String) -> String? {
return ProcessInfo.processInfo.environment[name]
}
Expand Down

0 comments on commit 5b76c5f

Please sign in to comment.