Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
LucianBuzzo committed Oct 31, 2024
1 parent ebca4d0 commit c0d43f3
Showing 1 changed file with 131 additions and 1 deletion.
132 changes: 131 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as crypto from "crypto";
import { Prisma, PrismaClient } from "@prisma/client";
import { Prisma, PrismaClient, PrismaPromise } from "@prisma/client";
import logger from "debug";
import difference from "lodash/difference";
import flatMap from "lodash/flatMap";
Expand All @@ -11,6 +11,100 @@ const VALID_OPERATIONS = ["SELECT", "UPDATE", "INSERT", "DELETE"] as const;

const debug = logger("yates");

const BatchTxIdCounter = {
id: 0,
nextId() {
return ++this.id;
},
};
export interface ErrorWithBatchIndex {
batchRequestIdx?: number;
}

export function hasBatchIndex(
value: object,
): value is Required<ErrorWithBatchIndex> {
// @ts-ignore
return typeof value["batchRequestIdx"] === "number";
}
export function waitForBatch<T extends PromiseLike<unknown>[]>(
promises: T,
): Promise<{ [K in keyof T]: Awaited<T[K]> }> {
if (promises.length === 0) {
return Promise.resolve([] as { [K in keyof T]: Awaited<T[K]> });
}
return new Promise((resolve, reject) => {
const successfulResults = new Array(promises.length) as {
[K in keyof T]: Awaited<T[K]>;
};
let bestError: unknown = null;
let done = false;
let settledPromisesCount = 0;

const settleOnePromise = () => {
if (done) {
return;
}
settledPromisesCount++;
if (settledPromisesCount === promises.length) {
done = true;
if (bestError) {
reject(bestError);
} else {
resolve(successfulResults);
}
}
};

const immediatelyReject = (error: unknown) => {
if (!done) {
done = true;
reject(error);
}
};

for (let i = 0; i < promises.length; i++) {
promises[i].then(
(result) => {
successfulResults[i] = result;
settleOnePromise();
},
(error) => {
if (!hasBatchIndex(error)) {
immediatelyReject(error);
return;
}

if (error.batchRequestIdx === i) {
immediatelyReject(error);
} else {
if (!bestError) {
bestError = error;
}
settleOnePromise();
}
},
);
}
});
}

export function getLockCountPromise<V = void>(
knock: number,
cb: () => V | void = () => {},
) {
let resolve: (v: V | void) => void;
const lock = new Promise<V | void>((res) => (resolve = res));

return {
then(onFulfilled) {
if (--knock === 0) resolve(cb());

return onFulfilled?.(lock as unknown as V | void);
},
} as PromiseLike<V | void>;
}

type Operation = (typeof VALID_OPERATIONS)[number];
export type Models = Prisma.ModelName;

Expand Down Expand Up @@ -189,6 +283,42 @@ export const createClient = (
// Set default options
const { txMaxWait = 30000, txTimeout = 30000 } = options;

// biome-ignore lint/suspicious/noExplicitAny: TODO fix this
(prisma as any)._transactionWithArray = async function ({
promises,
options,
}: {
promises: Array<PrismaPromise<any>>;
options?: any;
}): Promise<any> {
const id = BatchTxIdCounter.nextId();
const lock = getLockCountPromise(promises.length);

const requests = promises.map((request, index) => {
if (request?.[Symbol.toStringTag] !== "PrismaPromise") {
throw new Error(
`All elements of the array need to be Prisma Client promises. Hint: Please make sure you are not awaiting the Prisma client calls you intended to pass in the $transaction function.`,
);
}

const isolationLevel =
options?.isolationLevel ??
this._engineConfig.transactionOptions.isolationLevel;
const transaction = {
kind: "batch",
id,
index,
isolationLevel,
lock,
yates_id: options?.new_tx_id,
} as const;
// @ts-ignore
return request.requestTransaction?.(transaction) ?? request;
});

return waitForBatch(requests);
};

// biome-ignore lint/suspicious/noExplicitAny: TODO fix this
(prisma as any)._transactionWithCallback = async function ({
callback,
Expand Down

0 comments on commit c0d43f3

Please sign in to comment.