Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplemented email analytics prioritizing email opens #20914

Merged
merged 15 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// For information on writing migrations, see https://www.notion.so/ghost/Database-migrations-eb5b78c435d741d2b34a582d57c24253

const logging = require('@tryghost/logging');

// For DML - data changes
const {createTransactionalMigration} = require('../../utils');

module.exports = createTransactionalMigration(
async function up(knex) {
try {
await knex.raw(`
DELETE FROM jobs
WHERE name = 'email-analytics-latest-opened' OR name = 'email-analytics-latest-others' OR name = 'email-analytics-missing';
`);
9larsons marked this conversation as resolved.
Show resolved Hide resolved
} catch (error) {
logging.info(`Failed to delete email analytics jobs: ${error.message}`);
}
},
// down is a no-op
async function down() {}
);
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,30 @@ class EmailAnalyticsServiceWrapper {
});
}

async fetchLatest({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest started');
async fetchLatestOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatest({maxEvents});
const totalEvents = await this.service.fetchLatestOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest opens)`);
return totalEvents;
}

async fetchLatestNonOpenedEvents({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch latest non-opened events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchLatestNonOpenedEvents({maxEvents});
const fetchEndDate = new Date();

logging.info(`[EmailAnalytics] Fetched ${totalEvents} events and aggregated stats in ${fetchEndDate.getTime() - fetchStartDate.getTime()}ms (latest)`);
return totalEvents;
}

async fetchMissing({maxEvents} = {maxEvents: Infinity}) {
logging.info('[EmailAnalytics] Fetch missing started');
logging.info('[EmailAnalytics] Fetch missing events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchMissing({maxEvents});
Expand All @@ -83,7 +94,7 @@ class EmailAnalyticsServiceWrapper {
if (maxEvents < 300) {
return 0;
}
logging.info('[EmailAnalytics] Fetch scheduled started');
logging.info('[EmailAnalytics] Fetch scheduled events started');

const fetchStartDate = new Date();
const totalEvents = await this.service.fetchScheduled({maxEvents});
Expand All @@ -100,13 +111,34 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = true;

// NOTE: Data shows we can process ~2500 events per minute on Pro for a large-ish db (150k members).
// This can vary locally, but we should be conservative with the number of events we fetch.
try {
const c1 = await this.fetchLatest({maxEvents: Infinity});
const c2 = await this.fetchMissing({maxEvents: Infinity});

// Only fetch scheduled if we didn't fetch a lot of normal events
await this.fetchScheduled({maxEvents: 20000 - c1 - c2});

// Prioritize opens since they are the most important (only data directly displayed to users)
const c1 = await this.fetchLatestOpenedEvents({maxEvents: 10000});
if (c1 >= 10000) {
cmraible marked this conversation as resolved.
Show resolved Hide resolved
this._restartFetch('high opened event count');
return;
}

// Set limits on how much we fetch without checking for opened events. During surge events (following newsletter send)
// we want to make sure we don't spend too much time collecting delivery data.
const c2 = await this.fetchLatestNonOpenedEvents({maxEvents: 10000 - c1});
cmraible marked this conversation as resolved.
Show resolved Hide resolved
const c3 = await this.fetchMissing({maxEvents: 10000 - c1 - c2});

// Always restart immediately instead of waiting for the next scheduled job if we're fetching a lot of events
if ((c1 + c2 + c3) > 10000) {
this._restartFetch('high event count');
return;
}

// Only backfill if we're not currently fetching a lot of events
const c4 = await this.fetchScheduled({maxEvents: 10000});
if (c4 > 0) {
this._restartFetch('scheduled backfill');
return;
}

this.fetching = false;
} catch (e) {
logging.error(e, 'Error while fetching email analytics');
Expand All @@ -116,6 +148,12 @@ class EmailAnalyticsServiceWrapper {
}
this.fetching = false;
}

/**
 * Clears the in-progress flag and immediately kicks off a new fetch cycle.
 * Used when a batch hit its event cap and more events are likely waiting.
 * @param {string} reason - Human-readable reason, included in the log line.
 */
_restartFetch(reason) {
// Clear the flag before restarting — startFetch presumably guards on
// this.fetching, so the restart would be skipped otherwise (confirm against startFetch).
this.fetching = false;
logging.info(`[EmailAnalytics] Restarting fetch due to ${reason}`);
this.startFetch();
}
}

module.exports = EmailAnalyticsServiceWrapper;
184 changes: 154 additions & 30 deletions ghost/core/core/server/services/email-analytics/lib/queries.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,181 @@
const _ = require('lodash');
const debug = require('@tryghost/debug')('services:email-analytics');
const db = require('../../../data/db');
const logging = require('@tryghost/logging');
const {default: ObjectID} = require('bson-objectid');

const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;

/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */

/**
 * Creates a job in the jobs table if it does not already exist.
 * Relies on the unique constraint on `name`: a conflicting insert is ignored.
 * @param {EmailAnalyticsJobName} jobName - The name of the job to create.
 * @returns {Promise<void>}
 */
async function createJobIfNotExists(jobName) {
    const jobRow = {
        id: new ObjectID().toHexString(),
        name: jobName,
        started_at: new Date(),
        created_at: new Date(),
        status: 'started'
    };
    await db.knex('jobs')
        .insert(jobRow)
        .onConflict('name')
        .ignore();
}

module.exports = {
async shouldFetchStats() {
// don't fetch stats from Mailgun if we haven't sent any emails
const [emailCount] = await db.knex('emails').count('id as count');
return emailCount && emailCount.count > 0;
},

async getLastSeenEventTimestamp() {
/**
* Retrieves the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {EmailAnalyticsEvent[]} [events=['delivered', 'opened', 'failed']] - The email analytics events to consider.
* @returns {Promise<Date|null>} The timestamp of the last seen event, or null if no events are found.
*/
async getLastEventTimestamp(jobName, events = ['delivered', 'opened', 'failed']) {
const startDate = new Date();

let maxOpenedAt;
let maxDeliveredAt;
let maxFailedAt;
const lastJobRunTimestamp = await this.getLastJobRunTimestamp(jobName);

if (lastJobRunTimestamp) {
debug(`Using job data for ${jobName}`);
maxOpenedAt = events.includes('opened') ? lastJobRunTimestamp : null;
maxDeliveredAt = events.includes('delivered') ? lastJobRunTimestamp : null;
maxFailedAt = events.includes('failed') ? lastJobRunTimestamp : null;
} else {
debug(`Job data not found for ${jobName}, using email_recipients data`);
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
if (events.includes('opened')) {
maxOpenedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first()).maxOpenedAt;
}
if (events.includes('delivered')) {
maxDeliveredAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first()).maxDeliveredAt;
}
if (events.includes('failed')) {
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
}

await createJobIfNotExists(jobName);
}

// three separate queries is much faster than using max/greatest (with coalesce to handle nulls) across columns
let {maxDeliveredAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(delivered_at) as maxDeliveredAt')).first() || {};
let {maxOpenedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(opened_at) as maxOpenedAt')).first() || {};
let {maxFailedAt} = await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first() || {};
// Convert string dates to Date objects for SQLite compatibility
[maxOpenedAt, maxDeliveredAt, maxFailedAt] = [maxOpenedAt, maxDeliveredAt, maxFailedAt].map(date => (
date && !(date instanceof Date) ? new Date(date) : date
));

if (maxDeliveredAt && !(maxDeliveredAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxDeliveredAt = new Date(maxDeliveredAt);
}
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
debug(`getLastEventTimestamp: finished in ${Date.now() - startDate}ms`);

if (maxOpenedAt && !(maxOpenedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxOpenedAt = new Date(maxOpenedAt);
}
return lastSeenEventTimestamp;
},

if (maxFailedAt && !(maxFailedAt instanceof Date)) {
// SQLite returns a string instead of a Date
maxFailedAt = new Date(maxFailedAt);
}
/**
* Retrieves the job data for the specified job name.
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve data for.
* @returns {Promise<Object|null>} The job data, or null if no job data is found.
*/
async getJobData(jobName) {
return await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();
},

/**
* Retrieves the timestamp of the last job run for the specified job name.
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve the last run timestamp for.
* @returns {Promise<Date|null>} The timestamp of the last job run, or null if no job data is found.
*/
async getLastJobRunTimestamp(jobName) {
const jobData = await this.getJobData(jobName);
return jobData ? jobData.finished_at || jobData.started_at : null;
},

const lastSeenEventTimestamp = _.max([maxDeliveredAt, maxOpenedAt, maxFailedAt]);
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
/**
* Sets the timestamp of the last seen event for the specified email analytics events.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'completed'|'started'} field - The field to update.
* @param {Date} date - The timestamp of the last seen event.
* @returns {Promise<void>}
* @description
* Updates the `finished_at` or `started_at` column of the specified job in the `jobs` table with the provided timestamp.
* This is used to keep track of the last time the job was run to avoid expensive queries following reboot.
*/
async setJobTimestamp(jobName, field, date) {
// Convert string dates to Date objects for SQLite compatibility
try {
debug(`Setting ${field} timestamp for job ${jobName} to ${date}`);
const updateField = field === 'completed' ? 'finished_at' : 'started_at';
const status = field === 'completed' ? 'finished' : 'started';
const result = await db.knex('jobs').update({[updateField]: date, updated_at: new Date(), status: status}).where('name', jobName);
if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
[updateField]: date,
updated_at: date,
status: status
});
}
} catch (err) {
debug(`Error setting ${field} timestamp for job ${jobName}: ${err.message}`);
}
},

return lastSeenEventTimestamp;
/**
* Sets the status of the specified email analytics job.
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
* @param {'started'|'finished'|'failed'} status - The new status of the job.
* @returns {Promise<void>}
* @description
* Updates the `status` column of the specified job in the `jobs` table with the provided status.
* This is used to keep track of the current state of the job.
*/
async setJobStatus(jobName, status) {
debug(`Setting status for job ${jobName} to ${status}`);
try {
const result = await db.knex('jobs')
.update({
status: status,
updated_at: new Date()
})
.where('name', jobName);

if (result === 0) {
await db.knex('jobs').insert({
id: new ObjectID().toHexString(),
name: jobName,
status: status,
created_at: new Date(),
updated_at: new Date()
});
}
} catch (err) {
debug(`Error setting status for job ${jobName}: ${err.message}`);
throw err;
}
},

async aggregateEmailStats(emailId) {
const {totalCount} = await db.knex('emails').select(db.knex.raw('email_count as totalCount')).where('id', emailId).first() || {totalCount: 0};
// use IS NULL here because that will typically match far fewer rows than IS NOT NULL making the query faster
const [undeliveredCount] = await db.knex('email_recipients').count('id as count').whereRaw('email_id = ? AND delivered_at IS NULL', [emailId]);
const [openedCount] = await db.knex('email_recipients').count('id as count').whereRaw('email_id = ? AND opened_at IS NOT NULL', [emailId]);
async aggregateEmailStats(emailId, updateOpenedCount) {
const [deliveredCount] = await db.knex('email_recipients').count('id as count').whereRaw('email_id = ? AND delivered_at IS NOT NULL', [emailId]);
const [failedCount] = await db.knex('email_recipients').count('id as count').whereRaw('email_id = ? AND failed_at IS NOT NULL', [emailId]);

await db.knex('emails').update({
delivered_count: totalCount - undeliveredCount.count,
opened_count: openedCount.count,
const updateData = {
delivered_count: deliveredCount.count,
failed_count: failedCount.count
}).where('id', emailId);
};

if (updateOpenedCount) {
const [openedCount] = await db.knex('email_recipients').count('id as count').whereRaw('email_id = ? AND opened_at IS NOT NULL', [emailId]);
updateData.opened_count = openedCount.count;
}

await db.knex('emails').update(updateData).where('id', emailId);
},

async aggregateMemberStats(memberId) {
Expand All @@ -78,4 +202,4 @@ module.exports = {
.update(updateQuery)
.where('id', memberId);
}
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ Object {
Object {
"created_at": StringMatching /\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.000Z/,
"delivered_count": 1,
"email_count": 6,
"email_count": 0,
"error": null,
"error_data": null,
"failed_count": 1,
Expand All @@ -517,7 +517,7 @@ Object {
},
Object {
"created_at": StringMatching /\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.000Z/,
"delivered_count": 3,
"delivered_count": 0,
"email_count": 3,
"error": "Everything went south",
"error_data": null,
Expand Down Expand Up @@ -690,7 +690,7 @@ Object {
Object {
"created_at": StringMatching /\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.000Z/,
"delivered_count": 1,
"email_count": 6,
"email_count": 0,
"error": null,
"error_data": null,
"failed_count": 1,
Expand Down Expand Up @@ -736,7 +736,7 @@ Object {
"emails": Array [
Object {
"created_at": StringMatching /\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.000Z/,
"delivered_count": 3,
"delivered_count": 0,
"email_count": 3,
"error": "Everything went south",
"error_data": null,
Expand Down
Loading
Loading