From ba68fc42655ed702956a41d969d7d05d308b0917 Mon Sep 17 00:00:00 2001 From: Nicolas Pavie Date: Thu, 24 Oct 2024 16:12:11 +0200 Subject: [PATCH] fix: replace fetch in intervals by timeout recursive calls - In the pipeline data loading, Intervals would lead to refetches while previous data where not yet finished to be retrieved. - Applied the same logic for jobs monitoring just in case to reduce a bit the polling frequency --- src/main/data/middlewares/pipeline.ts | 266 +++++++++++++++----------- 1 file changed, 155 insertions(+), 111 deletions(-) diff --git a/src/main/data/middlewares/pipeline.ts b/src/main/data/middlewares/pipeline.ts index 1201a91..3afefe4 100644 --- a/src/main/data/middlewares/pipeline.ts +++ b/src/main/data/middlewares/pipeline.ts @@ -64,6 +64,7 @@ import { dialog, ipcMain } from 'electron' import { IPC } from 'shared/constants/ipc' import { pathToFileURL } from 'url' import { MainWindow, MainWindowInstance } from 'main/windows' +import { AbortError } from 'node-fetch' // prettier-ignore /** @@ -193,110 +194,129 @@ async function downloadJobResults(j: Job, targetFolder: string) { * @param dispatch redux store dispatch function */ function startMonitor(j: Job, ws: Webservice, getState, dispatch) { - let monitor = null const fetchJobData = pipelineAPI.fetchJobData(j) - monitor = setInterval(() => { - if (selectStatus(getState()) == PipelineStatus.STOPPED) { - error('The pipeline has stopped working while executing job', j) - clearInterval(monitor) - } else - fetchJobData(ws) - .then((value) => { - // info('received job data ', value) - // don't log the job messages, it's too verbose - info('received job data ', { - ...value, - messages: ['removed to keep log cleaner'], + const maxAttempt = 3 + const timeoutMonitor = async (attempt) => { + if (attempt >= maxAttempt) { + error( + 'Job monitoring failed after', + maxAttempt, + 'attempts to fetch the job', + j + ) + return + } + const isFinished = await fetchJobData(ws) + .then((value) => { + // info('received job data ', value) + // don't log the job messages, it's too verbose + info('received job data ', { + ...value, + messages: ['removed to keep log cleaner'], + }) + let updatedJob = { + ...j, + jobData: value, + } + const finished = [ + JobStatus.ERROR, + JobStatus.FAIL, + JobStatus.SUCCESS, + ].includes(value.status) + if (finished) { + updatedJob.state = JobState.ENDED + } + const newJobName = `${ + updatedJob.jobData.nicename ?? + updatedJob.jobData.script.nicename + }_${timestamp()}` + const downloadFolder = selectDownloadPath(getState()) + if (updatedJob.jobData?.results?.namedResults) { + // If job has results, download them + downloadJobResults( + updatedJob, + `${downloadFolder}/${newJobName}` + ) + .then((downloadedJob) => { + dispatch(updateJob(downloadedJob)) + // Only delete job if it has been downloaded + if (downloadedJob.jobData.downloadedFolder) { + const deleteJob = + pipelineAPI.deleteJob(downloadedJob) + deleteJob().then((response) => { + info( + downloadedJob.jobData.jobId, + 'delete response', + response.status, + response.statusText + ) + }) + } + }) + .catch((e) => { + error('Error downloading job results', e) + }) + } else if (finished) { + info('job is finished without results') + // job is finished wihout results : keep the log + downloadJobLog( + updatedJob, + `${downloadFolder}/${newJobName}` + ).then((jobWithLog) => { + dispatch(updateJob(jobWithLog)) + const deleteJob = pipelineAPI.deleteJob(jobWithLog) + deleteJob().then((response) => { + info( + jobWithLog.jobData.jobId, + 'delete response', + response.status, + response.statusText + ) + }) }) - let updatedJob = { + } else { + dispatch(updateJob(updatedJob)) + } + return finished + }) + .catch((e) => { + error('Error fetching data for job', j, e) + dispatch( + updateJob({ ...j, - jobData: value, - } - const finished = [ - JobStatus.ERROR, - JobStatus.FAIL, - JobStatus.SUCCESS, - ].includes(value.status) - if (finished) { - clearInterval(monitor) - updatedJob.state = JobState.ENDED - } - const newJobName = `${ - updatedJob.jobData.nicename ?? - updatedJob.jobData.script.nicename - }_${timestamp()}` - const downloadFolder = selectDownloadPath(getState()) - if (updatedJob.jobData?.results?.namedResults) { - // If job has results, download them - downloadJobResults( - updatedJob, - `${downloadFolder}/${newJobName}` - ) - .then((downloadedJob) => { - dispatch(updateJob(downloadedJob)) - // Only delete job if it has been downloaded - if (downloadedJob.jobData.downloadedFolder) { - const deleteJob = - pipelineAPI.deleteJob(downloadedJob) - deleteJob().then((response) => { - info( - downloadedJob.jobData.jobId, - 'delete response', - response.status, - response.statusText - ) - }) - } - }) - .catch((e) => { - error('Error downloading job results', e) - }) - } else if (finished) { - info('job is finished without results') - // job is finished wihout results : keep the log - downloadJobLog( - updatedJob, - `${downloadFolder}/${newJobName}` - ).then((jobWithLog) => { - dispatch(updateJob(jobWithLog)) - const deleteJob = pipelineAPI.deleteJob(jobWithLog) - deleteJob().then((response) => { - info( - jobWithLog.jobData.jobId, - 'delete response', - response.status, - response.statusText - ) - }) - }) - } else { - dispatch(updateJob(updatedJob)) - } - }) - .catch((e) => { - error('Error fetching data for job', j, e) - if (j.jobRequestError) { - clearInterval(monitor) - } - dispatch( - updateJob({ - ...j, - jobData: { - ...j.jobData, - status: JobStatus.ERROR, + jobData: { + ...j.jobData, + status: JobStatus.ERROR, + }, + errors: [ + { + error: + e instanceof ParserException + ? e.parsedText + : String(e), }, - errors: [ - { - error: - e instanceof ParserException - ? e.parsedText - : String(e), - }, - ], - }) - ) - }) - }, 1000) + ], + }) + ) + if (j.jobRequestError) { + //clearInterval(monitor) + return true + } else { + // relaunch a new attempt to get the job data + timeoutMonitor(attempt + 1) + return true // deativate the default monitor + } + }) + if (selectStatus(getState()) == PipelineStatus.STOPPED) { + error('The pipeline has stopped working while executing job', j) + } + if (!isFinished) { + // wait 1 sec before refetching if job is not in finished state + setTimeout(() => timeoutMonitor(0), 1000) + } + } + // Start the monitor + timeoutMonitor(0) } // Store managed pipeline instance @@ -359,32 +379,41 @@ export function pipelineMiddleware({ getState, dispatch }) { break case useWebservice.type: // Action dispatched when the pipeline instance is launched - const newWebservice = action.payload - let fetchScriptsInterval = null + const newWebservice = action.payload as Webservice const fetchAlive = pipelineAPI.fetchAlive() const fetchScripts = pipelineAPI.fetchScripts() - fetchScriptsInterval = setInterval(() => { + let maxAttempt = 2 + const loadPipelineData = async (attempt: number) => { + if (attempt == maxAttempt) { + error( + 'useWebservice', + `${maxAttempt} attempts to fetch webservice data failed, stopping pipeline.`, + 'Please check pipeline logs.' + ) + getPipelineInstance(state)?.stop(action.payload) + dispatch(setStatus(PipelineStatus.STOPPED)) + return + } if (selectStatus(getState()) == PipelineStatus.STOPPED) { error( 'useWebservice', 'Pipeline has been stopped during webservice monitoring.', 'Please check pipeline logs.' ) - clearInterval(fetchScriptsInterval) + return } else if (newWebservice) { fetchAlive(newWebservice) .then((alive) => { - dispatch(setAlive(alive)) - }) - .then(() => fetchScripts(newWebservice)) - .then((scripts: Array