From 6d5da262c49f025ca583af2c6b922f276ee0299e Mon Sep 17 00:00:00 2001 From: alice <90381261+alice-yyds@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:00:12 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20sync=20=E6=B5=81=E4=B8=AD=E9=97=B4?= =?UTF-8?q?=E4=BB=B6=E6=9C=80=E4=BD=B3=E5=AE=9E=E8=B7=B5=20(#1184)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...00\344\275\263\345\256\236\350\267\265.md" | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 "content/zh/docs/kitex/Kitex+StreamX+-+\346\265\201\344\270\255\351\227\264\344\273\266\346\234\200\344\275\263\345\256\236\350\267\265.md" diff --git "a/content/zh/docs/kitex/Kitex+StreamX+-+\346\265\201\344\270\255\351\227\264\344\273\266\346\234\200\344\275\263\345\256\236\350\267\265.md" "b/content/zh/docs/kitex/Kitex+StreamX+-+\346\265\201\344\270\255\351\227\264\344\273\266\346\234\200\344\275\263\345\256\236\350\267\265.md" new file mode 100644 index 0000000000..02716409fe --- /dev/null +++ "b/content/zh/docs/kitex/Kitex+StreamX+-+\346\265\201\344\270\255\351\227\264\344\273\266\346\234\200\344\275\263\345\256\236\350\267\265.md" @@ -0,0 +1,93 @@ +--- +title: "流中间件最佳实践" +date: 2025-01-10 +weight: 1 +keywords: ["流中间件最佳实践"] +description: "" +--- + +## 中间件类型 + +### Stream Recv/Send Middleware + +**触发时机**:流收发消息时调用 + +#### 类型定义 + +```go +type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error) +type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error) + +type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint +type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint +``` + +**参数说明**: + +- stream 直接获取当前的流对象 +- res/req 均代表真实请求和响应。 +- Next 函数调用前后的行为: + +| 中间件类型 | Next 调用前 | Next 调用后 | +| -------------------- | ----------------------------------------------------------------------- | ---------------------------------------------------- | +| StreamRecvMiddleware | - 数据未真正收,刚调用 stream.Recv() 函数

- res 参数为空 | - 数据已收到或遇到错误

- res 参数有真实值 | +| StreamSendMiddleware | - 数据未真正发送,刚调用 stream.Send() 函数

- req 参数为真实请求 | - 数据发送完成或遇到错误

- req 参数为真实请求 | + +#### 使用范例 + +**使用场景**:流收/发消息时,注入相关业务逻辑。 + +```go +svr, err := xxx.NewServer( + //... + streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { + return func(ctx context.Context, stream streamx.Stream, res any) (err error) { + // ctx 依然含有用户透传的 token + token, ok := metainfo.GetPersistentValue(ctx, "user_token") + // 检查 token 是否有账户余额继续维持会话 + if !hasBalance(token) { + return fmt.Errorf("user dont have enough balance: token=%s", token) + } + return next(ctx, stream, res) + } + }), +) +``` + +## 注入中间件 + +#### 注入 Client Middleware + +```go +cli, err := xxx.NewClient( + "a.b.c", + streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { + return func(ctx context.Context, stream streamx.Stream, res any) (err error) { + return next(ctx, stream, res) + } + }), + streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { + return func(ctx context.Context, stream streamx.Stream, req any) (err error) { + return next(ctx, stream, req) + } + }), +) +``` + +#### 注入 Server Middleware + +```go +server, err := xxx.NewServer( + // .... + streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint { + return func(ctx context.Context, stream streamx.Stream, res any) (err error) { + return next(ctx, stream, res) + } + }), + streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint { + return func(ctx context.Context, stream streamx.Stream, req any) (err error) { + return next(ctx, stream, req) + } + }), +) +```