From 1bb300c9b50a4cf097c34643bd79799918004b60 Mon Sep 17 00:00:00 2001 From: ido Date: Sun, 26 May 2024 19:05:54 +0300 Subject: [PATCH] feat(multi-downloader): parallel downloads --- README.md | 1 + package-lock.json | 9 ++++ package.json | 1 + .../engine/download-engine-multi-download.ts | 48 +++++++++++++------ src/download/node-download.ts | 6 +-- 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index fddf700..bd6b473 100644 --- a/README.md +++ b/README.md @@ -279,6 +279,7 @@ import {downloadFile, downloadSequence} from "ipull"; const downloader = await downloadSequence( { cliProgress: true, + // parallelDownloads: 2, download 2 files in parallel, default is 1 }, downloadFile({ url: "https://example.com/file1.large", diff --git a/package-lock.json b/package-lock.json index b0a33d6..9dc5d7b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "1.0.0", "license": "MIT", "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@tinyhttp/content-disposition": "^2.2.0", "async-retry": "^1.3.3", "chalk": "^5.3.0", @@ -2081,6 +2082,14 @@ "integrity": "sha512-+Fj43pSMwJs4KRrH/938Uf+uAELIgVBmQzg/q1YG10djyfA3TnrU8N8XzqCh/okZdszqBQTZf96idMfE5lnwTA==", "dev": true }, + "node_modules/@supercharge/promise-pool": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/@supercharge/promise-pool/-/promise-pool-3.2.0.tgz", + "integrity": "sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg==", + "engines": { + "node": ">=8" + } + }, "node_modules/@tinyhttp/content-disposition": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/@tinyhttp/content-disposition/-/content-disposition-2.2.0.tgz", diff --git a/package.json b/package.json index 2f9b3c6..f3eccb4 100644 --- a/package.json +++ b/package.json @@ -124,6 +124,7 @@ "xmlhttprequest-ssl": "^2.1.1" }, "dependencies": { + "@supercharge/promise-pool": "^3.2.0", "@tinyhttp/content-disposition": "^2.2.0", "async-retry": "^1.3.3", "chalk": "^5.3.0", diff --git a/src/download/download-engine/engine/download-engine-multi-download.ts b/src/download/download-engine/engine/download-engine-multi-download.ts index 1619c0d..637b3ce 100644 --- a/src/download/download-engine/engine/download-engine-multi-download.ts +++ b/src/download/download-engine/engine/download-engine-multi-download.ts @@ -3,6 +3,9 @@ import {FormattedStatus} from "../../transfer-visualize/format-transfer-status.j import ProgressStatisticsBuilder, {ProgressStatusWithIndex} from "../../transfer-visualize/progress-statistics-builder.js"; import BaseDownloadEngine, {BaseDownloadEngineEvents} from "./base-download-engine.js"; import DownloadAlreadyStartedError from "./error/download-already-started-error.js"; +import {PromisePool} from "@supercharge/promise-pool"; + +const DEFAULT_PARALLEL_DOWNLOADS = 1; type DownloadEngineMultiAllowedEngines = BaseDownloadEngine; @@ -11,18 +14,24 @@ type DownloadEngineMultiDownloadEvents void }; +export type DownloadEngineMultiDownloadOptions = { + parallelDownloads?: number +}; + export default class DownloadEngineMultiDownload extends EventEmitter { public readonly downloads: Engine[]; + public readonly options: DownloadEngineMultiDownloadOptions; protected _aborted = false; - protected _activeEngine?: Engine; + protected _activeEngines = new Set(); protected _progressStatisticsBuilder = new ProgressStatisticsBuilder(); protected _downloadStatues: (ProgressStatusWithIndex | FormattedStatus)[] = []; protected _closeFiles: (() => Promise)[] = []; - protected constructor(engines: (DownloadEngineMultiAllowedEngines | DownloadEngineMultiDownload)[]) { + protected constructor(engines: (DownloadEngineMultiAllowedEngines | DownloadEngineMultiDownload)[], options: DownloadEngineMultiDownloadOptions) { super(); this.downloads = DownloadEngineMultiDownload._extractEngines(engines); + this.options = options; this._init(); } @@ -51,19 +60,25 @@ export default class DownloadEngineMultiDownload { - if (this._activeEngine) { + if (this._activeEngines.size) { throw new DownloadAlreadyStartedError(); } this.emit("start"); - for (const engine of this.downloads) { - if (this._aborted) return; - this._activeEngine = engine; + await PromisePool + .withConcurrency(this.options.parallelDownloads ?? DEFAULT_PARALLEL_DOWNLOADS) + .for(this.downloads) + .process(async (engine) => { + if (this._aborted) return; + this._activeEngines.add(engine); + + this.emit("childDownloadStarted", engine); + await engine.download(); + this.emit("childDownloadClosed", engine); + + this._activeEngines.delete(engine); + }); - this.emit("childDownloadStarted", engine); - await engine.download(); - this.emit("childDownloadClosed", engine); - } this.emit("finished"); await this._finishEnginesDownload(); await this.close(); @@ -90,17 +105,20 @@ export default class DownloadEngineMultiDownload engine.pause()); } public resume(): void { - this._activeEngine?.resume(); + this._activeEngines.forEach(engine => engine.resume()); } public async close() { if (this._aborted) return; this._aborted = true; - await this._activeEngine?.close(); + + const closePromises = Array.from(this._activeEngines) + .map(engine => engine.close()); + await Promise.all(closePromises); this.emit("closed"); } @@ -115,7 +133,7 @@ export default class DownloadEngineMultiDownload(engines: (Engine | Promise)[]) { - return new DownloadEngineMultiDownload(await Promise.all(engines)); + public static async fromEngines(engines: (Engine | Promise)[], options: DownloadEngineMultiDownloadOptions = {}) { + return new DownloadEngineMultiDownload(await Promise.all(engines), options); } } diff --git a/src/download/node-download.ts b/src/download/node-download.ts index 662ca90..f407b4d 100644 --- a/src/download/node-download.ts +++ b/src/download/node-download.ts @@ -1,6 +1,6 @@ import DownloadEngineNodejs, {DownloadEngineOptionsNodejs} from "./download-engine/engine/download-engine-nodejs.js"; import BaseDownloadEngine from "./download-engine/engine/base-download-engine.js"; -import DownloadEngineMultiDownload from "./download-engine/engine/download-engine-multi-download.js"; +import DownloadEngineMultiDownload, {DownloadEngineMultiDownloadOptions} from "./download-engine/engine/download-engine-multi-download.js"; import CliAnimationWrapper, {CliProgressDownloadEngineOptions} from "./transfer-visualize/transfer-cli/cli-animation-wrapper.js"; import {CLI_LEVEL} from "./transfer-visualize/transfer-cli/transfer-cli.js"; @@ -29,7 +29,7 @@ export async function downloadFile(options: DownloadFileOptions) { return await downloader; } -export type DownloadSequenceOptions = CliProgressDownloadEngineOptions & { +export type DownloadSequenceOptions = CliProgressDownloadEngineOptions & DownloadEngineMultiDownloadOptions & { fetchStrategy?: "localFile" | "fetch"; }; @@ -45,7 +45,7 @@ export async function downloadSequence(options: DownloadSequenceOptions | Downlo } downloadOptions.cliLevel = CLI_LEVEL.HIGH; - const downloader = DownloadEngineMultiDownload.fromEngines(downloads); + const downloader = DownloadEngineMultiDownload.fromEngines(downloads, downloadOptions); const wrapper = new CliAnimationWrapper(downloader, downloadOptions); await wrapper.attachAnimation();