Skip to content

Commit

Permalink
feat(multi-downloader): parallel downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
ido-pluto committed May 26, 2024
1 parent dfbe6bf commit 1bb300c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 18 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ import {downloadFile, downloadSequence} from "ipull";
const downloader = await downloadSequence(
{
cliProgress: true,
// parallelDownloads: 2, download 2 files in parallel, default is 1
},
downloadFile({
url: "https://example.com/file1.large",
Expand Down
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
"xmlhttprequest-ssl": "^2.1.1"
},
"dependencies": {
"@supercharge/promise-pool": "^3.2.0",
"@tinyhttp/content-disposition": "^2.2.0",
"async-retry": "^1.3.3",
"chalk": "^5.3.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import {FormattedStatus} from "../../transfer-visualize/format-transfer-status.j
import ProgressStatisticsBuilder, {ProgressStatusWithIndex} from "../../transfer-visualize/progress-statistics-builder.js";
import BaseDownloadEngine, {BaseDownloadEngineEvents} from "./base-download-engine.js";
import DownloadAlreadyStartedError from "./error/download-already-started-error.js";
import {PromisePool} from "@supercharge/promise-pool";

const DEFAULT_PARALLEL_DOWNLOADS = 1;

type DownloadEngineMultiAllowedEngines = BaseDownloadEngine;

Expand All @@ -11,18 +14,24 @@ type DownloadEngineMultiDownloadEvents<Engine = DownloadEngineMultiAllowedEngine
childDownloadClosed: (engine: Engine) => void
};

export type DownloadEngineMultiDownloadOptions = {
parallelDownloads?: number
};

export default class DownloadEngineMultiDownload<Engine extends DownloadEngineMultiAllowedEngines = DownloadEngineMultiAllowedEngines> extends EventEmitter<DownloadEngineMultiDownloadEvents> {
public readonly downloads: Engine[];
public readonly options: DownloadEngineMultiDownloadOptions;
protected _aborted = false;
protected _activeEngine?: Engine;
protected _activeEngines = new Set<Engine>();
protected _progressStatisticsBuilder = new ProgressStatisticsBuilder();
protected _downloadStatues: (ProgressStatusWithIndex | FormattedStatus)[] = [];
protected _closeFiles: (() => Promise<void>)[] = [];


protected constructor(engines: (DownloadEngineMultiAllowedEngines | DownloadEngineMultiDownload)[]) {
protected constructor(engines: (DownloadEngineMultiAllowedEngines | DownloadEngineMultiDownload)[], options: DownloadEngineMultiDownloadOptions) {
super();
this.downloads = DownloadEngineMultiDownload._extractEngines(engines);
this.options = options;
this._init();
}

Expand Down Expand Up @@ -51,19 +60,25 @@ export default class DownloadEngineMultiDownload<Engine extends DownloadEngineMu
}

public async download(): Promise<void> {
if (this._activeEngine) {
if (this._activeEngines.size) {
throw new DownloadAlreadyStartedError();
}

this.emit("start");
for (const engine of this.downloads) {
if (this._aborted) return;
this._activeEngine = engine;
await PromisePool
.withConcurrency(this.options.parallelDownloads ?? DEFAULT_PARALLEL_DOWNLOADS)
.for(this.downloads)
.process(async (engine) => {
if (this._aborted) return;
this._activeEngines.add(engine);

this.emit("childDownloadStarted", engine);
await engine.download();
this.emit("childDownloadClosed", engine);

this._activeEngines.delete(engine);
});

this.emit("childDownloadStarted", engine);
await engine.download();
this.emit("childDownloadClosed", engine);
}
this.emit("finished");
await this._finishEnginesDownload();
await this.close();
Expand All @@ -90,17 +105,20 @@ export default class DownloadEngineMultiDownload<Engine extends DownloadEngineMu
}

public pause(): void {
this._activeEngine?.pause();
this._activeEngines.forEach(engine => engine.pause());
}

public resume(): void {
this._activeEngine?.resume();
this._activeEngines.forEach(engine => engine.resume());
}

public async close() {
if (this._aborted) return;
this._aborted = true;
await this._activeEngine?.close();

const closePromises = Array.from(this._activeEngines)
.map(engine => engine.close());
await Promise.all(closePromises);
this.emit("closed");

}
Expand All @@ -115,7 +133,7 @@ export default class DownloadEngineMultiDownload<Engine extends DownloadEngineMu
.flat();
}

public static async fromEngines<Engine extends DownloadEngineMultiAllowedEngines>(engines: (Engine | Promise<Engine>)[]) {
return new DownloadEngineMultiDownload(await Promise.all(engines));
public static async fromEngines<Engine extends DownloadEngineMultiAllowedEngines>(engines: (Engine | Promise<Engine>)[], options: DownloadEngineMultiDownloadOptions = {}) {
return new DownloadEngineMultiDownload(await Promise.all(engines), options);
}
}
6 changes: 3 additions & 3 deletions src/download/node-download.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import DownloadEngineNodejs, {DownloadEngineOptionsNodejs} from "./download-engine/engine/download-engine-nodejs.js";
import BaseDownloadEngine from "./download-engine/engine/base-download-engine.js";
import DownloadEngineMultiDownload from "./download-engine/engine/download-engine-multi-download.js";
import DownloadEngineMultiDownload, {DownloadEngineMultiDownloadOptions} from "./download-engine/engine/download-engine-multi-download.js";
import CliAnimationWrapper, {CliProgressDownloadEngineOptions} from "./transfer-visualize/transfer-cli/cli-animation-wrapper.js";
import {CLI_LEVEL} from "./transfer-visualize/transfer-cli/transfer-cli.js";

Expand Down Expand Up @@ -29,7 +29,7 @@ export async function downloadFile(options: DownloadFileOptions) {
return await downloader;
}

export type DownloadSequenceOptions = CliProgressDownloadEngineOptions & {
export type DownloadSequenceOptions = CliProgressDownloadEngineOptions & DownloadEngineMultiDownloadOptions & {
fetchStrategy?: "localFile" | "fetch";
};

Expand All @@ -45,7 +45,7 @@ export async function downloadSequence(options: DownloadSequenceOptions | Downlo
}

downloadOptions.cliLevel = CLI_LEVEL.HIGH;
const downloader = DownloadEngineMultiDownload.fromEngines(downloads);
const downloader = DownloadEngineMultiDownload.fromEngines(downloads, downloadOptions);
const wrapper = new CliAnimationWrapper(downloader, downloadOptions);

await wrapper.attachAnimation();
Expand Down

0 comments on commit 1bb300c

Please sign in to comment.