diff --git a/ghost/core/core/server/data/migrations/versions/5.100/2024-10-31-15-27-42-add-jobs-queue-columns.js b/ghost/core/core/server/data/migrations/versions/5.100/2024-10-31-15-27-42-add-jobs-queue-columns.js new file mode 100644 index 00000000000..91c075c20b7 --- /dev/null +++ b/ghost/core/core/server/data/migrations/versions/5.100/2024-10-31-15-27-42-add-jobs-queue-columns.js @@ -0,0 +1,14 @@ +const {combineNonTransactionalMigrations, createAddColumnMigration} = require('../../utils'); + +module.exports = combineNonTransactionalMigrations( + createAddColumnMigration('jobs', 'metadata', { + type: 'string', + maxlength: 2000, + nullable: true + }), + createAddColumnMigration('jobs', 'queue_entry', { + type: 'integer', + nullable: true, + unsigned: true + }) +); diff --git a/ghost/core/core/server/data/schema/schema.js b/ghost/core/core/server/data/schema/schema.js index 17c536b13ae..45e686012a4 100644 --- a/ghost/core/core/server/data/schema/schema.js +++ b/ghost/core/core/server/data/schema/schema.js @@ -985,7 +985,9 @@ module.exports = { started_at: {type: 'dateTime', nullable: true}, finished_at: {type: 'dateTime', nullable: true}, created_at: {type: 'dateTime', nullable: false}, - updated_at: {type: 'dateTime', nullable: true} + updated_at: {type: 'dateTime', nullable: true}, + metadata: {type: 'string', maxlength: 2000, nullable: true}, + queue_entry: {type: 'integer', nullable: true, unsigned: true} }, redirects: { id: {type: 'string', maxlength: 24, nullable: false, primary: true}, diff --git a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js index 6329d8c483d..ac2ef2877bc 100644 --- a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js +++ b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js @@ -1,4 +1,6 @@ const logging = require('@tryghost/logging'); +const JobManager = require('../../services/jobs'); +const path = require('path'); class EmailAnalyticsServiceWrapper { init() { @@ -11,7 +13,7 @@ class EmailAnalyticsServiceWrapper { const MailgunProvider = require('@tryghost/email-analytics-provider-mailgun'); const {EmailRecipientFailure, EmailSpamComplaintEvent, Email} = require('../../models'); const StartEmailAnalyticsJobEvent = require('./events/StartEmailAnalyticsJobEvent'); - + const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events'); const domainEvents = require('@tryghost/domain-events'); const config = require('../../../shared/config'); const settings = require('../../../shared/settings-cache'); @@ -47,7 +49,8 @@ class EmailAnalyticsServiceWrapper { providers: [ new MailgunProvider({config, settings}) ], - queries + queries, + domainEvents }); // We currently cannot trigger a non-offloaded job from the job manager @@ -55,6 +58,19 @@ class EmailAnalyticsServiceWrapper { domainEvents.subscribe(StartEmailAnalyticsJobEvent, async () => { await this.startFetch(); }); + + domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => { + const memberId = event.data.memberId; + await JobManager.addQueuedJob({ + name: `update-member-email-analytics-${memberId}`, + metadata: { + job: path.resolve(__dirname, 'jobs/update-member-email-analytics'), + data: { + memberId + } + } + }); + }); } async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) { diff --git a/ghost/core/core/server/services/email-analytics/jobs/update-member-email-analytics/index.js b/ghost/core/core/server/services/email-analytics/jobs/update-member-email-analytics/index.js new file mode 100644 index 00000000000..843919cc1ca --- /dev/null +++ b/ghost/core/core/server/services/email-analytics/jobs/update-member-email-analytics/index.js @@ -0,0 +1,13 @@ +const queries = require('../../lib/queries'); + +/** + * Updates email analytics for a specific member + * + * @param {Object} options - The options object + * @param {string} options.memberId - The ID of the member to update analytics for + * @returns {Promise} The result of the aggregation query (1/0) + */ +module.exports = async function updateMemberEmailAnalytics({memberId}) { + const result = await queries.aggregateMemberStats(memberId); + return result; +}; \ No newline at end of file diff --git a/ghost/core/core/server/services/jobs/job-service.js b/ghost/core/core/server/services/jobs/job-service.js index 3b8298e73c9..f96684feb95 100644 --- a/ghost/core/core/server/services/jobs/job-service.js +++ b/ghost/core/core/server/services/jobs/job-service.js @@ -8,6 +8,7 @@ const logging = require('@tryghost/logging'); const models = require('../../models'); const sentry = require('../../../shared/sentry'); const domainEvents = require('@tryghost/domain-events'); +const config = require('../../../shared/config'); const errorHandler = (error, workerMeta) => { logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); @@ -24,7 +25,7 @@ const workerMessageHandler = ({name, message}) => { const initTestMode = () => { // Output job queue length every 5 seconds setInterval(() => { - logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`); + logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`); const runningScheduledjobs = Object.keys(jobManager.bree.workers); if (Object.keys(jobManager.bree.workers).length) { @@ -42,7 +43,7 @@ const initTestMode = () => { }, 5000); }; -const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents}); +const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config}); module.exports = jobManager; module.exports.initTestMode = initTestMode; diff --git a/ghost/core/core/server/services/mentions-jobs/job-service.js b/ghost/core/core/server/services/mentions-jobs/job-service.js index 3b8298e73c9..1edd38ba0d6 100644 --- a/ghost/core/core/server/services/mentions-jobs/job-service.js +++ b/ghost/core/core/server/services/mentions-jobs/job-service.js @@ -24,7 +24,7 @@ const workerMessageHandler = ({name, message}) => { const initTestMode = () => { // Output job queue length every 5 seconds setInterval(() => { - logging.warn(`${jobManager.queue.length()} jobs in the queue. Idle: ${jobManager.queue.idle()}`); + logging.warn(`${jobManager.inlineQueue.length()} jobs in the queue. Idle: ${jobManager.inlineQueue.idle()}`); const runningScheduledjobs = Object.keys(jobManager.bree.workers); if (Object.keys(jobManager.bree.workers).length) { @@ -42,7 +42,7 @@ const initTestMode = () => { }, 5000); }; -const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents}); +const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, isDuplicate: true}); module.exports = jobManager; module.exports.initTestMode = initTestMode; diff --git a/ghost/core/test/integration/jobs/job-queue.test.js b/ghost/core/test/integration/jobs/job-queue.test.js new file mode 100644 index 00000000000..e2187a2053a --- /dev/null +++ b/ghost/core/test/integration/jobs/job-queue.test.js @@ -0,0 +1,87 @@ +const assert = require('assert/strict'); +const path = require('path'); +const configUtils = require('../../utils/configUtils'); +const models = require('../../../core/server/models'); +const testUtils = require('../../utils/'); + +// Helper function to wait for job completion +async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + const intervalId = setInterval(async () => { + if (Date.now() - startTime >= maxWaitTimeMs) { + clearInterval(intervalId); + reject(new Error(`Job ${jobName} did not complete within ${maxWaitTimeMs}ms`)); + } + const job = await models.Job.findOne({name: jobName}); + if (!job) { + clearInterval(intervalId); + resolve(); + } + }, checkIntervalMs); + }); +} + +describe('Job Queue', function () { + let jobService; + before(testUtils.setup('default')); // this generates the tables in the db + afterEach(async function () { + await configUtils.restore(); + }); + + describe('enabled by config', function () { + beforeEach(async function () { + configUtils.set('services:jobs:queue:enabled', true); + jobService = require('../../../core/server/services/jobs/job-service'); + }); + + it('should add and execute a job in the queue', async function () { + this.timeout(10000); + const job = { + name: `add-random-numbers-${Date.now()}`, + metadata: { + job: path.resolve(__dirname, './test-job.js'), + data: {} + } + }; + + // Add the job to the queue + const result = await jobService.addQueuedJob(job); + assert.ok(result); + + // Wait for the job to complete + await waitForJobCompletion(job.name, 8000); // Increase wait time + + // Check job status + const jobEntry = await models.Job.findOne({name: job.name}); + + // Verify that the job no longer exists in the queue + assert.equal(jobEntry, null); + }); + }); + + describe('not enabled', function () { + beforeEach(async function () { + configUtils.set('services:jobs:queue:enabled', false); + jobService = require('../../../core/server/services/jobs/job-service'); + }); + + it('should not add a job to the queue when disabled', async function () { + const job = { + name: `add-random-numbers-${Date.now()}`, + metadata: { + job: path.resolve(__dirname, './test-job.js'), + data: {} + } + }; + + // Attempt to add the job to the queue + const result = await jobService.addQueuedJob(job); + assert.equal(result, undefined); + + // Verify that the job doesn't exist in the queue + const jobEntry = await models.Job.findOne({name: job.name}); + assert.equal(jobEntry, null); + }); + }); +}); \ No newline at end of file diff --git a/ghost/core/test/integration/jobs/test-job.js b/ghost/core/test/integration/jobs/test-job.js new file mode 100644 index 00000000000..c2d09320e8c --- /dev/null +++ b/ghost/core/test/integration/jobs/test-job.js @@ -0,0 +1,7 @@ +module.exports = function testJob() { + const num1 = Math.floor(Math.random() * 100); + const num2 = Math.floor(Math.random() * 100); + const result = num1 + num2; + + return result; +}; \ No newline at end of file diff --git a/ghost/core/test/e2e-server/jobs/update-check.test.js b/ghost/core/test/integration/jobs/update-check.test.js similarity index 95% rename from ghost/core/test/e2e-server/jobs/update-check.test.js rename to ghost/core/test/integration/jobs/update-check.test.js index e1bb4161b93..6b02af48fd9 100644 --- a/ghost/core/test/e2e-server/jobs/update-check.test.js +++ b/ghost/core/test/integration/jobs/update-check.test.js @@ -1,11 +1,7 @@ const assert = require('assert/strict'); const http = require('http'); const path = require('path'); - -const models = require('../../../core/server/models'); - -models.init(); - +const testUtils = require('../../utils'); const jobService = require('../../../core/server/services/jobs/job-service'); const JOB_NAME = 'update-check'; @@ -14,6 +10,8 @@ const JOB_PATH = path.resolve(__dirname, '../../../core/server/run-update-check. describe('Run Update Check', function () { let mockUpdateServer; + before(testUtils.setup('default')); + afterEach(function () { if (mockUpdateServer) { mockUpdateServer.close(); diff --git a/ghost/core/test/unit/server/data/schema/integrity.test.js b/ghost/core/test/unit/server/data/schema/integrity.test.js index bcbc1252226..0fca78eb751 100644 --- a/ghost/core/test/unit/server/data/schema/integrity.test.js +++ b/ghost/core/test/unit/server/data/schema/integrity.test.js @@ -35,7 +35,7 @@ const validateRouteSettings = require('../../../../../core/server/services/route */ describe('DB version integrity', function () { // Only these variables should need updating - const currentSchemaHash = 'a4f016480ff73c6f52ee4c86482b45a7'; + const currentSchemaHash = '1110f25f639c22135b9845c72f0be7ef'; const currentFixturesHash = '475f488105c390bb0018db90dce845f1'; const currentSettingsHash = '47a75e8898fab270174a0c905cb3e914'; const currentRoutesHash = '3d180d52c663d173a6be791ef411ed01'; diff --git a/ghost/core/test/utils/fixture-utils.js b/ghost/core/test/utils/fixture-utils.js index 432bd80c8a8..9f4e3f5b560 100644 --- a/ghost/core/test/utils/fixture-utils.js +++ b/ghost/core/test/utils/fixture-utils.js @@ -11,7 +11,6 @@ const knexMigrator = new KnexMigrator(); // Ghost Internals const models = require('../../core/server/models'); const {fixtureManager} = require('../../core/server/data/schema/fixtures'); -const emailAnalyticsService = require('../../core/server/services/email-analytics'); const permissions = require('../../core/server/services/permissions'); const settingsService = require('../../core/server/services/settings/settings-service'); const labsService = require('../../core/shared/labs'); @@ -630,6 +629,9 @@ const fixtures = { }, insertEmailsAndRecipients: function insertEmailsAndRecipients(withFailed = false) { + // NOTE: This require results in the jobs service being loaded prematurely, which breaks any tests relevant to it. + // This MUST be done in here and not at the top of the file to prevent that from happening as test setup is being performed. + const emailAnalyticsService = require('../../core/server/services/email-analytics'); return sequence(_.cloneDeep(DataGenerator.forKnex.emails).map(email => () => { return models.Email.add(email, context.internal); })).then(function () { diff --git a/ghost/core/test/utils/index.js b/ghost/core/test/utils/index.js index b43d59bc6fd..f8d810e153a 100644 --- a/ghost/core/test/utils/index.js +++ b/ghost/core/test/utils/index.js @@ -8,6 +8,7 @@ const _ = require('lodash'); // Ghost Internals const models = require('../../core/server/models'); +models.init(); // Other Test Utilities const e2eUtils = require('./e2e-utils'); diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index 20a69f19a72..90fa93f7977 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -1,6 +1,7 @@ const EventProcessingResult = require('./EventProcessingResult'); const logging = require('@tryghost/logging'); const errors = require('@tryghost/errors'); +const {MemberEmailAnalyticsUpdateEvent} = require('@tryghost/member-events'); /** * @typedef {import('@tryghost/email-service').EmailEventProcessor} EmailEventProcessor @@ -73,13 +74,15 @@ module.exports = class EmailAnalyticsService { * @param {object} dependencies.queries * @param {EmailEventProcessor} dependencies.eventProcessor * @param {object} dependencies.providers + * @param {import('@tryghost/domain-events')} dependencies.domainEvents */ - constructor({config, settings, queries, eventProcessor, providers}) { + constructor({config, settings, queries, eventProcessor, providers, domainEvents}) { this.config = config; this.settings = settings; this.queries = queries; this.eventProcessor = eventProcessor; this.providers = providers; + this.domainEvents = domainEvents; } getStatus() { @@ -511,7 +514,12 @@ module.exports = class EmailAnalyticsService { startTime = Date.now(); logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`); for (const memberId of memberIds) { - await this.aggregateMemberStats(memberId); + if (this.config?.get('services:jobs:queue:enabled')) { + // With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :)) + await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId})); + } else { + await this.aggregateMemberStats(memberId); + } } endTime = Date.now() - startTime; logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members took ${endTime}ms`); diff --git a/ghost/job-manager/lib/JobManager.js b/ghost/job-manager/lib/JobManager.js index abbeb1d8e06..0572fa1e224 100644 --- a/ghost/job-manager/lib/JobManager.js +++ b/ghost/job-manager/lib/JobManager.js @@ -10,6 +10,7 @@ const logging = require('@tryghost/logging'); const isCronExpression = require('./is-cron-expression'); const assembleBreeJob = require('./assemble-bree-job'); const JobsRepository = require('./JobsRepository'); +const JobQueueManager = require('./JobQueueManager'); const worker = async (task, callback) => { try { @@ -27,9 +28,20 @@ const ALL_STATUSES = { queued: 'queued' }; +/** + * @typedef {Object} ScheduledJob + * @property {Function | string} job - Function or path to a module defining a job + * @property {string} [name] - Unique job name, if not provided takes function name or job script filename + * @property {string | Date} [at] - Date, cron or human readable schedule format + * @property {Object} [data] - Data to be passed into the job + * @property {boolean} [offloaded=true] - If true, creates an "offloaded" job running in a worker thread. If false, runs an "inline" job on the same event loop + */ class JobManager { #domainEvents; #completionPromises = new Map(); + #jobQueueManager = null; + #config; + #JobModel; /** * @param {Object} options @@ -37,12 +49,17 @@ class JobManager { * @param {Function} [options.workerMessageHandler] - custom message handler coming from workers * @param {Object} [options.JobModel] - a model which can persist job data in the storage * @param {Object} [options.domainEvents] - domain events emitter + * @param {Object} [options.config] - config + * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue + * @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing) */ - constructor({errorHandler, workerMessageHandler, JobModel, domainEvents}) { - this.queue = fastq(this, worker, 3); + constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null}) { + this.inlineQueue = fastq(this, worker, 3); this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this); this.#domainEvents = domainEvents; + this.#config = config; + this.#JobModel = JobModel; const combinedMessageHandler = workerMessageHandler ? ({name, message}) => { @@ -74,6 +91,19 @@ class JobManager { if (JobModel) { this._jobsRepository = new JobsRepository({JobModel}); } + + if (jobQueueManager) { + this.#jobQueueManager = jobQueueManager; + } else if (!isDuplicate) { + this.#initializeJobQueueManager(); + } + } + + #initializeJobQueueManager() { + if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) { + this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config}); + this.#jobQueueManager.init(); + } } inlineJobHandler(jobName) { @@ -94,6 +124,34 @@ class JobManager { }; } + /** + * @typedef {Object} QueuedJob + * @property {string} name - The name or identifier of the job. + * @property {Object} metadata - Metadata associated with the job. + * @property {string} metadata.job - The absolute path to the job to execute. + * @property {Object} metadata.data - The data associated with the job. + */ + + /** + * @method addQueuedJob + * @async + * @description Adds a new job to the job repository, which will be polled and executed by the job queue manager. + * @param {QueuedJob} job - The job to be added to the queue. + * @returns {Promise} The added job model. + */ + async addQueuedJob({name, metadata}) { + // Try to initialize JobQueueManager if it's missing + if (!this.#jobQueueManager) { + this.#initializeJobQueueManager(); + } + + if (this.#config?.get('services:jobs:queue:enabled') === true && this.#jobQueueManager) { + const model = await this.#jobQueueManager.addJob({name, metadata}); + return model; + } + return undefined; + } + async _jobMessageHandler({name, message}) { if (name) { if (message === ALL_STATUSES.started) { @@ -128,7 +186,7 @@ class JobManager { this.#completionPromises.delete(name); } - if (this.queue.length() <= 1) { + if (this.inlineQueue.length() <= 1) { if (this.#completionPromises.has('all')) { for (const listeners of this.#completionPromises.get('all')) { listeners.resolve(); @@ -168,7 +226,7 @@ class JobManager { this.#completionPromises.delete(jobMeta.name); } - if (this.queue.length() <= 1) { + if (this.inlineQueue.length() <= 1) { if (this.#completionPromises.has('all')) { for (const listeners of this.#completionPromises.get('all')) { listeners.reject(error); @@ -181,7 +239,7 @@ class JobManager { /** * By default schedules an "offloaded" job. If `offloaded: true` parameter is set, - * puts an "inline" immediate job into the queue. + * puts an "inline" immediate job into the inlineQueue. * * @param {Object} GhostJob - job options * @prop {Function | String} GhostJob.job - function or path to a module defining a job @@ -192,7 +250,7 @@ class JobManager { */ addJob({name, at, job, data, offloaded = true}) { if (offloaded) { - logging.info('Adding offloaded job to the queue'); + logging.info('Adding offloaded job to the inline job queue'); let schedule; if (!name) { @@ -229,9 +287,9 @@ class JobManager { this.bree.add(breeJob); return this.bree.start(name); } else { - logging.info(`Adding one-off job to queue with current length = ${this.queue.length()} called '${name || 'anonymous'}'`); + logging.info(`Adding one-off job to inlineQueue with current length = ${this.inlineQueue.length()} called '${name || 'anonymous'}'`); - this.queue.push(async () => { + this.inlineQueue.push(async () => { try { // NOTE: setting the status here otherwise it is impossible to // distinguish between states when the job fails immediately @@ -325,13 +383,11 @@ class JobManager { /** * Awaits completion of the offloaded one-off job. * CAUTION: it might take a long time to resolve! - * @param {String} name one-off job name + * @param {string} name one-off job name * @returns resolves with a Job model at current state */ async awaitOneOffCompletion(name) { - const persistedJob = await this._jobsRepository.read({ - name - }); + const persistedJob = await this._jobsRepository.read(name); if (!persistedJob || ![ALL_STATUSES.finished, ALL_STATUSES.failed].includes(persistedJob.get('status'))) { // NOTE: can implement exponential backoff here if that's ever needed @@ -366,7 +422,7 @@ class JobManager { const name = 'all'; return new Promise((resolve, reject) => { - if (this.queue.idle()) { + if (this.inlineQueue.idle()) { resolve(); return; } @@ -379,7 +435,7 @@ class JobManager { } /** - * Removes an "offloaded" job from scheduled jobs queue. + * Removes an "offloaded" job from scheduled jobs inlineQueue. * It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68). * The method will throw an Error if job with provided name does not exist. * @@ -398,15 +454,19 @@ class JobManager { async shutdown(options) { await this.bree.stop(); - if (this.queue.idle()) { + if (this.#jobQueueManager) { + await this.#jobQueueManager.shutdown(); + } + + if (this.inlineQueue.idle()) { return; } - logging.warn('Waiting for busy job queue'); + logging.warn('Waiting for busy job in inline job queue'); - await pWaitFor(() => this.queue.idle() === true, options); + await pWaitFor(() => this.inlineQueue.idle() === true, options); - logging.warn('Job queue finished'); + logging.warn('Inline job queue finished'); } } diff --git a/ghost/job-manager/lib/JobQueueManager.js b/ghost/job-manager/lib/JobQueueManager.js new file mode 100644 index 00000000000..14f3275fa5e --- /dev/null +++ b/ghost/job-manager/lib/JobQueueManager.js @@ -0,0 +1,191 @@ +const workerpool = require('workerpool'); +const path = require('path'); +const JobsRepository = require('./JobsRepository'); +const debug = require('@tryghost/debug')('job-manager:JobQueueManager'); +const logging = require('@tryghost/logging'); + +class JobQueueManager { + constructor({JobModel, config, logger = logging, WorkerPool = workerpool}) { + this.jobsRepository = new JobsRepository({JobModel}); + this.config = this.initializeConfig(config?.get('services:jobs:queue') || {}); + this.logger = this.createLogger(logger, this.config.logLevel); + this.WorkerPool = WorkerPool; + this.pool = this.createWorkerPool(); + this.state = this.initializeState(); + } + + createLogger(logger, logLevel) { + return { + info: (message) => { + if (logLevel === 'info') { + logger.info(`[JobQueueManager] ${message}`); + } + }, + error: (message, error) => { + if (logLevel === 'info' || logLevel === 'error') { + logger.error(`[JobQueueManager] ${message}`, error); + } + } + }; + } + + initializeConfig(queueConfig) { + return { + MIN_POLL_INTERVAL: queueConfig.pollMinInterval || 1000, + MAX_POLL_INTERVAL: queueConfig.pollMaxInterval || 60000, + QUEUE_CAPACITY: queueConfig.queueCapacity || 500, + FETCH_COUNT: queueConfig.fetchCount || 500, + INCREASE_INTERVAL_THRESHOLD: 30000, + maxWorkers: queueConfig.maxWorkers, + reportStats: queueConfig.reportStats, + reportInterval: queueConfig.reportInterval || 60000, + logLevel: queueConfig.logLevel + }; + } + + initializeState() { + return { + currentPollInterval: this.config.MIN_POLL_INTERVAL, + lastFoundJobTime: Date.now(), + isPolling: false, + queuedJobs: new Set() + }; + } + + createWorkerPool() { + const poolOptions = { + workerType: 'thread', + workerTerminateTimeout: 10000 + }; + if (this.config.maxWorkers) { + poolOptions.maxWorkers = this.config.maxWorkers; + } + return this.WorkerPool.pool(path.join(__dirname, '/workers/generic-worker.js'), poolOptions); + } + + async init() { + debug('[JobQueueManager] Initializing job queue'); + this.startQueueProcessor(); + if (this.config.reportStats) { + this.reportStats(); + } + } + + async startQueueProcessor() { + const poll = async () => { + if (this.state.isPolling) { + return; + } + + this.state.isPolling = true; + this.logger.info(`Polling for jobs; current interval: ${Math.floor(this.state.currentPollInterval / 1000)}s`); + + try { + await this.processPendingJobs(); + } catch (error) { + this.logger.error('Error in queue filler:', error); + } finally { + this.state.isPolling = false; + this.queueFillerTimeout = setTimeout(poll, this.state.currentPollInterval); + } + }; + + poll(); // Initial poll + } + + async processPendingJobs() { + const stats = await this.getStats(); + if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) { + const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks); + this.logger.info(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}`); + + const jobs = await this.jobsRepository.getQueuedJobs(entriesToAdd); + this.updatePollInterval(jobs); + await this.processJobs(jobs); + } + } + + updatePollInterval(jobs) { + if (jobs.length > 0) { + this.state.lastFoundJobTime = Date.now(); + this.state.currentPollInterval = this.config.MIN_POLL_INTERVAL; + } else { + const timeSinceLastJob = Date.now() - this.state.lastFoundJobTime; + if (timeSinceLastJob > this.config.INCREASE_INTERVAL_THRESHOLD) { + this.state.currentPollInterval = this.config.MAX_POLL_INTERVAL; + } + } + } + + async processJobs(jobs) { + for (const job of jobs) { + const jobMetadata = JSON.parse(job.get('metadata')); + const jobName = jobMetadata.name; + if (this.state.queuedJobs.has(jobName)) { + continue; + } + await this.executeJob(job, jobName, jobMetadata); + } + } + + async executeJob(job, jobName, jobMetadata) { + this.state.queuedJobs.add(jobName); + try { + await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]); + await this.jobsRepository.delete(job.id); + } catch (error) { + await this.handleJobError(job, jobMetadata, error); + } finally { + this.state.queuedJobs.delete(jobName); + } + } + + async handleJobError(job, jobMetadata, error) { + let errorMessage; + if (error instanceof Error) { + errorMessage = error.message; + } else if (typeof error === 'string') { + errorMessage = error; + } else { + errorMessage = JSON.stringify(error); + } + + const updateData = { + status: 'error', + finished_at: new Date(), + metadata: { + ...jobMetadata, + error: errorMessage, + retries: (jobMetadata.retries || 0) + 1 + } + }; + + await this.jobsRepository.update(job.id, updateData); + } + + async addJob({name, metadata}) { + const model = await this.jobsRepository.addQueuedJob({name, metadata}); + return model; + } + + async getStats() { + return this.pool.stats(); + } + + reportStats() { + setInterval(() => { + this.logger.info('-- job queue stats --'); + this.logger.info(JSON.stringify(this.pool.stats(), null, 2)); + }, this.config.reportInterval); + } + + async shutdown() { + try { + await this.pool.terminate(); + } catch (error) { + this.logger.error('Error terminating worker pool:', error); + } + } +} + +module.exports = JobQueueManager; \ No newline at end of file diff --git a/ghost/job-manager/lib/JobsRepository.js b/ghost/job-manager/lib/JobsRepository.js index 96f0e24743c..e57cecce587 100644 --- a/ghost/job-manager/lib/JobsRepository.js +++ b/ghost/job-manager/lib/JobsRepository.js @@ -1,23 +1,137 @@ +const ObjectID = require('bson-objectid').default; +const logging = require('@tryghost/logging'); + +/** + * @class JobsRepository + * @description Repository class for managing job-related operations. + */ class JobsRepository { + /** + * @constructor + * @param {Object} options - The options object. + * @param {Object} options.JobModel - The Job model for database operations. + */ constructor({JobModel}) { + // NOTE: We ought to clean this up. We want to use bookshelf models for all db operations, + // but we use knex directly in a few places still largely for performance reasons. this._JobModel = JobModel; } + /** + * @method add + * @async + * @description Adds a new job to the database. + * @param {Object} data - The job data to be added. + * @returns {Promise} The added job object. + */ async add(data) { const job = await this._JobModel.add(data); - return job; } + /** + * @method read + * @async + * @description Reads a job from the database by name. + * @param {string} name - The name of the job to read. + * @returns {Promise} The job object if found, null otherwise. + */ async read(name) { const job = await this._JobModel.findOne({name}); - return job; } + /** + * @method update + * @async + * @description Updates a job in the database. + * @param {string} id - The ID of the job to update. + * @param {Object} data - The updated job data. + * @returns {Promise} + */ async update(id, data) { await this._JobModel.edit(data, {id}); } + + /** + * @method getNextQueuedJob + * @async + * @description Retrieves the next queued job from the database. + * @returns {Promise} The next queued job object if found, null otherwise. + */ + async getNextQueuedJob() { + const job = await this._JobModel.findOne({ + queue_entry: 1 + }); + return job; + } + + /** + * @method getQueuedJobs + * @async + * @description Retrieves a list of queued jobs from the database. + * @param {number} [limit=50] - The maximum number of jobs to retrieve. + * @returns {Promise} An array of queued job objects. + */ + async getQueuedJobs(limit = 50) { + const jobs = await this._JobModel.findPage({ + filter: 'queue_entry:1', + limit + }); + return jobs.data; + } + + /** + * @typedef {Object} QueuedJob + * @property {string} name - The name or identifier of the job. + * @property {Object} metadata - Metadata associated with the job. + * @property {string} metadata.job - The absolute path to the job to execute. + * @property {Object} metadata.data - The data associated with the job. + */ + + /** + * @method addQueuedJob + * @async + * @description Adds a new queued job to the database. + * @param {QueuedJob} job - The job to be added to the queue. + * @returns {Promise} The added job object. + */ + async addQueuedJob({name, metadata}) { + let job; + await this._JobModel.transaction(async (transacting) => { + // Check if a job with this name already exist + const existingJob = await this._JobModel.findOne({name}, {transacting}); + if (!existingJob) { + // If no existing job, create a new one + job = await this._JobModel.add({ + id: new ObjectID().toHexString(), + name: name, + status: 'queued', + created_at: new Date(), + metadata: JSON.stringify(metadata), + queue_entry: 1 + }, {transacting}); + } + // If existingJob is found, do nothing (equivalent to IGNORE) + }); + + return job; // Will be undefined if the job already existed + } + + /** + * @method delete + * @async + * @description Deletes a job from the database. + * @param {string} id - The ID of the job to delete. + * @returns {Promise} + */ + async delete(id) { + try { + await this._JobModel.destroy({id}); + } catch (error) { + logging.error(`Error deleting job ${id}:`, error); + } + } } -module.exports = JobsRepository; +module.exports = JobsRepository; \ No newline at end of file diff --git a/ghost/job-manager/lib/workers/generic-worker.js b/ghost/job-manager/lib/workers/generic-worker.js new file mode 100644 index 00000000000..05fe50a95b9 --- /dev/null +++ b/ghost/job-manager/lib/workers/generic-worker.js @@ -0,0 +1,64 @@ +const errors = require('@tryghost/errors'); + +/** + * @module generic-worker + * @description A generic worker module for executing jobs in a worker pool. This allows consuming code to pass in a job file + * when calling for the worker pool to execute a job. + */ + +const workerpool = require('workerpool'); + +/** + * @function executeJob + * @description Executes a job by requiring the job module and calling it with the provided data. + * @param {string} jobPath - The absolute file path to the job module. + * @param {Object} jobData - The data to be passed to the job function as the first argument. + * @returns {Promise<*>} The result of the job execution. + * @throws {Error} If the job module doesn't export a function or if the execution fails. + */ +function executeJob(jobPath, jobData) { + let jobModule; + try { + jobModule = require(jobPath); + } catch (err) { + throw new errors.IncorrectUsageError({ + message: `Failed to load job module: ${err.message}`, + err + }); + } + + if (typeof jobModule !== 'function') { + throw new errors.IncorrectUsageError({ + message: `Job module at ${jobPath} does not export a function` + }); + } + + try { + return jobModule(jobData); + } catch (err) { + throw new errors.IncorrectUsageError({ + message: `Failed to execute job: ${err.message}`, + err + }); + } +} + +/** + * @function registerWorker + * @description Registers the executeJob function as a worker method with workerpool. + */ +function registerWorker() { + workerpool.worker({ + executeJob: executeJob + }); +} + +// Only register the worker if this file is being run directly +if (require.main === module) { + registerWorker(); +} + +module.exports = { + executeJob, + registerWorker +}; \ No newline at end of file diff --git a/ghost/job-manager/package.json b/ghost/job-manager/package.json index 1a7418e1a3d..7843d847b8d 100644 --- a/ghost/job-manager/package.json +++ b/ghost/job-manager/package.json @@ -23,6 +23,7 @@ "date-fns": "2.30.0", "delay": "5.0.0", "mocha": "10.2.0", + "rewire": "^7.0.0", "should": "13.2.3", "sinon": "15.2.0" }, @@ -33,6 +34,7 @@ "bree": "6.5.0", "cron-validate": "1.4.5", "fastq": "1.17.1", - "p-wait-for": "3.2.0" + "p-wait-for": "3.2.0", + "workerpool": "^9.1.3" } } diff --git a/ghost/job-manager/test/generic-worker.test.js b/ghost/job-manager/test/generic-worker.test.js new file mode 100644 index 00000000000..34ceede37b6 --- /dev/null +++ b/ghost/job-manager/test/generic-worker.test.js @@ -0,0 +1,56 @@ +const rewire = require('rewire'); +const sinon = require('sinon'); +const path = require('path'); +const GhostErrors = require('@tryghost/errors'); + +describe('Generic Worker', function () { + let genericWorker; + let workerpoolStub; + + beforeEach(function () { + workerpoolStub = { + worker: sinon.stub() + }; + + genericWorker = rewire('../lib/workers/generic-worker'); + genericWorker.__set__('workerpool', workerpoolStub); + }); + + describe('executeJob', function () { + it('should execute a valid job module', function () { + const jobPath = path.join(__dirname, 'mock-job.js'); + const jobData = {test: 'data'}; + const mockJobModule = sinon.stub().returns('job result'); + + genericWorker.__set__('require', p => (p === jobPath ? mockJobModule : require(p))); + + const result = genericWorker.executeJob(jobPath, jobData); + + mockJobModule.calledWith(jobData).should.be.true; + result.should.equal('job result'); + }); + + it('should throw an error if job module does not export a function', function () { + const jobPath = path.join(__dirname, 'invalid-job.js'); + const jobData = {test: 'data'}; + + genericWorker.__set__('require', p => (p === jobPath ? {} : require(p))); + + (() => genericWorker.executeJob(jobPath, jobData)).should.throw(GhostErrors.IncorrectUsageError, { + message: `Job module at ${jobPath} does not export a function` + }); + }); + + it('should throw an error if job execution fails', function () { + const jobPath = path.join(__dirname, 'failing-job.js'); + const jobData = {test: 'data'}; + const mockJobModule = sinon.stub().throws(new Error('Job execution failed')); + + genericWorker.__set__('require', p => (p === jobPath ? mockJobModule : require(p))); + + (() => genericWorker.executeJob(jobPath, jobData)).should.throw(GhostErrors.IncorrectUsageError, { + message: 'Failed to execute job: Job execution failed' + }); + }); + }); +}); \ No newline at end of file diff --git a/ghost/job-manager/test/job-manager.test.js b/ghost/job-manager/test/job-manager.test.js index b8add5cc2b1..6f63e6deea6 100644 --- a/ghost/job-manager/test/job-manager.test.js +++ b/ghost/job-manager/test/job-manager.test.js @@ -21,11 +21,39 @@ const jobModelInstance = { } }; +const queuedJob = { + name: 'test-job', + metadata: { + job: path.resolve(__dirname, './jobs/simple.js'), + data: 'test data' + } +}; + describe('Job Manager', function () { + let stubConfig, stubJobQueueManager, jobManager; + beforeEach(function () { sandbox.stub(logging, 'info'); sandbox.stub(logging, 'warn'); sandbox.stub(logging, 'error'); + + stubConfig = { + get: sinon.stub().returns({ + enabled: true, + queue: { + enabled: true + } + }) + }; + + stubJobQueueManager = { + addJob: sinon.stub().resolves({id: 'job1'}) + }; + + jobManager = new JobManager({ + config: stubConfig, + jobQueueManager: stubJobQueueManager + }); }); afterEach(function () { @@ -33,32 +61,32 @@ describe('Job Manager', function () { }); it('public interface', function () { - const jobManager = new JobManager({}); - should.exist(jobManager.addJob); should.exist(jobManager.hasExecutedSuccessfully); should.exist(jobManager.awaitOneOffCompletion); + should.exist(jobManager.awaitCompletion); + should.exist(jobManager.allSettled); + should.exist(jobManager.removeJob); + should.exist(jobManager.shutdown); + should.exist(jobManager.inlineJobHandler); + should.exist(jobManager.addQueuedJob); }); describe('Add a job', function () { describe('Inline jobs', function () { it('adds a job to a queue', async function () { const spy = sinon.spy(); - const jobManager = new JobManager({ - JobModel: sinon.stub().resolves() - }); - jobManager.addJob({ job: spy, data: 'test data', offloaded: false }); - should(jobManager.queue.idle()).be.false(); + should(jobManager.inlineQueue.idle()).be.false(); // give time to execute the job await delay(1); - should(jobManager.queue.idle()).be.true(); + should(jobManager.inlineQueue.idle()).be.true(); should(spy.called).be.true(); should(spy.args[0][0]).equal('test data'); }); @@ -68,21 +96,18 @@ describe('Job Manager', function () { const jobModelSpy = { findOne: sinon.spy() }; - const jobManager = new JobManager({ - JobModel: jobModelSpy - }); jobManager.addJob({ job: spy, data: 'test data', offloaded: false }); - should(jobManager.queue.idle()).be.false(); + should(jobManager.inlineQueue.idle()).be.false(); // give time to execute the job await delay(1); - should(jobManager.queue.idle()).be.true(); + should(jobManager.inlineQueue.idle()).be.true(); should(spy.called).be.true(); should(spy.args[0][0]).equal('test data'); should(logging.error.called).be.true(); @@ -93,8 +118,6 @@ describe('Job Manager', function () { describe('Offloaded jobs', function () { it('fails to schedule for invalid scheduling expression', function () { - const jobManager = new JobManager({}); - try { jobManager.addJob({ at: 'invalid expression', @@ -106,8 +129,6 @@ describe('Job Manager', function () { }); it('fails to schedule for no job name', function () { - const jobManager = new JobManager({}); - try { jobManager.addJob({ at: 'invalid expression', @@ -119,7 +140,6 @@ describe('Job Manager', function () { }); it('schedules a job using date format', async function () { - const jobManager = new JobManager({}); const timeInTenSeconds = new Date(Date.now() + 10); const jobPath = path.resolve(__dirname, './jobs/simple.js'); @@ -157,7 +177,6 @@ describe('Job Manager', function () { }); it('schedules a job to run immediately', async function () { - const jobManager = new JobManager({}); const clock = FakeTimers.install({now: Date.now()}); const jobPath = path.resolve(__dirname, './jobs/simple.js'); @@ -189,7 +208,6 @@ describe('Job Manager', function () { }); it('fails to schedule a job with the same name to run immediately one after another', async function () { - const jobManager = new JobManager({}); const clock = FakeTimers.install({now: Date.now()}); const jobPath = path.resolve(__dirname, './jobs/simple.js'); @@ -232,7 +250,7 @@ describe('Job Manager', function () { throw new Error('job error'); }; const spyHandler = sinon.spy(); - const jobManager = new JobManager({errorHandler: spyHandler}); + jobManager = new JobManager({errorHandler: spyHandler, config: stubConfig}); const completion = jobManager.awaitCompletion('will-fail'); jobManager.addJob({ @@ -249,7 +267,7 @@ describe('Job Manager', function () { it('uses worker message handler when job sends a message', async function (){ const workerMessageHandlerSpy = sinon.spy(); - const jobManager = new JobManager({workerMessageHandler: workerMessageHandlerSpy}); + jobManager = new JobManager({workerMessageHandler: workerMessageHandlerSpy, config: stubConfig}); const completion = jobManager.awaitCompletion('will-send-msg'); jobManager.addJob({ @@ -271,8 +289,6 @@ describe('Job Manager', function () { describe('Add one off job', function () { it('throws if name parameter is not provided', async function () { - const jobManager = new JobManager({}); - try { await jobManager.addOneOffJob({ job: () => {} @@ -291,7 +307,7 @@ describe('Job Manager', function () { add: sinon.stub().resolves() }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); await jobManager.addOneOffJob({ job: spy, name: 'unique name', @@ -309,7 +325,7 @@ describe('Job Manager', function () { add: sinon.stub().throws('should not be called') }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); try { await jobManager.addOneOffJob({ @@ -334,7 +350,7 @@ describe('Job Manager', function () { edit: sinon.stub().resolves({name: 'successful-oneoff'}) }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); const completion = jobManager.awaitCompletion('successful-oneoff'); jobManager.addOneOffJob({ @@ -376,7 +392,7 @@ describe('Job Manager', function () { throw new Error('job error'); }; const spyHandler = sinon.spy(); - const jobManager = new JobManager({errorHandler: spyHandler, JobModel}); + jobManager = new JobManager({errorHandler: spyHandler, JobModel, config: stubConfig}); const completion = jobManager.awaitCompletion('failed-oneoff'); await jobManager.addOneOffJob({ @@ -420,7 +436,7 @@ describe('Job Manager', function () { throw new Error('job error'); }; const spyHandler = sinon.spy(); - const jobManager = new JobManager({errorHandler: spyHandler, JobModel}); + jobManager = new JobManager({errorHandler: spyHandler, JobModel, config: stubConfig}); const completion1 = jobManager.awaitCompletion('failed-oneoff'); await jobManager.addOneOffJob({ @@ -458,7 +474,7 @@ describe('Job Manager', function () { add: sinon.stub().resolves() }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); await jobManager.addOneOffJob({ job: spy, name: 'unique name', @@ -475,7 +491,7 @@ describe('Job Manager', function () { add: sinon.stub().throws('should not be called') }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); try { await jobManager.addOneOffJob({ @@ -499,7 +515,7 @@ describe('Job Manager', function () { edit: sinon.stub().resolves({name: 'successful-oneoff'}) }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); const jobCompletion = jobManager.awaitCompletion('successful-oneoff'); @@ -541,7 +557,7 @@ describe('Job Manager', function () { throw new Error('job error'); }; const spyHandler = sinon.spy(); - const jobManager = new JobManager({errorHandler: spyHandler, JobModel}); + jobManager = new JobManager({errorHandler: spyHandler, JobModel, config: stubConfig}); const completion = jobManager.awaitCompletion('failed-oneoff'); @@ -596,7 +612,7 @@ describe('Job Manager', function () { }) }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); let executed = await jobManager.hasExecutedSuccessfully('solovei'); should.equal(executed, false); @@ -631,7 +647,7 @@ describe('Job Manager', function () { add: sinon.stub().resolves() }; - const jobManager = new JobManager({JobModel}); + jobManager = new JobManager({JobModel, config: stubConfig}); await jobManager.addOneOffJob({ job: jobWithDelay, @@ -647,7 +663,7 @@ describe('Job Manager', function () { describe('Remove a job', function () { it('removes a scheduled job from the queue', async function () { - const jobManager = new JobManager({}); + jobManager = new JobManager({config: stubConfig}); const timeInTenSeconds = new Date(Date.now() + 10); const jobPath = path.resolve(__dirname, './jobs/simple.js'); @@ -665,9 +681,26 @@ describe('Job Manager', function () { }); }); + describe('Add a queued job', function () { + it('submits a job to the job queue if enabled', async function () { + stubConfig.get.returns(true); + const result = await jobManager.addQueuedJob(queuedJob); + should(stubJobQueueManager.addJob.calledOnce).be.true(); + should(stubJobQueueManager.addJob.firstCall.args[0]).deepEqual(queuedJob); + should(result).have.property('id', 'job1'); + }); + + it('does not submit a job to the job queue if disabled', async function () { + stubConfig.get.returns(false); + const result = await jobManager.addQueuedJob(queuedJob); + should(stubJobQueueManager.addJob.called).be.false(); + should(result).be.undefined(); + }); + }); + describe('Shutdown', function () { - it('gracefully shuts down an inline jobs', async function () { - const jobManager = new JobManager({}); + it('gracefully shuts down inline jobs', async function () { + jobManager = new JobManager({config: stubConfig}); jobManager.addJob({ job: require('./jobs/timed-job'), @@ -675,15 +708,15 @@ describe('Job Manager', function () { offloaded: false }); - should(jobManager.queue.idle()).be.false(); + should(jobManager.inlineQueue.idle()).be.false(); await jobManager.shutdown(); - should(jobManager.queue.idle()).be.true(); + should(jobManager.inlineQueue.idle()).be.true(); }); it('gracefully shuts down an interval job', async function () { - const jobManager = new JobManager({}); + jobManager = new JobManager({config: stubConfig}); jobManager.addJob({ at: 'every 5 seconds', @@ -700,5 +733,7 @@ describe('Job Manager', function () { should(Object.keys(jobManager.bree.intervals).length).equal(0); }); + + it('gracefully shuts down the job queue worker pool'); }); -}); +}); \ No newline at end of file diff --git a/ghost/job-manager/test/job-queue-manager.test.js b/ghost/job-manager/test/job-queue-manager.test.js new file mode 100644 index 00000000000..d178a4729c8 --- /dev/null +++ b/ghost/job-manager/test/job-queue-manager.test.js @@ -0,0 +1,315 @@ +const sinon = require('sinon'); +const {expect} = require('chai'); +const JobQueueManager = require('../lib/JobQueueManager'); + +describe('JobQueueManager', function () { + let jobQueueManager; + let mockJobModel; + let mockConfig; + let mockLogger; + let mockWorkerPool; + + beforeEach(function () { + mockJobModel = {}; + mockConfig = { + get: sinon.stub().returns({}) + }; + mockLogger = { + info: sinon.stub(), + error: sinon.stub() + }; + mockWorkerPool = { + pool: sinon.stub().returns({ + exec: sinon.stub(), + stats: sinon.stub().returns({}), + terminate: sinon.stub() + }) + }; + + jobQueueManager = new JobQueueManager({ + JobModel: mockJobModel, + config: mockConfig, + logger: mockLogger, + WorkerPool: mockWorkerPool + }); + }); + + afterEach(function () { + sinon.restore(); + }); + + describe('initialization', function () { + it('should initialize with provided dependencies', function () { + expect(jobQueueManager.jobsRepository).to.exist; + expect(jobQueueManager.config).to.exist; + expect(jobQueueManager.logger).to.exist; + expect(jobQueueManager.pool).to.exist; + }); + }); + + describe('init', function () { + it('should start the job queue manager', async function () { + const startQueueProcessorStub = sinon.stub(jobQueueManager, 'startQueueProcessor'); + const reportStatsStub = sinon.stub(jobQueueManager, 'reportStats'); + + await jobQueueManager.init(); + + expect(startQueueProcessorStub.calledOnce).to.be.true; + expect(reportStatsStub.called).to.be.false; + + // Test with reportStats enabled + jobQueueManager.config.reportStats = true; + await jobQueueManager.init(); + expect(reportStatsStub.calledOnce).to.be.true; + }); + + it('should call reportStats when config.reportStats is true', async function () { + const startQueueProcessorStub = sinon.stub(jobQueueManager, 'startQueueProcessor'); + const reportStatsStub = sinon.stub(jobQueueManager, 'reportStats'); + jobQueueManager.config.reportStats = true; + + await jobQueueManager.init(); + + expect(startQueueProcessorStub.calledOnce).to.be.true; + expect(reportStatsStub.calledOnce).to.be.true; + }); + }); + + describe('shutdown', function () { + it('should handle errors during shutdown', async function () { + const error = new Error('Termination error'); + jobQueueManager.pool.terminate.rejects(error); + const loggerErrorStub = sinon.stub(jobQueueManager.logger, 'error'); + + await jobQueueManager.shutdown(); + + expect(jobQueueManager.pool.terminate.calledOnce).to.be.true; + expect(loggerErrorStub.calledWith('Error terminating worker pool:', error)).to.be.true; + }); + + it('should stop the job queue manager without errors', async function () { + jobQueueManager.pool.terminate.resolves(); + const loggerErrorStub = sinon.stub(jobQueueManager.logger, 'error'); + + await jobQueueManager.shutdown(); + + expect(jobQueueManager.pool.terminate.calledOnce).to.be.true; + expect(loggerErrorStub.called).to.be.false; + }); + }); + + describe('addJob', function () { + it('should add a new job', async function () { + const mockJob = {name: 'testJob', metadata: {}}; + const addQueuedJobStub = sinon.stub(jobQueueManager.jobsRepository, 'addQueuedJob').resolves('jobId'); + + const result = await jobQueueManager.addJob(mockJob); + + expect(addQueuedJobStub.calledOnceWith(mockJob)).to.be.true; + expect(result).to.equal('jobId'); + }); + }); + + describe('processPendingJobs', function () { + it('should process pending jobs', async function () { + const mockStats = {pendingTasks: 0}; + const mockJobs = [{get: sinon.stub().returns('{}')}]; + + sinon.stub(jobQueueManager, 'getStats').resolves(mockStats); + sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves(mockJobs); + sinon.stub(jobQueueManager, 'updatePollInterval'); + sinon.stub(jobQueueManager, 'processJobs'); + + await jobQueueManager.processPendingJobs(); + + expect(jobQueueManager.jobsRepository.getQueuedJobs.calledOnce).to.be.true; + expect(jobQueueManager.updatePollInterval.calledOnceWith(mockJobs)).to.be.true; + expect(jobQueueManager.processJobs.calledOnceWith(mockJobs)).to.be.true; + }); + }); + + describe('createLogger', function () { + it('should create a logger with info level', function () { + const logger = jobQueueManager.createLogger(mockLogger, 'info'); + logger.info('Test info'); + logger.error('Test error'); + expect(mockLogger.info.calledOnce).to.be.true; + expect(mockLogger.error.calledOnce).to.be.true; + }); + + it('should create a logger with error level', function () { + const logger = jobQueueManager.createLogger(mockLogger, 'error'); + logger.info('Test info'); + logger.error('Test error'); + expect(mockLogger.info.called).to.be.false; + expect(mockLogger.error.calledOnce).to.be.true; + }); + }); + + describe('initializeConfig', function () { + it('should initialize config with default values', function () { + const config = jobQueueManager.initializeConfig({}); + expect(config.MIN_POLL_INTERVAL).to.equal(1000); + expect(config.MAX_POLL_INTERVAL).to.equal(60000); + expect(config.QUEUE_CAPACITY).to.equal(500); + expect(config.FETCH_COUNT).to.equal(500); + }); + + it('should use provided values in config', function () { + const config = jobQueueManager.initializeConfig({ + pollMinInterval: 2000, + pollMaxInterval: 120000, + queueCapacity: 1000, + fetchCount: 100 + }); + expect(config.MIN_POLL_INTERVAL).to.equal(2000); + expect(config.MAX_POLL_INTERVAL).to.equal(120000); + expect(config.QUEUE_CAPACITY).to.equal(1000); + expect(config.FETCH_COUNT).to.equal(100); + }); + }); + + describe('startQueueProcessor', function () { + it('should start polling for jobs', async function () { + const clock = sinon.useFakeTimers(); + const processPendingJobsStub = sinon.stub(jobQueueManager, 'processPendingJobs').resolves(); + + jobQueueManager.startQueueProcessor(); + + // No need to tick the clock, as polling should start immediately + expect(processPendingJobsStub.calledOnce).to.be.true; + + // Optionally, we can test the next poll + await clock.tickAsync(jobQueueManager.state.currentPollInterval); + expect(processPendingJobsStub.calledTwice).to.be.true; + + clock.restore(); + }); + + it('should handle errors during polling', async function () { + const clock = sinon.useFakeTimers(); + const error = new Error('Test error'); + sinon.stub(jobQueueManager, 'processPendingJobs').rejects(error); + + // Create a stub for the logger.error method + const loggerErrorStub = sinon.stub(); + jobQueueManager.logger.error = loggerErrorStub; + + jobQueueManager.startQueueProcessor(); + + await clock.tickAsync(jobQueueManager.state.currentPollInterval); + expect(loggerErrorStub.calledWith('Error in queue filler:', error)).to.be.true; + + clock.restore(); + }); + }); + + describe('updatePollInterval', function () { + it('should set to MIN_POLL_INTERVAL when jobs are found', function () { + jobQueueManager.state.currentPollInterval = 60000; + jobQueueManager.updatePollInterval([{}]); + expect(jobQueueManager.state.currentPollInterval).to.equal(jobQueueManager.config.MIN_POLL_INTERVAL); + }); + + it('should set to MAX_POLL_INTERVAL when no jobs found for a while', function () { + const clock = sinon.useFakeTimers(); + jobQueueManager.state.lastFoundJobTime = Date.now() - 31000; + jobQueueManager.updatePollInterval([]); + expect(jobQueueManager.state.currentPollInterval).to.equal(jobQueueManager.config.MAX_POLL_INTERVAL); + clock.restore(); + }); + }); + + describe('processJobs', function () { + it('should process new jobs', async function () { + const executeJobStub = sinon.stub(jobQueueManager, 'executeJob').resolves(); + const jobs = [ + { + get: sinon.stub().returns('{"name": "testJob1"}'), + id: '1' + }, + { + get: sinon.stub().returns('{"name": "testJob2"}'), + id: '2' + } + ]; + await jobQueueManager.processJobs(jobs); + expect(executeJobStub.calledTwice).to.be.true; + }); + + it('should skip already queued jobs', async function () { + const executeJobStub = sinon.stub(jobQueueManager, 'executeJob').resolves(); + jobQueueManager.state.queuedJobs.add('testJob1'); + const jobs = [ + { + get: sinon.stub().returns('{"name": "testJob1"}'), + id: '1' + }, + { + get: sinon.stub().returns('{"name": "testJob2"}'), + id: '2' + } + ]; + await jobQueueManager.processJobs(jobs); + expect(executeJobStub.calledOnce).to.be.true; + expect(executeJobStub.calledWith(jobs[1], 'testJob2', {name: 'testJob2'})).to.be.true; + }); + }); + + describe('executeJob', function () { + it('should execute a job successfully', async function () { + const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; + const deleteStub = sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); + + await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); + + expect(jobQueueManager.pool.exec.calledOnce).to.be.true; + expect(deleteStub.calledWith('1')).to.be.true; + expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; + }); + + it('should handle job execution errors', async function () { + const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; + const error = new Error('Test error'); + jobQueueManager.pool.exec.rejects(error); + const handleJobErrorStub = sinon.stub(jobQueueManager, 'handleJobError').resolves(); + + await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); + + expect(handleJobErrorStub.calledWith(job, {job: 'testJob', data: {}}, error)).to.be.true; + expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; + }); + }); + + describe('handleJobError', function () { + it('should handle Error object', async function () { + const job = {id: '1'}; + const jobMetadata = {retries: 0}; + + // Ensure jobsRepository is properly initialized + jobQueueManager.jobsRepository = jobQueueManager.jobsRepository || {}; + + // Create the stub on the jobsRepository + const updateStub = sinon.stub(jobQueueManager.jobsRepository, 'update').resolves(); + + const error = new Error('Test error'); + + await jobQueueManager.handleJobError(job, jobMetadata, error); + + expect(updateStub.called, 'update should be called').to.be.true; + expect(updateStub.calledOnce, 'update should be called once').to.be.true; + + const [calledId, calledUpdateData] = updateStub.args[0]; + + expect(calledId).to.equal('1'); + expect(calledUpdateData).to.deep.include({ + status: 'error', + metadata: { + error: 'Test error', + retries: 1 + } + }); + expect(calledUpdateData.finished_at).to.be.instanceOf(Date); + }); + }); +}); \ No newline at end of file diff --git a/ghost/member-events/index.js b/ghost/member-events/index.js index 6995e32620e..233b401db1a 100644 --- a/ghost/member-events/index.js +++ b/ghost/member-events/index.js @@ -12,5 +12,6 @@ module.exports = { SubscriptionActivatedEvent: require('./lib/SubscriptionActivatedEvent'), SubscriptionCancelledEvent: require('./lib/SubscriptionCancelledEvent'), OfferRedemptionEvent: require('./lib/OfferRedemptionEvent'), - MemberLinkClickEvent: require('./lib/MemberLinkClickEvent') + MemberLinkClickEvent: require('./lib/MemberLinkClickEvent'), + MemberEmailAnalyticsUpdateEvent: require('./lib/MemberEmailAnalyticsUpdateEvent') }; diff --git a/ghost/member-events/lib/MemberEmailAnalyticsUpdateEvent.js b/ghost/member-events/lib/MemberEmailAnalyticsUpdateEvent.js new file mode 100644 index 00000000000..21df9f08218 --- /dev/null +++ b/ghost/member-events/lib/MemberEmailAnalyticsUpdateEvent.js @@ -0,0 +1,23 @@ +/** + * @typedef {object} MemberEmailAnalyticsUpdateEventData + * @prop {string} memberId + */ + +module.exports = class MemberEmailAnalyticsUpdateEvent { + /** + * @param {MemberEmailAnalyticsUpdateEventData} data + * @param {Date} timestamp + */ + constructor(data, timestamp) { + this.data = data; + this.timestamp = timestamp; + } + + /** + * @param {MemberEmailAnalyticsUpdateEventData} data + * @param {Date} [timestamp] + */ + static create(data, timestamp) { + return new MemberEmailAnalyticsUpdateEvent(data, timestamp ?? new Date); + } +}; \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index 6093aae4e09..1141abf571f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3053,6 +3053,11 @@ resolved "https://registry.yarnpkg.com/@eslint-community/regexpp/-/regexpp-4.6.1.tgz#0b371c118b8e4ebf9dbddb56120ab4befd791211" integrity sha512-O7x6dMstWLn2ktjcoiNLDkAGG2EjveHL+Vvc+n0fXumkJYAcSqcVYKtwDU+hDZ0uDUsnUagSYaZrOLAYE8un1A== +"@eslint-community/regexpp@^4.6.1": + version "4.11.1" + resolved "https://registry.yarnpkg.com/@eslint-community/regexpp/-/regexpp-4.11.1.tgz#a547badfc719eb3e5f4b556325e542fbe9d7a18f" + integrity sha512-m4DVN9ZqskZoLU5GlWZadwDnYo3vAEydiUayB9widCl9ffWx2IvPnp6n3on5rJmziJSw9Bv+Z3ChDVdMwXCY8Q== + "@eslint/eslintrc@^0.4.3": version "0.4.3" resolved "https://registry.yarnpkg.com/@eslint/eslintrc/-/eslintrc-0.4.3.tgz#9e42981ef035beb3dd49add17acb96e8ff6f394c" @@ -3083,11 +3088,31 @@ minimatch "^3.1.2" strip-json-comments "^3.1.1" +"@eslint/eslintrc@^2.1.4": + version "2.1.4" + resolved "https://registry.yarnpkg.com/@eslint/eslintrc/-/eslintrc-2.1.4.tgz#388a269f0f25c1b6adc317b5a2c55714894c70ad" + integrity sha512-269Z39MS6wVJtsoUl10L60WdkhJVdPG24Q4eZTH3nnF6lpvSShEK3wQjDX9JRWAUPvPh7COouPpU9IrqaZFvtQ== + dependencies: + ajv "^6.12.4" + debug "^4.3.2" + espree "^9.6.0" + globals "^13.19.0" + ignore "^5.2.0" + import-fresh "^3.2.1" + js-yaml "^4.1.0" + minimatch "^3.1.2" + strip-json-comments "^3.1.1" + "@eslint/js@8.44.0": version "8.44.0" resolved "https://registry.yarnpkg.com/@eslint/js/-/js-8.44.0.tgz#961a5903c74139390478bdc808bcde3fc45ab7af" integrity sha512-Ag+9YM4ocKQx9AarydN0KY2j0ErMHNIocPDrVo8zAE44xLTjEtz81OdR68/cydGtk6m6jDb5Za3r2useMzYmSw== +"@eslint/js@8.57.1": + version "8.57.1" + resolved "https://registry.yarnpkg.com/@eslint/js/-/js-8.57.1.tgz#de633db3ec2ef6a3c89e2f19038063e8a122e2c2" + integrity sha512-d9zaMRSTIKDLhctzH12MtXvJKSSUhaHcjV+2Z+GK+EEY7XKpP5yR4x+N3TAcHTcu963nIr+TMcCb4DBCYX1z6Q== + "@extractus/oembed-extractor@3.2.1": version "3.2.1" resolved "https://registry.yarnpkg.com/@extractus/oembed-extractor/-/oembed-extractor-3.2.1.tgz#6158907bc436cc62b0692b4f550bf8a371d3e0d1" @@ -3366,6 +3391,15 @@ debug "^4.1.1" minimatch "^3.0.5" +"@humanwhocodes/config-array@^0.13.0": + version "0.13.0" + resolved "https://registry.yarnpkg.com/@humanwhocodes/config-array/-/config-array-0.13.0.tgz#fb907624df3256d04b9aa2df50d7aa97ec648748" + integrity sha512-DZLEEqFWQFiyK6h5YIeynKx7JlvCYWL0cImfSRXZ9l4Sg2efkFGTuFf6vzXjK1cq6IYkU+Eg/JizXw+TD2vRNw== + dependencies: + "@humanwhocodes/object-schema" "^2.0.3" + debug "^4.3.1" + minimatch "^3.0.5" + "@humanwhocodes/config-array@^0.5.0": version "0.5.0" resolved "https://registry.yarnpkg.com/@humanwhocodes/config-array/-/config-array-0.5.0.tgz#1407967d4c6eecd7388f83acf1eaf4d0c6e58ef9" @@ -3385,6 +3419,11 @@ resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-1.2.1.tgz#b520529ec21d8e5945a1851dfd1c32e94e39ff45" integrity sha512-ZnQMnLV4e7hDlUvw8H+U8ASL02SS2Gn6+9Ac3wGGLIe7+je2AeAOxPY+izIPJDfFDb7eDjev0Us8MO1iFRN8hA== +"@humanwhocodes/object-schema@^2.0.3": + version "2.0.3" + resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-2.0.3.tgz#4a2868d75d6d6963e423bcf90b7fd1be343409d3" + integrity sha512-93zYdMES/c1D69yZiKDBj0V24vqNzB/koF26KPaagAfd3P/4gUlh3Dys5ogAK+Exi9QyzlD8x/08Zt7wIKcDcA== + "@isaacs/ttlcache@1.4.1": version "1.4.1" resolved "https://registry.yarnpkg.com/@isaacs/ttlcache/-/ttlcache-1.4.1.tgz#21fb23db34e9b6220c6ba023a0118a2dd3461ea2" @@ -8665,6 +8704,11 @@ "@uiw/codemirror-extensions-basic-setup" "4.23.6" codemirror "^6.0.0" +"@ungap/structured-clone@^1.2.0": + version "1.2.0" + resolved "https://registry.yarnpkg.com/@ungap/structured-clone/-/structured-clone-1.2.0.tgz#756641adb587851b5ccb3e095daf27ae581c8406" + integrity sha512-zuVdFrMJiuCDQUMCzQaD6KL28MjnqqN8XnAqiEq9PNm/hCPTSGfrXCOfwj1ow4LFb/tNymJPwsNbVePc1xFqrQ== + "@vitejs/plugin-react@4.2.1": version "4.2.1" resolved "https://registry.yarnpkg.com/@vitejs/plugin-react/-/plugin-react-4.2.1.tgz#744d8e4fcb120fc3dbaa471dadd3483f5a304bb9" @@ -16671,6 +16715,14 @@ eslint-scope@^7.2.0: esrecurse "^4.3.0" estraverse "^5.2.0" +eslint-scope@^7.2.2: + version "7.2.2" + resolved "https://registry.yarnpkg.com/eslint-scope/-/eslint-scope-7.2.2.tgz#deb4f92563390f32006894af62a22dba1c46423f" + integrity sha512-dOt21O7lTMhDM+X9mB4GX+DZrZtCUJPL/wlcTqxyrx5IvO0IYtILdtrQGQp+8n5S0gwSVmOf9NQrjMOgfQZlIg== + dependencies: + esrecurse "^4.3.0" + estraverse "^5.2.0" + eslint-utils@^2.0.0, eslint-utils@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/eslint-utils/-/eslint-utils-2.1.0.tgz#d2de5e03424e707dc10c74068ddedae708741b27" @@ -16700,6 +16752,11 @@ eslint-visitor-keys@^3.3.0, eslint-visitor-keys@^3.4.1: resolved "https://registry.yarnpkg.com/eslint-visitor-keys/-/eslint-visitor-keys-3.4.1.tgz#c22c48f48942d08ca824cc526211ae400478a994" integrity sha512-pZnmmLwYzf+kWaM/Qgrvpen51upAktaaiI01nsJD/Yr3lMOdNtq0cxkrrg16w64VtisN6okbs7Q8AfGqj4c9fA== +eslint-visitor-keys@^3.4.3: + version "3.4.3" + resolved "https://registry.yarnpkg.com/eslint-visitor-keys/-/eslint-visitor-keys-3.4.3.tgz#0cd72fe8550e3c2eae156a96a4dddcd1c8ac5800" + integrity sha512-wpc+LXeiyiisxPlEkUzU6svyS1frIO3Mgxj1fdy7Pm8Ygzguax2N3Fa/D/ag1WqbOprdI+uY6wMUl8/a2G+iag== + eslint@8.44.0: version "8.44.0" resolved "https://registry.yarnpkg.com/eslint/-/eslint-8.44.0.tgz#51246e3889b259bbcd1d7d736a0c10add4f0e500" @@ -16791,6 +16848,50 @@ eslint@^7.32.0: text-table "^0.2.0" v8-compile-cache "^2.0.3" +eslint@^8.47.0: + version "8.57.1" + resolved "https://registry.yarnpkg.com/eslint/-/eslint-8.57.1.tgz#7df109654aba7e3bbe5c8eae533c5e461d3c6ca9" + integrity sha512-ypowyDxpVSYpkXr9WPv2PAZCtNip1Mv5KTW0SCurXv/9iOpcrH9PaqUElksqEB6pChqHGDRCFTyrZlGhnLNGiA== + dependencies: + "@eslint-community/eslint-utils" "^4.2.0" + "@eslint-community/regexpp" "^4.6.1" + "@eslint/eslintrc" "^2.1.4" + "@eslint/js" "8.57.1" + "@humanwhocodes/config-array" "^0.13.0" + "@humanwhocodes/module-importer" "^1.0.1" + "@nodelib/fs.walk" "^1.2.8" + "@ungap/structured-clone" "^1.2.0" + ajv "^6.12.4" + chalk "^4.0.0" + cross-spawn "^7.0.2" + debug "^4.3.2" + doctrine "^3.0.0" + escape-string-regexp "^4.0.0" + eslint-scope "^7.2.2" + eslint-visitor-keys "^3.4.3" + espree "^9.6.1" + esquery "^1.4.2" + esutils "^2.0.2" + fast-deep-equal "^3.1.3" + file-entry-cache "^6.0.1" + find-up "^5.0.0" + glob-parent "^6.0.2" + globals "^13.19.0" + graphemer "^1.4.0" + ignore "^5.2.0" + imurmurhash "^0.1.4" + is-glob "^4.0.0" + is-path-inside "^3.0.3" + js-yaml "^4.1.0" + json-stable-stringify-without-jsonify "^1.0.1" + levn "^0.4.1" + lodash.merge "^4.6.2" + minimatch "^3.1.2" + natural-compare "^1.4.0" + optionator "^0.9.3" + strip-ansi "^6.0.1" + text-table "^0.2.0" + esm@^3.2.25, esm@^3.2.4: version "3.2.25" resolved "https://registry.yarnpkg.com/esm/-/esm-3.2.25.tgz#342c18c29d56157688ba5ce31f8431fbb795cc10" @@ -16814,6 +16915,15 @@ espree@^9.6.0: acorn-jsx "^5.3.2" eslint-visitor-keys "^3.4.1" +espree@^9.6.1: + version "9.6.1" + resolved "https://registry.yarnpkg.com/espree/-/espree-9.6.1.tgz#a2a17b8e434690a5432f2f8018ce71d331a48c6f" + integrity sha512-oruZaFkjorTpF32kDSI5/75ViwGeZginGGy2NoOSg3Q9bnwlnmDm4HLnkl0RE3n+njDXR037aY1+x58Z/zFdwQ== + dependencies: + acorn "^8.9.0" + acorn-jsx "^5.3.2" + eslint-visitor-keys "^3.4.1" + esprima@1.2.2: version "1.2.2" resolved "https://registry.yarnpkg.com/esprima/-/esprima-1.2.2.tgz#76a0fd66fcfe154fd292667dc264019750b1657b" @@ -27761,6 +27871,13 @@ rewire@6.0.0: dependencies: eslint "^7.32.0" +rewire@^7.0.0: + version "7.0.0" + resolved "https://registry.yarnpkg.com/rewire/-/rewire-7.0.0.tgz#41db5482370c88758ffc9a719f7c92a761fa8fbf" + integrity sha512-DyyNyzwMtGYgu0Zl/ya0PR/oaunM+VuCuBxCuhYJHHaV0V+YvYa3bBGxb5OZ71vndgmp1pYY8F4YOwQo1siRGw== + dependencies: + eslint "^8.47.0" + rfdc@^1.4.1: version "1.4.1" resolved "https://registry.yarnpkg.com/rfdc/-/rfdc-1.4.1.tgz#778f76c4fb731d93414e8f925fbecf64cce7f6ca" @@ -31729,6 +31846,11 @@ workerpool@^6.0.2, workerpool@^6.0.3, workerpool@^6.1.5, workerpool@^6.4.0, work resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-6.5.1.tgz#060f73b39d0caf97c6db64da004cd01b4c099544" integrity sha512-Fs4dNYcsdpYSAfVxhnl1L5zTksjvOJxtC5hzMNl+1t9B8hTJTdKDyZ5ju7ztgPy+ft9tBFXoOlDNiOT9WUXZlA== +workerpool@^9.1.3: + version "9.1.3" + resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-9.1.3.tgz#34b81f50f777a0e549c6dfaa0926575735e3f4b4" + integrity sha512-LhUrk4tbxJRDQmRrrFWA9EnboXI79fe0ZNTy3u8m+dqPN1EkVSIsQYAB8OF/fkyhG8Rtup+c/bzj/+bzbG8fqg== + wrap-ansi@^6.0.1: version "6.2.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-6.2.0.tgz#e9393ba07102e6c91a3b221478f0257cd2856e53"