Skip to content

Commit

Permalink
fix: none async events
Browse files Browse the repository at this point in the history
  • Loading branch information
ido-pluto committed Feb 27, 2024
1 parent 2e6fa9c commit 9326aaa
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 156 deletions.
18 changes: 6 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
"async-retry": "^1.3.3",
"chalk": "^5.3.0",
"commander": "^10.0.0",
"emittery": "^1.0.3",
"eventemitter3": "^5.0.1",
"fs-extra": "^11.1.1",
"level": "^8.0.0",
"lifecycle-utils": "^1.3.1",
Expand Down
84 changes: 49 additions & 35 deletions src/download/download-engine/download-engine-file.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
import {PromisePool, Stoppable} from "@supercharge/promise-pool";
import ProgressStatusFile from "./progress-status-file.js";
import {ChunkStatus, DownloadEngineFileOptions, DownloadFile, DownloadProgressInfo} from "./types.js";
import Emittery from "emittery";

export type DownloadEngineFileOptionsWithDefaults = DownloadEngineFileOptions &
{
chunkSize: number;
parallelStreams: number;
};
import {ChunkStatus, DownloadFile, DownloadProgressInfo} from "./types.js";
import BaseDownloadEngineFetchStream from "./streams/download-engine-fetch-stream/base-download-engine-fetch-stream.js";
import BaseDownloadEngineWriteStream from "./streams/download-engine-write-stream/base-download-engine-write-stream.js";
import retry from "async-retry";
import {EventEmitter} from "eventemitter3";

export type DownloadEngineFileOptions = {
chunkSize?: number;
parallelStreams?: number;
retry?: retry.Options
comment?: string;
fetchStream: BaseDownloadEngineFetchStream,
writeStream: BaseDownloadEngineWriteStream,
onFinishAsync?: () => Promise<void>
onCloseAsync?: () => Promise<void>
};

export interface DownloadEngineFileEvents {
start: undefined;
paused: undefined;
resumed: undefined;
progress: ProgressStatusFile;
save: DownloadProgressInfo;
finished: undefined;
closed: undefined;
export type DownloadEngineFileOptionsWithDefaults = DownloadEngineFileOptions & {
chunkSize: number;
parallelStreams: number;
};

[key: string]: any;
}
export type DownloadEngineFileEvents = {
start: () => void
paused: () => void
resumed: () => void
progress: (progress: ProgressStatusFile) => void
save: (progress: DownloadProgressInfo) => void
finished: () => void
closed: () => void
[key: string]: any
};

const DEFAULT_OPTIONS: Omit<DownloadEngineFileOptionsWithDefaults, "fetchStream" | "writeStream"> = {
chunkSize: 1024 * 1024 * 5,
parallelStreams: 4
};

export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvents> {
export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileEvents> {
public readonly file: DownloadFile;
public options: DownloadEngineFileOptionsWithDefaults;

protected _progress: DownloadProgressInfo = {
part: 0,
Expand All @@ -37,7 +50,6 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent

protected _closed = false;
protected _progressStatus: ProgressStatusFile;
protected _options: DownloadEngineFileOptionsWithDefaults;
protected _activePool?: Stoppable;
protected _activeStreamBytes: { [key: number]: number } = {};

Expand Down Expand Up @@ -70,13 +82,13 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent
super();
this.file = file;
this._progressStatus = new ProgressStatusFile(file.totalSize, file.parts.length, file.localFileName, options.comment);
this._options = {...DEFAULT_OPTIONS, ...options};
this.options = {...DEFAULT_OPTIONS, ...options};
this._initProgress();
}

protected _emptyChunksForPart(part: number) {
const partInfo = this.file.parts[part];
const chunksCount = Math.ceil(partInfo.size / this._options.chunkSize);
const chunksCount = Math.ceil(partInfo.size / this.options.chunkSize);
return new Array(chunksCount).fill(ChunkStatus.NOT_STARTED);
}

Expand All @@ -87,17 +99,17 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent
this._progress = {
part: 0,
chunks: this._emptyChunksForPart(0),
chunkSize: this._options.chunkSize
chunkSize: this.options.chunkSize
};
}
}

async download() {
await this.emit("start");
this.emit("start");
for (let i = this._progress.part; i < this.file.parts.length; i++) {
if (i > this._progress.part) {
this._progress.part = i;
this._progress.chunkSize = this._options.chunkSize;
this._progress.chunkSize = this.options.chunkSize;
this._progress.chunks = this._emptyChunksForPart(i);
}

Expand All @@ -109,12 +121,13 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent
this._activeStreamBytes = {};
await this._downloadPart();
}
await this.emit("finished");
this.emit("finished");
await this.options.onFinishAsync?.();
}

protected async _downloadPart() {
try {
await PromisePool.withConcurrency(this._options.parallelStreams)
await PromisePool.withConcurrency(this.options.parallelStreams)
.for(this._progress.chunks)
.process(async (status, index, pool) => {
await this._pausedPromise;
Expand All @@ -127,24 +140,24 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent

const start = index * this._progress.chunkSize;
const end = Math.min(start + this._progress.chunkSize, this._activePart.size);
const buffer = await this._options.fetchStream.fetchBytes(this._activePart.downloadURL!, start, end, (length: number) => {
const buffer = await this.options.fetchStream.fetchBytes(this._activePart.downloadURL!, start, end, (length: number) => {

Check warning on line 143 in src/download/download-engine/download-engine-file.ts

View workflow job for this annotation

GitHub Actions / test

This line has a length of 141. Maximum allowed is 140
this._activeStreamBytes[index] = length;
this._sendProgressDownloadPart();
});

await this._options.writeStream.write(start, buffer);
await this.options.writeStream.write(start, buffer);
this._progress.chunks[index] = ChunkStatus.COMPLETE;
delete this._activeStreamBytes[index];

await this._saveProgress();
this._saveProgress();
});
} finally {
this._activePool = undefined;
}
}

protected async _saveProgress() {
await this.emit("save", this._progress);
protected _saveProgress() {
this.emit("save", this._progress);
this._sendProgressDownloadPart();
}

Expand Down Expand Up @@ -183,9 +196,10 @@ export default class DownloadEngineFile extends Emittery<DownloadEngineFileEvent
this._activePool.stop();
}
this.pause();
await this._options.writeStream.close();
await this._options.fetchStream.close();
await this.emit("closed");
await this.options.writeStream.close();
await this.options.fetchStream.close();
this.emit("closed");
await this.options.onCloseAsync?.();
}

[Symbol.dispose]() {
Expand Down
56 changes: 35 additions & 21 deletions src/download/download-engine/engine/base-download-engine.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
import {DownloadEngineFileOptions, DownloadFile} from "../types.js";
import DownloadEngineFile, {DownloadEngineFileEvents} from "../download-engine-file.js";
import {DownloadFile, DownloadProgressInfo} from "../types.js";
import DownloadEngineFile, {DownloadEngineFileOptions} from "../download-engine-file.js";
import BaseDownloadEngineFetchStream, {
BaseDownloadEngineFetchStreamOptions
} from "../streams/download-engine-fetch-stream/base-download-engine-fetch-stream.js";
import UrlInputError from "./error/url-input-error.js";
import Emittery from "emittery";
import {EventEmitter} from "eventemitter3";
import ProgressStatisticsBuilder, {TransferProgressWithStatus} from "../../transfer-visualize/progress-statistics-builder.js";
import DownloadAlreadyStartedError from "./error/download-already-started-error.js";
import retry from "async-retry";

export type InputURLOptions = { partsURL: string[] } | { url: string };
export type BaseDownloadEngineOptions<FetchStrategy = "xhr" | "fetch" | "localFile"> =
DownloadEngineFileOptions
& BaseDownloadEngineFetchStreamOptions
&
{
comment?: string;
fetchStrategy?: FetchStrategy;
};

export interface BaseDownloadEngineEvents extends Omit<DownloadEngineFileEvents, "progress"> {
progress: TransferProgressWithStatus;
}

export default class BaseDownloadEngine extends Emittery<BaseDownloadEngineEvents> {
public readonly options: BaseDownloadEngineOptions;
export type BaseDownloadEngineOptions = InputURLOptions & BaseDownloadEngineFetchStreamOptions & {
chunkSize?: number;
parallelStreams?: number;
retry?: retry.Options
comment?: string;
};

export type BaseDownloadEngineEvents = {
start: () => void
paused: () => void
resumed: () => void
progress: (progress: TransferProgressWithStatus) => void
save: (progress: DownloadProgressInfo) => void
finished: () => void
closed: () => void
[key: string]: any
};

export default class BaseDownloadEngine extends EventEmitter<BaseDownloadEngineEvents> {
public readonly options: DownloadEngineFileOptions;
protected readonly _engine: DownloadEngineFile;
protected _progressStatisticsBuilder = new ProgressStatisticsBuilder();
protected _downloadStarted = false;
Expand All @@ -36,7 +43,7 @@ export default class BaseDownloadEngine extends Emittery<BaseDownloadEngineEvent
return this._engine.file.totalSize;
}

protected constructor(engine: DownloadEngineFile, options: BaseDownloadEngineOptions) {
protected constructor(engine: DownloadEngineFile, options: DownloadEngineFileOptions) {
super();
this.options = options;
this._engine = engine;
Expand All @@ -48,6 +55,9 @@ export default class BaseDownloadEngine extends Emittery<BaseDownloadEngineEvent
this._engine.on("start", () => {
return this.emit("start");
});
this._engine.on("save", (info) => {
return this.emit("save", info);
});
this._engine.on("finished", () => {
return this.emit("finished");
});
Expand All @@ -69,9 +79,13 @@ export default class BaseDownloadEngine extends Emittery<BaseDownloadEngineEvent
if (this._downloadStarted) {
throw new DownloadAlreadyStartedError();
}
this._downloadStarted = true;
await this._engine.download();
await this.close();

try {
this._downloadStarted = true;
await this._engine.download();
} finally {
await this.close();
}
}

pause() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import DownloadEngineFetchStreamXhr from "../streams/download-engine-fetch-strea
import DownloadEngineWriteStreamBrowser, {
DownloadEngineWriteStreamBrowserWriter
} from "../streams/download-engine-write-stream/download-engine-write-stream-browser.js";
import BaseDownloadEngine, {BaseDownloadEngineOptions, InputURLOptions} from "./base-download-engine.js";
import BaseDownloadEngine, {BaseDownloadEngineOptions} from "./base-download-engine.js";
import BaseDownloadEngineWriteStream from "../streams/download-engine-write-stream/base-download-engine-write-stream.js";
import BaseDownloadEngineFetchStream from "../streams/download-engine-fetch-stream/base-download-engine-fetch-stream.js";

export type DownloadEngineOptionsBrowser = Omit<BaseDownloadEngineOptions<"fetch" | "xhr">, "fetchStream" | "writeStream"> &
InputURLOptions & {
export type DownloadEngineOptionsBrowser = BaseDownloadEngineOptions & {
onWrite?: DownloadEngineWriteStreamBrowserWriter,
progress?: DownloadProgressInfo
progress?: DownloadProgressInfo,
fetchStrategy?: "xhr" | "fetch",
};

export type DownloadEngineOptionsCustomFetchBrowser = DownloadEngineOptionsBrowser & {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import {BaseDownloadEngineEvents} from "./base-download-engine.js";
import Emittery from "emittery";
import {EventEmitter} from "eventemitter3";
import ProgressStatisticsBuilder, {AnyEngine} from "../../transfer-visualize/progress-statistics-builder.js";
import DownloadAlreadyStartedError from "./error/download-already-started-error.js";

interface DownloadEngineMultiDownloadEvents<Engine = AnyEngine> extends BaseDownloadEngineEvents {
childDownloadStarted: Engine;
childDownloadClosed: Engine;
}
type DownloadEngineMultiDownloadEvents<Engine = AnyEngine> = BaseDownloadEngineEvents & {
childDownloadStarted: (engine: Engine) => void
childDownloadClosed: (engine: Engine) => void
};

export default class DownloadEngineMultiDownload<Engine extends AnyEngine = AnyEngine> extends Emittery<DownloadEngineMultiDownloadEvents> {
export default class DownloadEngineMultiDownload<Engine extends AnyEngine = AnyEngine> extends EventEmitter<DownloadEngineMultiDownloadEvents> {

Check warning on line 11 in src/download/download-engine/engine/download-engine-multi-download.ts

View workflow job for this annotation

GitHub Actions / test

This line has a length of 144. Maximum allowed is 140
protected _aborted = false;
protected _activeEngine?: Engine;
protected _progressStatisticsBuilder = new ProgressStatisticsBuilder();
Expand All @@ -34,16 +34,16 @@ export default class DownloadEngineMultiDownload<Engine extends AnyEngine = AnyE
throw new DownloadAlreadyStartedError();
}

await this.emit("start");
this.emit("start");
for (const engine of this._engines) {
if (this._aborted) return;
this._activeEngine = engine;

await this.emit("childDownloadStarted", engine);
this.emit("childDownloadStarted", engine);
await engine.download();
await this.emit("childDownloadClosed", engine);
this.emit("childDownloadClosed", engine);
}
await this.emit("finished");
this.emit("finished");
await this.close();
}

Expand All @@ -59,6 +59,6 @@ export default class DownloadEngineMultiDownload<Engine extends AnyEngine = AnyE
if (this._aborted) return;
this._aborted = true;
await this._activeEngine?.close();
await this.emit("closed");
this.emit("closed");
}
}
Loading

0 comments on commit 9326aaa

Please sign in to comment.