Skip to content

Commit

Permalink
Unbreak Postgres and MySQL with their respective enum nonsense
Browse files Browse the repository at this point in the history
  • Loading branch information
gwynne committed May 6, 2024
1 parent 5b76c5f commit 9a78b80
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
37 changes: 25 additions & 12 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@preconcurrency import Queues
@preconcurrency import SQLKit
import NIOConcurrencyHelpers
import struct Foundation.Data

/// An implementation of `Queue` which stores job data and metadata in a Fluent database.
public struct FluentQueue: Queue, Sendable {
Expand All @@ -14,24 +15,36 @@ public struct FluentQueue: Queue, Sendable {
// See `Queue.get(_:)`.
public func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
self.sqlDb.select()
.columns("payload", "max_retry_count", "job_name", "delay_until", "queued_at", "attempts")
.columns("payload", "max_retry_count", "queue_name", "state", "job_name", "delay_until", "queued_at", "attempts", "updated_at")
.from(JobModel.schema)
.where("id", .equal, id.string)
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing {
try $0.decode(model: JobData.self, keyDecodingStrategy: .convertFromSnakeCase)
try $0.decode(model: JobModel.self, keyDecodingStrategy: .convertFromSnakeCase)
}.map {
.init(payload: .init($0.payload), maxRetryCount: $0.maxRetryCount, jobName: $0.jobName, delayUntil: $0.delayUntil, queuedAt: $0.queuedAt)
}
}

// See `Queue.set(_:to:)`.
public func set(_ id: JobIdentifier, to jobStorage: JobData) -> EventLoopFuture<Void> {
self.sqlDb.eventLoop.makeFutureWithTask {
try await self.sqlDb.insert(into: JobModel.schema)
.model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
.onConflict { try $0
.set(excludedContentOf: JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase)
}
.columns("id", "queue_name", "job_name", "queued_at", "delay_until", "state", "max_retry_count", "attempts", "payload", "updated_at")
.values(
SQLBind(id.string),
SQLBind(self.queueName.string),
SQLBind(jobStorage.jobName),
SQLBind(jobStorage.queuedAt),
SQLBind(jobStorage.delayUntil),
SQLLiteral.string(StoredJobState.pending.rawValue),
SQLBind(jobStorage.maxRetryCount),
SQLBind(jobStorage.attempts),
SQLBind(Data(jobStorage.payload)),
.now()
)
// .model(JobModel(id: id, queue: self.queueName, jobData: jobStorage), keyEncodingStrategy: .convertToSnakeCase) // because enums!
.run()
}
}
Expand All @@ -41,7 +54,7 @@ public struct FluentQueue: Queue, Sendable {
self.get(id).flatMap { _ in
self.sqlDb.delete(from: JobModel.schema)
.where("id", .equal, id.string)
.where("state", .notEqual, StoredJobState.completed)
.where("state", .notEqual, SQLLiteral.string(StoredJobState.completed.rawValue))
.run()
}
}
Expand All @@ -50,7 +63,7 @@ public struct FluentQueue: Queue, Sendable {
public func push(_ id: JobIdentifier) -> EventLoopFuture<Void> {
self.sqlDb
.update(JobModel.schema)
.set("state", to: StoredJobState.pending)
.set("state", to: SQLLiteral.string(StoredJobState.pending.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id.string)
.run()
Expand Down Expand Up @@ -84,7 +97,7 @@ public struct FluentQueue: Queue, Sendable {
.select()
.column("id")
.from(JobModel.schema)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.where("queue_name", .equal, self.queueName.string)
.where(.dateValue(.function("coalesce", SQLColumn("delay_until"), SQLNow())), .lessThanOrEqual, .now())
.orderBy("delay_until")
Expand All @@ -93,7 +106,7 @@ public struct FluentQueue: Queue, Sendable {

if self.sqlDb.dialect.supportsReturning {
return try await self.sqlDb.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, .group(select.query))
.returning("id")
Expand All @@ -109,10 +122,10 @@ public struct FluentQueue: Queue, Sendable {

try await transaction
.update(JobModel.schema)
.set("state", to: StoredJobState.processing)
.set("state", to: SQLLiteral.string(StoredJobState.processing.rawValue))
.set("updated_at", to: .now())
.where("id", .equal, id)
.where("state", .equal, StoredJobState.pending)
.where("state", .equal, SQLLiteral.string(StoredJobState.pending.rawValue))
.run()

return JobIdentifier(string: id)
Expand Down
5 changes: 3 additions & 2 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import struct Foundation.Date
import struct Foundation.Data
import struct Queues.JobData
import struct Queues.JobIdentifier
import struct Queues.QueueName
Expand Down Expand Up @@ -45,7 +46,7 @@ struct JobModel: Codable, Sendable {
let attempts: Int

/// The job's payload.
let payload: [UInt8]
let payload: Data

/// The standard automatic update tracking timestamp.
let updatedAt: Date
Expand All @@ -59,7 +60,7 @@ struct JobModel: Codable, Sendable {
self.state = .pending
self.maxRetryCount = jobData.maxRetryCount
self.attempts = jobData.attempts ?? 0
self.payload = jobData.payload
self.payload = Data(jobData.payload)
self.updatedAt = .init()
}
}
2 changes: 1 addition & 1 deletion Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public struct JobModelMigration: AsyncSQLMigration {
.value("completed")
.run()
case .inline:
stateEnumType = "enum(\(StoredJobState.allCases.map(\.rawValue).joined(separator: ",")))"
stateEnumType = "enum('\(StoredJobState.allCases.map(\.rawValue).joined(separator: "','"))')"
default:
stateEnumType = "varchar(16)"
}
Expand Down
2 changes: 2 additions & 0 deletions Sources/QueuesFluentDriver/SQLKit+Convenience.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ struct SQLNow: SQLExpression {
switch serializer.dialect.name {
case "sqlite": // For SQLite, write out the literal string 'now' (see below)
SQLLiteral.string("now").serialize(to: &serializer)
case "postgresql": // For Postgres, "current_timestamp" is a keyword, not a function, so use "now()" instead.
SQLFunction("now").serialize(to: &serializer)
default: // Everywhere else, just call the SQL standard function.
SQLFunction("current_timestamp").serialize(to: &serializer)
}
Expand Down

0 comments on commit 9a78b80

Please sign in to comment.