Skip to content

Commit

Permalink
feat(stream): add .withResponse() (#654)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertCraigie authored Jan 13, 2025
1 parent 17ffaeb commit b54477f
Showing 1 changed file with 38 additions and 12 deletions.
50 changes: 38 additions & 12 deletions src/lib/MessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
type MessageCreateParamsBase,
type TextBlock,
} from '@anthropic-ai/sdk/resources/messages';
import { type ReadableStream } from '@anthropic-ai/sdk/_shims/index';
import { type ReadableStream, type Response } from '@anthropic-ai/sdk/_shims/index';
import { Stream } from '@anthropic-ai/sdk/streaming';
import { partialParse } from '../_vendor/partial-json-parser/parser';

Expand Down Expand Up @@ -41,8 +41,8 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {

controller: AbortController = new AbortController();

#connectedPromise: Promise<void>;
#resolveConnectedPromise: () => void = () => {};
#connectedPromise: Promise<Response | null>;
#resolveConnectedPromise: (response: Response | null) => void = () => {};
#rejectConnectedPromise: (error: AnthropicError) => void = () => {};

#endPromise: Promise<void>;
Expand All @@ -57,7 +57,7 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
#catchingPromiseCreated = false;

constructor() {
this.#connectedPromise = new Promise<void>((resolve, reject) => {
this.#connectedPromise = new Promise<Response | null>((resolve, reject) => {
this.#resolveConnectedPromise = resolve;
this.#rejectConnectedPromise = reject;
});
Expand All @@ -75,6 +75,33 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
this.#endPromise.catch(() => {});
}

/**
* Returns the `MessageStream` data, the raw `Response` instance and the ID of the request,
* returned vie the `request-id` header which is useful for debugging requests and resporting
* issues to Anthropic.
*
* This is the same as the `APIPromise.withResponse()` method.
*
* This method will raise an error if you created the stream using `MessageStream.fromReadableStream`
* as no `Response` is available.
*/
async withResponse(): Promise<{
data: MessageStream;
response: Response;
request_id: string | null | undefined;
}> {
const response = await this.#connectedPromise;
if (!response) {
throw new Error('Could not resolve a `Response` object');
}

return {
data: this,
response,
request_id: response.headers.get('request-id'),
};
}

/**
* Intended for use on the frontend, consuming a stream produced with
* `.toReadableStream()` on the backend.
Expand Down Expand Up @@ -136,11 +163,10 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
signal.addEventListener('abort', () => this.controller.abort());
}
this.#beginRequest();
const stream = await messages.create(
{ ...params, stream: true },
{ ...options, signal: this.controller.signal },
);
this._connected();
const { response, data: stream } = await messages
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
.withResponse();
this._connected(response);
for await (const event of stream) {
this.#addStreamEvent(event);
}
Expand All @@ -150,9 +176,9 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
this.#endRequest();
}

protected _connected() {
protected _connected(response: Response | null) {
if (this.ended) return;
this.#resolveConnectedPromise();
this.#resolveConnectedPromise(response);
this._emit('connect');
}

Expand Down Expand Up @@ -424,7 +450,7 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
signal.addEventListener('abort', () => this.controller.abort());
}
this.#beginRequest();
this._connected();
this._connected(null);
const stream = Stream.fromReadableStream<MessageStreamEvent>(readableStream, this.controller);
for await (const event of stream) {
this.#addStreamEvent(event);
Expand Down

0 comments on commit b54477f

Please sign in to comment.