Skip to content

Commit

Permalink
Add a couple of tests adapted from the Redis driver and fix the bugs …
Browse files Browse the repository at this point in the history
…they revealed
  • Loading branch information
gwynne committed Feb 3, 2024
1 parent c4a0fd9 commit 225b98c
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 11 deletions.
3 changes: 3 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let package = Package(
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.28.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
],
targets: [
.target(
Expand All @@ -31,6 +32,8 @@ let package = Package(
.testTarget(
name: "QueuesFluentDriverTests",
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.target(name: "QueuesFluentDriver"),
]
),
Expand Down
3 changes: 3 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ let package = Package(
.package(url: "https://github.com/vapor/fluent-kit.git", from: "1.45.1"),
.package(url: "https://github.com/vapor/sql-kit.git", from: "3.28.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.13.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.6.0"),
],
targets: [
.target(
Expand All @@ -35,6 +36,8 @@ let package = Package(
.testTarget(
name: "QueuesFluentDriverTests",
dependencies: [
.product(name: "XCTVapor", package: "vapor"),
.product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"),
.target(name: "QueuesFluentDriver"),
],
swiftSettings: swiftSettings
Expand Down
12 changes: 6 additions & 6 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public struct FluentQueue: Queue, Sendable {
public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.sqlDb
.update(JobModel.sqlTable)
.set(JobModel.sqlColumn(\.$state), to: SQLBind(QueuesFluentJobState.pending))
.set(JobModel.sqlColumnName(\.$state), to: SQLBind(QueuesFluentJobState.pending))
.where(JobModel.sqlColumn(\.$id), .equal, SQLBind(id.string))
.run()
}
Expand All @@ -82,15 +82,15 @@ public struct FluentQueue: Queue, Sendable {
.from(JobModel.sqlTable)
.where(JobModel.sqlColumn(\.$state), .equal, SQLBind(QueuesFluentJobState.pending))
.where(JobModel.sqlColumn(\.$queue), .equal, SQLBind(self.queueName.string))
.where(SQLFunction("coalesce", args: JobModel.sqlColumn(\.$data.$delayUntil), SQLFunction("now")), .lessThanOrEqual, SQLFunction("now"))
.where(SQLFunction("coalesce", args: JobModel.sqlColumn(\.$data.$delayUntil), SQLFunction(self.sqlDb.dialect.name == "sqlite" ? "datetime" : "now")), .lessThanOrEqual, SQLFunction(self.sqlDb.dialect.name == "sqlite" ? "datetime" : "now"))
.orderBy(JobModel.sqlColumn(\.$data.$delayUntil))
.limit(1)
.lockingClause(SQLLockingClauseWithSkipLocked.updateSkippingLocked)

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.sqlTable)
.set(JobModel.sqlColumn(\.$state), to: SQLBind(QueuesFluentJobState.processing))
.set(JobModel.sqlColumn(\.$updatedAt), to: SQLFunction("now"))
.set(JobModel.sqlColumnName(\.$state), to: SQLBind(QueuesFluentJobState.processing))
.set(JobModel.sqlColumnName(\.$updatedAt), to: SQLFunction(self.sqlDb.dialect.name == "sqlite" ? "datetime" : "now"))
.where(JobModel.sqlColumn(\.$id), .equal, SQLGroupExpression(select.query))
.returning(JobModel.sqlColumn(\.$id))
.first(decodingColumn: "\(JobModel.key(for: \.$id))", as: String.self)
Expand All @@ -107,8 +107,8 @@ public struct FluentQueue: Queue, Sendable {

try await database
.update(JobModel.sqlTable)
.set(JobModel.sqlColumn(\.$state), to: SQLBind(QueuesFluentJobState.processing))
.set(JobModel.sqlColumn(\.$updatedAt), to: SQLFunction("now"))
.set(JobModel.sqlColumnName(\.$state), to: SQLBind(QueuesFluentJobState.processing))
.set(JobModel.sqlColumnName(\.$updatedAt), to: SQLFunction(self.sqlDb.dialect.name == "sqlite" ? "datetime" : "now"))
.where(JobModel.sqlColumn(\.$id), .equal, SQLBind(id))
.where(JobModel.sqlColumn(\.$state), .equal, SQLBind(QueuesFluentJobState.pending))
.run()
Expand Down
6 changes: 3 additions & 3 deletions Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public struct JobMetadataMigrate: AsyncMigration {
try await (database as! any SQLDatabase)
.create(index: "i_\(JobModel.schema)_\(JobModel.key(for: \.$state))_\(JobModel.key(for: \.$queue))")
.on(JobModel.sqlTable)
.column(JobModel.sqlColumn(\.$state))
.column(JobModel.sqlColumn(\.$queue))
.column(JobModel.sqlColumn(\.$data.$delayUntil))
.column(JobModel.sqlColumnName(\.$state))
.column(JobModel.sqlColumnName(\.$queue))
.column(JobModel.sqlColumnName(\.$data.$delayUntil))
.run()
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/Model+SQLColumn.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ extension FluentKit.Fields {
static func key<P>(for keypath: KeyPath<Self, P>) -> FieldKey
where P: QueryAddressableProperty
{
Self.path(for: keypath.appending(path: \.queryableProperty))[0]
Self.init()[keyPath: keypath].queryablePath[0]
}

static func sqlColumnName<P>(_ keypath: KeyPath<Self, P>) -> SQLIdentifier
Expand Down
145 changes: 144 additions & 1 deletion Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,148 @@
import XCTest
import QueuesFluentDriver
import XCTVapor
import FluentKit
import Logging
@testable import QueuesFluentDriver
@preconcurrency import Queues
import FluentSQLiteDriver

final class QueuesFluentDriverTests: XCTestCase {
func testApplication() throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.migrations.add(JobMetadataMigrate())

let email = Email()
app.queues.add(email)

app.queues.use(.fluent())

try app.autoMigrate().wait()

app.get("send-email") { req in
req.queue.dispatch(Email.self, .init(to: "[email protected]"))
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "send-email") { res in
XCTAssertEqual(res.status, .ok)
}

XCTAssertEqual(email.sent, [])
try app.queues.queue.worker.run().wait()
XCTAssertEqual(email.sent, [.init(to: "[email protected]")])
}

func testFailedJobLoss() throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)
app.queues.add(FailingJob())
app.queues.use(.fluent())
app.migrations.add(JobMetadataMigrate())
try app.autoMigrate().wait()

let jobId = JobIdentifier()
app.get("test") { req in
req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobId)
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "test") { res in
XCTAssertEqual(res.status, .ok)
}

XCTAssertThrowsError(try app.queues.queue.worker.run().wait()) {
XCTAssert($0 is FailingJob.Failure)
}

XCTAssertNotNil(try app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())!
.query(JobModel.self).filter(\.$id == jobId.string).first().wait())
}

func testDelayedJobIsRemovedFromProcessingQueue() throws {
let app = Application(.testing)
defer { app.shutdown() }

app.databases.use(.sqlite(.memory), as: .sqlite)

app.queues.add(DelayedJob())

app.queues.use(.fluent())

app.migrations.add(JobMetadataMigrate())
try app.autoMigrate().wait()

let jobId = JobIdentifier()
app.get("delay-job") { req in
req.queue.dispatch(DelayedJob.self, .init(name: "vapor"),
delayUntil: Date().addingTimeInterval(3600),
id: jobId)
.map { HTTPStatus.ok }
}

try app.testable().test(.GET, "delay-job") { res in
XCTAssertEqual(res.status, .ok)
}

XCTAssertEqual(try app.databases.database(logger: .init(label: ""), on: app.eventLoopGroup.any())!
.query(JobModel.self).filter(\.$id == jobId.string).first().wait()?.state, .pending)
}

override func setUp() {
XCTAssert(isLoggingConfigured)
}
}

final class Email: Job {
struct Message: Codable, Equatable {
let to: String
}

var sent: [Message] = []

func dequeue(_ context: QueueContext, _ message: Message) -> EventLoopFuture<Void> {
self.sent.append(message)
context.logger.info("sending email \(message)")
return context.eventLoop.makeSucceededFuture(())
}
}

final class DelayedJob: Job {
struct Message: Codable, Equatable {
let name: String
}

func dequeue(_ context: QueueContext, _ message: Message) -> EventLoopFuture<Void> {
context.logger.info("Hello \(message.name)")
return context.eventLoop.makeSucceededFuture(())
}
}

struct FailingJob: Job {
struct Failure: Error {}

func dequeue(_ context: QueueContext, _ message: [String: String]) -> EventLoopFuture<Void> {
context.eventLoop.makeFailedFuture(Failure())
}

func error(_ context: QueueContext, _ error: any Error, _ payload: [String: String]) -> EventLoopFuture<Void> {
context.eventLoop.makeFailedFuture(Failure())
}
}

func env(_ name: String) -> String? {
return ProcessInfo.processInfo.environment[name]
}

let isLoggingConfigured: Bool = {
LoggingSystem.bootstrap { label in
var handler = StreamLogHandler.standardOutput(label: label)
handler.logLevel = env("LOG_LEVEL").flatMap { .init(rawValue: $0) } ?? .info
return handler
}
return true
}()

0 comments on commit 225b98c

Please sign in to comment.