From 4e31dfa70e49762c714a709bab7d84bfd93362bf Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Tue, 1 Oct 2024 07:48:27 -0500 Subject: [PATCH] Implement preservable jobs and configurable jobs table name (#11) Preserved jobs, if enabled, are marked with a `completed` state in the database. Turning preservation off if it was previously on does _not_ clear old jobs from the table; it just prevents new ones from being preserved. Configuring the jobs table name requires passing the same name (and optionally, space) to both the queue driver _and_ the migration. This is necessary because there's no way for the migration to see the driver's configuration. Closes #9. Closes #10. --- .github/workflows/test.yml | 9 +- Package.swift | 7 +- Package@swift-5.9.swift | 63 --------- Sources/QueuesFluentDriver/FluentQueue.swift | 41 +++--- .../FluentQueuesDriver.swift | 32 +++-- Sources/QueuesFluentDriver/JobModel.swift | 8 +- .../QueuesFluentDriver/JobModelMigrate.swift | 68 ++++----- .../Queues.Provider+Fluent.swift | 21 ++- .../SQLKit+Convenience.swift | 1 + .../QueuesFluentDriverTests.swift | 129 +++++++++++++++--- 10 files changed, 232 insertions(+), 147 deletions(-) delete mode 100644 Package@swift-5.9.swift diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2c9c6f4..f776667 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -27,15 +27,10 @@ jobs: fail-fast: false matrix: swift-image: - - swift:5.8-jammy - swift:5.9-jammy - swift:5.10-noble - - swiftlang/swift:nightly-6.0-jammy + - swift:6.0-noble - swiftlang/swift:nightly-main-jammy - include: - - sanitize: '--sanitize=thread' - - swift-image: swift:5.8-jammy - sanitize: '' runs-on: ubuntu-latest container: ${{ matrix.swift-image }} services: @@ -50,7 +45,7 @@ jobs: SANITIZE: ${{ matrix.sanitize }} POSTGRES_HOST: psql MYSQL_HOST: mysql - run: SWIFT_DETERMINISTIC_HASHING=1 swift test ${SANITIZE} --enable-code-coverage + run: SWIFT_DETERMINISTIC_HASHING=1 swift test --sanitize=thread --enable-code-coverage - name: Upload coverage data uses: vapor/swift-codecov-action@v0.3 with: diff --git a/Package.swift b/Package.swift index d0017c2..4588aed 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.8 +// swift-tools-version:5.9 import PackageDescription import class Foundation.ProcessInfo @@ -6,6 +6,9 @@ let package = Package( name: "QueuesFluentDriver", platforms: [ .macOS(.v10_15), + .iOS(.v13), + .watchOS(.v6), + .tvOS(.v13), ], products: [ .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), @@ -53,6 +56,8 @@ let package = Package( var swiftSettings: [SwiftSetting] { [ .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ExistentialAny"), .enableUpcomingFeature("ConciseMagicFile"), .enableUpcomingFeature("DisableOutwardActorInference"), + .enableExperimentalFeature("StrictConcurrency=complete"), ] } diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift deleted file mode 100644 index 4588aed..0000000 --- a/Package@swift-5.9.swift +++ /dev/null @@ -1,63 +0,0 @@ -// swift-tools-version:5.9 -import PackageDescription -import class Foundation.ProcessInfo - -let package = Package( - name: "QueuesFluentDriver", - platforms: [ - .macOS(.v10_15), - .iOS(.v13), - .watchOS(.v6), - .tvOS(.v13), - ], - products: [ - .library(name: "QueuesFluentDriver", targets: ["QueuesFluentDriver"]), - ], - dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.100.0"), - .package(url: "https://github.com/vapor/fluent.git", from: "4.10.0"), - .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.48.4"), - .package(url: "https://github.com/vapor/sql-kit.git", from: "3.30.0"), - .package(url: "https://github.com/vapor/queues.git", from: "1.15.0"), - .package(url: "https://github.com/vapor/console-kit.git", from: "4.14.3"), - ] + (ProcessInfo.processInfo.environment["CI"] != nil ? [ - .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.7.1"), - .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.9.1"), - .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.5.0"), - ] : []), - targets: [ - .target( - name: "QueuesFluentDriver", - dependencies: [ - .product(name: "Vapor", package: "vapor"), - .product(name: "Fluent", package: "fluent"), - .product(name: "FluentKit", package: "fluent-kit"), - .product(name: "FluentSQL", package: "fluent-kit"), - .product(name: "SQLKit", package: "sql-kit"), - .product(name: "Queues", package: "queues") - ], - swiftSettings: swiftSettings - ), - .testTarget( - name: "QueuesFluentDriverTests", - dependencies: [ - .product(name: "XCTVapor", package: "vapor"), - .product(name: "ConsoleKitTerminal", package: "console-kit"), - .target(name: "QueuesFluentDriver"), - ] + (ProcessInfo.processInfo.environment["CI"] != nil ? [ - .product(name: "FluentSQLiteDriver", package: "fluent-sqlite-driver"), - .product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"), - .product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"), - ] : []), - swiftSettings: swiftSettings - ), - ] -) - -var swiftSettings: [SwiftSetting] { [ - .enableUpcomingFeature("ForwardTrailingClosures"), - .enableUpcomingFeature("ExistentialAny"), - .enableUpcomingFeature("ConciseMagicFile"), - .enableUpcomingFeature("DisableOutwardActorInference"), - .enableExperimentalFeature("StrictConcurrency=complete"), -] } diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 32cd897..0eec3d0 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -8,15 +8,17 @@ public struct FluentQueue: AsyncQueue, Sendable { // See `Queue.context`. public let context: QueueContext - let sqlDb: any SQLDatabase - + let sqlDB: any SQLDatabase + let preservesCompletedJobs: Bool + let jobsTable: SQLQualifiedTable + let _sqlLockingClause: NIOLockedValueBox<(any SQLExpression)?> = .init(nil) // needs a lock for the queue to be `Sendable` // See `Queue.get(_:)`. public func get(_ id: JobIdentifier) async throws -> JobData { - guard let job = try await self.sqlDb.select() + guard let job = try await self.sqlDB.select() .columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at") - .from(JobModel.schema) + .from(self.jobsTable) .where("id", .equal, id) .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase) else { @@ -28,7 +30,7 @@ public struct FluentQueue: AsyncQueue, Sendable { // See `Queue.set(_:to:)`. public func set(_ id: JobIdentifier, to jobStorage: JobData) async throws { - try await self.sqlDb.insert(into: JobModel.schema) + try await self.sqlDB.insert(into: self.jobsTable) .columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at") .values( .bind(id), @@ -48,14 +50,21 @@ public struct FluentQueue: AsyncQueue, Sendable { // See `Queue.clear(_:)`. public func clear(_ id: JobIdentifier) async throws { - try await self.sqlDb.delete(from: JobModel.schema) - .where("id", .equal, id) - .run() + if self.preservesCompletedJobs { + try await self.sqlDB.update(self.jobsTable) + .set("state", to: .literal(StoredJobState.completed)) + .where("id", .equal, id) + .run() + } else { + try await self.sqlDB.delete(from: self.jobsTable) + .where("id", .equal, id) + .run() + } } // See `Queue.push(_:)`. public func push(_ id: JobIdentifier) async throws { - try await self.sqlDb.update(JobModel.schema) + try await self.sqlDB.update(self.jobsTable) .set("state", to: .literal(StoredJobState.pending)) .set("updated_at", to: .now()) .where("id", .equal, id) @@ -69,9 +78,9 @@ public struct FluentQueue: AsyncQueue, Sendable { // is purely synchronous, and `SQLDatabase.version` is not implemented in MySQLKit at the time // of this writing. if self._sqlLockingClause.withLockedValue({ $0 }) == nil { - switch self.sqlDb.dialect.name { + switch self.sqlDB.dialect.name { case "mysql": - let version = try await self.sqlDb.select() + let version = try await self.sqlDB.select() .column(.function("version"), as: "version") .first(decodingColumn: "version", as: String.self)! // always returns one row // This is a really lazy check and it knows it; we know MySQLNIO doesn't support versions older than 5.x. @@ -87,7 +96,7 @@ public struct FluentQueue: AsyncQueue, Sendable { let select = SQLSubquery.select { $0 .column("id") - .from(JobModel.schema) + .from(self.jobsTable) .where("state", .equal, .literal(StoredJobState.pending)) .where("queue_name", .equal, self.queueName) .where(.dateValue(.function("coalesce", .column("delay_until"), SQLNow())), .lessThanOrEqual, .now()) @@ -97,8 +106,8 @@ public struct FluentQueue: AsyncQueue, Sendable { .lockingClause(self._sqlLockingClause.withLockedValue { $0! }) // we've always set it by the time we get here } - if self.sqlDb.dialect.supportsReturning { - return try await self.sqlDb.update(JobModel.schema) + if self.sqlDB.dialect.supportsReturning { + return try await self.sqlDB.update(self.jobsTable) .set("state", to: .literal(StoredJobState.processing)) .set("updated_at", to: .now()) .where("id", .equal, select) @@ -106,7 +115,7 @@ public struct FluentQueue: AsyncQueue, Sendable { .first(decodingColumn: "id", as: String.self) .map(JobIdentifier.init(string:)) } else { - return try await self.sqlDb.transaction { transaction in + return try await self.sqlDB.transaction { transaction in guard let id = try await transaction.raw("\(select)") // using raw() to make sure we run on the transaction connection .first(decodingColumn: "id", as: String.self) else { @@ -114,7 +123,7 @@ public struct FluentQueue: AsyncQueue, Sendable { } try await transaction - .update(JobModel.schema) + .update(self.jobsTable) .set("state", to: .literal(StoredJobState.processing)) .set("updated_at", to: .now()) .where("id", .equal, id) diff --git a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift index f30c95a..7bde913 100644 --- a/Sources/QueuesFluentDriver/FluentQueuesDriver.swift +++ b/Sources/QueuesFluentDriver/FluentQueuesDriver.swift @@ -9,10 +9,21 @@ import struct Queues.JobIdentifier import struct Queues.JobData public struct FluentQueuesDriver: QueuesDriver { - let databaseId: DatabaseID? + let databaseID: DatabaseID? + let preservesCompletedJobs: Bool + let jobsTableName: String + let jobsTableSpace: String? - init(on databaseId: DatabaseID? = nil) { - self.databaseId = databaseId + init( + on databaseID: DatabaseID? = nil, + preserveCompletedJobs: Bool = false, + jobsTableName: String = "_jobs_meta", + jobsTableSpace: String? = nil + ) { + self.databaseID = databaseID + self.preservesCompletedJobs = preserveCompletedJobs + self.jobsTableName = jobsTableName + self.jobsTableSpace = jobsTableSpace } public func makeQueue(with context: QueueContext) -> any Queue { @@ -21,16 +32,21 @@ public struct FluentQueuesDriver: QueuesDriver { /// /// `Fluent.Databases.database(_:logger:on:)` never returns nil; its optionality is an API mistake. /// If a nonexistent `DatabaseID` is requested, it triggers a `fatalError()`. - let baseDb = context + let baseDB = context .application .databases - .database(self.databaseId, logger: context.logger, on: context.eventLoop)! - - guard let sqlDb = baseDb as? any SQLDatabase else { + .database(self.databaseID, logger: context.logger, on: context.eventLoop)! + + guard let sqlDB = baseDB as? any SQLDatabase else { return FailingQueue(failure: QueuesFluentError.unsupportedDatabase, context: context) } - return FluentQueue(context: context, sqlDb: sqlDb) + return FluentQueue( + context: context, + sqlDB: sqlDB, + preservesCompletedJobs: self.preservesCompletedJobs, + jobsTable: .init(self.jobsTableName, space: self.jobsTableSpace) + ) } public func shutdown() {} diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index caca166..9f5186e 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -11,13 +11,15 @@ enum StoredJobState: String, Codable, CaseIterable { /// Job is in progress. case processing + + /// Job is completed. + /// + /// > Note: This state is only used if the driver is configured to preserve completed jobs. + case completed } /// Encapsulates a job's metadata and `JobData`. struct JobModel: Codable, Sendable { - /// The name of the model's table. - static let schema = "_jobs_meta" - /// The job identifier. Corresponds directly to a `JobIdentifier`. let id: String? diff --git a/Sources/QueuesFluentDriver/JobModelMigrate.swift b/Sources/QueuesFluentDriver/JobModelMigrate.swift index 037c406..54f6789 100644 --- a/Sources/QueuesFluentDriver/JobModelMigrate.swift +++ b/Sources/QueuesFluentDriver/JobModelMigrate.swift @@ -1,28 +1,34 @@ -import protocol SQLKit.SQLDatabase -import enum SQLKit.SQLColumnConstraintAlgorithm -import enum SQLKit.SQLDataType -import enum SQLKit.SQLLiteral -import struct SQLKit.SQLRaw +import SQLKit public struct JobModelMigration: AsyncSQLMigration { + private let jobsTableString: String + private let jobsTable: SQLQualifiedTable + /// Public initializer. - public init() {} - + public init( + jobsTableName: String = "_jobs_meta", + jobsTableSpace: String? = nil + ) { + self.jobsTableString = "\(jobsTableSpace.map { "\($0)_" } ?? "")\(jobsTableName)" + self.jobsTable = .init(jobsTableName, space: jobsTableSpace) + } + // See `AsyncSQLMigration.prepare(on:)`. public func prepare(on database: any SQLDatabase) async throws { - let stateEnumType: String - + let stateEnumType: any SQLExpression + switch database.dialect.enumSyntax { case .typeName: - stateEnumType = "\(JobModel.schema)_storedjobstatus" - try await database.create(enum: stateEnumType) - .value("pending") - .value("processing") - .run() + stateEnumType = .identifier("\(self.jobsTableString)_storedjobstatus") + var builder = database.create(enum: stateEnumType) + builder = StoredJobState.allCases.reduce(builder, { $0.value($1.rawValue) }) + try await builder.run() case .inline: - stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')" + // This is technically a misuse of SQLFunction, but it produces the correct syntax + stateEnumType = .function("enum", StoredJobState.allCases.map { .literal($0.rawValue) }) default: - stateEnumType = "varchar(16)" + // This is technically a misuse of SQLFunction, but it produces the correct syntax + stateEnumType = .function("varchar", .literal(16)) } /// This whole pile of nonsense is only here because of @@ -39,20 +45,20 @@ public struct JobModelMigration: AsyncSQLMigration { autoTimestampConstraints = [] } - try await database.create(table: JobModel.schema) - .column("id", type: .text, .primaryKey(autoIncrement: false)) - .column("queue_name", type: .text, .notNull) - .column("job_name", type: .text, .notNull) - .column("queued_at", type: manualTimestampType, .notNull) - .column("delay_until", type: manualTimestampType, .default(SQLLiteral.null)) - .column("state", type: .custom(SQLRaw(stateEnumType)), .notNull) - .column("max_retry_count", type: .int, .notNull) - .column("attempts", type: .int, .notNull) - .column("payload", type: .blob, .notNull) - .column("updated_at", type: .timestamp, autoTimestampConstraints) + try await database.create(table: self.jobsTable) + .column("id", type: .text, .primaryKey(autoIncrement: false)) + .column("queue_name", type: .text, .notNull) + .column("job_name", type: .text, .notNull) + .column("queued_at", type: manualTimestampType, .notNull) + .column("delay_until", type: manualTimestampType, .default(SQLLiteral.null)) + .column("state", type: .custom(stateEnumType), .notNull) + .column("max_retry_count", type: .int, .notNull) + .column("attempts", type: .int, .notNull) + .column("payload", type: .blob, .notNull) + .column("updated_at", type: .timestamp, autoTimestampConstraints) .run() - try await database.create(index: "i_\(JobModel.schema)_state_queue_delayUntil") - .on(JobModel.schema) + try await database.create(index: "i_\(self.jobsTableString)_state_queue_delayUntil") + .on(self.jobsTable) .column("state") .column("queue_name") .column("delay_until") @@ -61,10 +67,10 @@ public struct JobModelMigration: AsyncSQLMigration { // See `AsyncSQLMigration.revert(on:)`. public func revert(on database: any SQLDatabase) async throws { - try await database.drop(table: JobModel.schema).run() + try await database.drop(table: self.jobsTable).run() switch database.dialect.enumSyntax { case .typeName: - try await database.drop(enum: "\(JobModel.schema)_storedjobstatus").run() + try await database.drop(enum: "\(self.jobsTableString)_storedjobstatus").run() default: break } diff --git a/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift b/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift index 08e0297..35e15f3 100644 --- a/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift +++ b/Sources/QueuesFluentDriver/Queues.Provider+Fluent.swift @@ -18,8 +18,25 @@ extension Application.Queues.Provider { /// - Parameters: /// - databaseId: A Fluent `DatabaseID` configured for a compatible database, or `nil` to use the /// default database. + /// - preservesCompletedJobs: Defaults to `false`. If `true`, completed jobs are marked with a completed + /// state rather than being removed from the database. + /// - jobsTableName: The name of the database table in which jobs data is stored. Defaults to `_jobs_meta`. + /// - jobsTableSpace: The "space" (as defined by Fluent) in which the database table exists. Defaults to `nil`, + /// indicating the default space. Most users will not need this parameter. /// - Returns: An appropriately configured provider for `Application.Queues.use(_:)`. - public static func fluent(_ databaseId: DatabaseID? = nil) -> Self { - .init { $0.queues.use(custom: FluentQueuesDriver(on: databaseId)) } + public static func fluent( + _ databaseID: DatabaseID? = nil, + preservesCompletedJobs: Bool = false, + jobsTableName: String = "_jobs_meta", + jobsTableSpace: String? = nil + ) -> Self { + .init { + $0.queues.use(custom: FluentQueuesDriver( + on: databaseID, + preserveCompletedJobs: preservesCompletedJobs, + jobsTableName: jobsTableName, + jobsTableSpace: jobsTableSpace + )) + } } } diff --git a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift index 6e9c719..75cba81 100644 --- a/Sources/QueuesFluentDriver/SQLKit+Convenience.swift +++ b/Sources/QueuesFluentDriver/SQLKit+Convenience.swift @@ -75,6 +75,7 @@ extension SQLExpression { static func now() -> Self where Self == SQLDateValue { .now() } static func function(_ name: String, _ args: any SQLExpression...) -> Self where Self == SQLFunction { .init(name, args: args) } + static func function(_ name: String, _ args: [any SQLExpression]) -> Self where Self == SQLFunction { .init(name, args: args) } static func group(_ expr: some SQLExpression) -> Self where Self == SQLGroupExpression { .init(expr) } diff --git a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift index f12e4d5..dc9f23d 100644 --- a/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift +++ b/Tests/QueuesFluentDriverTests/QueuesFluentDriverTests.swift @@ -17,11 +17,15 @@ import FluentMySQLDriver #endif import NIOSSL +extension DatabaseID { + static var mysql2: Self { .init(string: "mysql2") } +} + final class QueuesFluentDriverTests: XCTestCase { var app: Application! var dbid: DatabaseID! - private func useDbs(_ app: Application) throws { + private func useDBs(_ app: Application) throws { #if canImport(FluentSQLiteDriver) app.databases.use(.sqlite(.memory), as: .sqlite) #endif @@ -48,18 +52,28 @@ final class QueuesFluentDriverTests: XCTestCase { database: env("MYSQL_NAME") ?? env("DATABASE_NAME") ?? "test_database", tlsConfiguration: config )), as: .mysql) + if env("MYSQL_B") != nil { + app.databases.use(DatabaseConfigurationFactory.mysql(configuration: .init( + hostname: env("MYSQL_HOST_B") ?? env("DATABASE_HOST") ?? "localhost", + port: (env("MYSQL_PORT_B") ?? env("DATABASE_PORT")).flatMap(Int.init(_:)) ?? MySQLConfiguration.ianaPortNumber, + username: env("MYSQL_USERNAME_B") ?? env("DATABASE_USERNAME") ?? "test_username", + password: env("MYSQL_PASSWORD_B") ?? env("DATABASE_PASSWORD") ?? "test_password", + database: env("MYSQL_NAME_B") ?? env("DATABASE_NAME") ?? "test_database", + tlsConfiguration: config + )), as: .mysql2) + } #endif } - private func withEachDatabase(_ closure: () async throws -> Void) async throws { - func run(_ dbid: DatabaseID) async throws { + private func withEachDatabase(preserveJobs: Bool = false, tableName: String = "_jobs_meta", _ closure: () async throws -> Void) async throws { + func run(_ dbid: DatabaseID, defaultSpace: String? = nil) async throws { self.dbid = dbid self.app = try await Application.make(.testing) self.app.logger[metadataKey: "test-dbid"] = "\(dbid.string)" - try self.useDbs(self.app) - self.app.migrations.add(JobModelMigration(), to: self.dbid) - self.app.queues.use(.fluent(self.dbid)) + try self.useDBs(self.app) + self.app.migrations.add(JobModelMigration(jobsTableName: tableName, jobsTableSpace: defaultSpace), to: self.dbid) + self.app.queues.use(.fluent(self.dbid, preservesCompletedJobs: preserveJobs, jobsTableName: tableName, jobsTableSpace: defaultSpace)) try await self.app.autoMigrate() @@ -67,6 +81,7 @@ final class QueuesFluentDriverTests: XCTestCase { catch { try? await self.app.autoRevert() try? await self.app.asyncShutdown() + self.app = nil throw error } @@ -80,11 +95,14 @@ final class QueuesFluentDriverTests: XCTestCase { #endif #if canImport(FluentPostgresDriver) - try await run(.psql) + try await run(.psql, defaultSpace: "public") #endif #if canImport(FluentMySQLDriver) - try await run(.mysql) + try await run(.mysql, defaultSpace: env("MYSQL_NAME") ?? env("DATABASE_NAME") ?? "test_database") + if env("MYSQL_B") != nil { + try await run(.mysql2, defaultSpace: env("MYSQL_NAME_B") ?? env("DATABASE_NAME") ?? "test_database") + } #endif } @@ -107,11 +125,11 @@ final class QueuesFluentDriverTests: XCTestCase { } } func testFailedJobLoss() async throws { try await self.withEachDatabase { - let jobId = JobIdentifier() + let jobID = JobIdentifier() self.app.queues.add(FailingJob()) self.app.get("test") { req in - try await req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobId) + try await req.queue.dispatch(FailingJob.self, ["foo": "bar"], id: jobID) return HTTPStatus.ok } try await self.app.testable().test(.GET, "test") { res async in @@ -123,18 +141,18 @@ final class QueuesFluentDriverTests: XCTestCase { await XCTAssertNotNilAsync( try await (self.app.db(self.dbid) as! any SQLDatabase).select() .columns("*") - .from(JobModel.schema) - .where("id", .equal, jobId) + .from("_jobs_meta") + .where("id", .equal, jobID) .first() ) } } func testDelayedJobIsRemovedFromProcessingQueue() async throws { try await self.withEachDatabase { - let jobId = JobIdentifier() + let jobID = JobIdentifier() self.app.queues.add(DelayedJob()) self.app.get("delay-job") { req in - try await req.queue.dispatch(DelayedJob.self, .init(name: "vapor"), delayUntil: .init(timeIntervalSinceNow: 3600.0), id: jobId) + try await req.queue.dispatch(DelayedJob.self, .init(name: "vapor"), delayUntil: .init(timeIntervalSinceNow: 3600.0), id: jobID) return HTTPStatus.ok } try await self.app.testable().test(.GET, "delay-job") { res async in @@ -144,13 +162,63 @@ final class QueuesFluentDriverTests: XCTestCase { await XCTAssertEqualAsync( try await (self.app.db(self.dbid) as! any SQLDatabase).select() .columns("*") - .from(JobModel.schema) - .where("id", .equal, jobId) + .from("_jobs_meta") + .where("id", .equal, jobID) .first(decoding: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)?.state, .pending ) } } + func testCustomTableNameAndJobDeletionByDefault() async throws { try await self.withEachDatabase(tableName: "_jobs_custom") { + let email = Email() + + self.app.queues.add(email) + self.app.get("send-email") { req in + try await req.queue.dispatch(Email.self, .init(to: "gwynne@vapor.codes")) + return HTTPStatus.ok + } + + try await self.app.testable().test(.GET, "send-email") { res async in + XCTAssertEqual(res.status, .ok) + } + + await XCTAssertEqualAsync(await email.sent, []) + try await self.app.queues.queue.worker.run().get() + await XCTAssertEqualAsync(await email.sent, [.init(to: "gwynne@vapor.codes")]) + await XCTAssertEqualAsync( + try await (self.app.db(self.dbid) as! any SQLDatabase).select() + .column(SQLFunction("count", args: SQLIdentifier("id")), as: "count") + .from("_jobs_custom") + .first(decodingColumn: "count", as: Int.self), + 0 + ) + } } + + func testJobPreservation() async throws { try await self.withEachDatabase(preserveJobs: true) { + let email = Email() + + self.app.queues.add(email) + self.app.get("send-email") { req in + try await req.queue.dispatch(Email.self, .init(to: "gwynne@vapor.codes")) + return HTTPStatus.ok + } + + try await self.app.testable().test(.GET, "send-email") { res async in + XCTAssertEqual(res.status, .ok) + } + + await XCTAssertEqualAsync(await email.sent, []) + try await self.app.queues.queue.worker.run().get() + await XCTAssertEqualAsync(await email.sent, [.init(to: "gwynne@vapor.codes")]) + await XCTAssertEqualAsync( + try await (self.app.db(self.dbid) as! any SQLDatabase).select() + .column(SQLFunction("count", args: SQLIdentifier("id")), as: "count") + .from("_jobs_meta") + .first(decodingColumn: "count", as: Int.self), + 1 + ) + } } + func testCoverageForFailingQueue() async throws { self.app = try await Application.make(.testing) let queue = FailingQueue( @@ -166,6 +234,22 @@ final class QueuesFluentDriverTests: XCTestCase { self.app = nil } + func testCoverageForJobModel() { + let date = Date() + let model = JobModel(id: .init(string: "test"), queue: .init(string: "test"), jobData: .init(payload: [], maxRetryCount: 0, jobName: "", delayUntil: nil, queuedAt: date)) + + XCTAssertEqual(model.id, "test") + XCTAssertEqual(model.queueName, "test") + XCTAssertEqual(model.jobName, "") + XCTAssertEqual(model.queuedAt, date) + XCTAssertNil(model.delayUntil) + XCTAssertEqual(model.state, .pending) + XCTAssertEqual(model.maxRetryCount, 0) + XCTAssertEqual(model.attempts, 0) + XCTAssertEqual(model.payload, Data()) + XCTAssertNotNil(model.updatedAt) + } + func testSQLKitUtilities() async throws { try await self.withEachDatabase { func serialized(_ expr: some SQLExpression) -> String { var serializer = SQLSerializer(database: self.app.db(self.dbid) as! any SQLDatabase) @@ -189,6 +273,7 @@ final class QueuesFluentDriverTests: XCTestCase { XCTAssertEqual(serialized(.literal(Bool?(true))), "\(serialized(SQLLiteral.boolean(true)))") XCTAssertEqual(serialized(.literal(Bool?.none)), "NULL") XCTAssertEqual(serialized(.null()), "NULL") + XCTAssertEqual(serialized(SQLLockingClauseWithSkipLocked.shareSkippingLocked), serialized(SQLLockingClause.share) != "" ? "\(serialized(SQLLockingClause.share)) SKIP LOCKED" : "") await XCTAssertNotNilAsync(try await (self.app.db(self.dbid) as! any SQLDatabase).transaction { $0.eventLoop.makeSucceededFuture(()) }.get()) } } @@ -263,6 +348,18 @@ func XCTAssertThrowsErrorAsync( } } +func XCTAssertNoThrowAsync( + _ expression: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + _ = try await expression() + } catch { + XCTAssertNoThrow(try { throw error }(), message(), file: file, line: line) + } +} + func XCTAssertNotNilAsync( _ expression: @autoclosure () async throws -> Any?, _ message: @autoclosure () -> String = "",