From c30f2375b57fe180562568250136618830a36d20 Mon Sep 17 00:00:00 2001 From: alice <90381261+alice-yyds@users.noreply.github.com> Date: Fri, 10 Jan 2025 14:27:35 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20upload=20file=20=E6=B5=81=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86=E6=9C=80=E4=BD=B3=E5=AE=9E=E8=B7=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zh/docs/kitex/Stream+Error+Handling.md | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 content/zh/docs/kitex/Stream+Error+Handling.md diff --git a/content/zh/docs/kitex/Stream+Error+Handling.md b/content/zh/docs/kitex/Stream+Error+Handling.md new file mode 100644 index 0000000000..e3b37e03ce --- /dev/null +++ b/content/zh/docs/kitex/Stream+Error+Handling.md @@ -0,0 +1,83 @@ +--- +title: "流错误处理最佳实践" +date: 2025-01-10 +weight: 1 +keywords: ["流错误处理最佳实践"] +description: "" +--- + +## 前言 + +与 PingPong RPC 不同,流的错误可以发生在一个流处理的任何时候,例如 server 可以在发送多条消息后,再返回一个错误。但是一旦一个流发送完错误后,就不能再发送任何消息。 + +## 错误类型 + +### 业务异常 + +**使用范例**:例如 ChatGPT 场景,我们需要不停检查用户账户余额是否能继续调用大模型生成返回。 + +Server 实现: + +```go +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { + // 检查用户账户余额 + for isHasBalance (req.UserId) { + stream.Send(ctx, res) + } + // 返回用户余额不足错误 + bizErr := kerrors.NewBizStatusErrorWithExtra( + 10001, "insufficient user balance", map[string]string{"testKey": "testVal"}, + ) + return bizErr +} +``` + +Client 实现: + +```go +svrStream, err = streamClient.ServerStreamWithErr(ctx, req) + +var err error +for { + res, err = stream.Recv(ctx) + if err != nil { + break + } +} +bizErr, ok := kerrors.FromBizStatusError(err) +if ok { + println(bizErr.BizStatusCode(), bizErr.BizMessage(), bizErr.BizExtra()) +} +``` + +### 其他错误 + +如果 Server 返回的 Error 为非业务异常,框架会统一封装为 `(*thrift.ApplicationException)` 。此时只能拿到错误的 Message 。 + +Server 实现: + +```go +func (si *streamingService) ServerStreamWithErr(ctx context.Context, req *Request, stream streamx.ServerStreamingServer[Response]) error { + // ... + return errors.New("test error") +} +``` + +Client 实现: + +```go +svrStream, err = streamClient.ServerStreamWithErr(ctx, req) +test.Assert(t, err == nil, err) + +var err error +for { + res, err = stream.Recv(ctx) + if err != nil { + break + } +} +ex, ok := err.(*thrift.ApplicationException) +if ok { + println(ex.TypeID(), ex.Msg()) +} +```