forked from connectrpc/connect-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
option.go
478 lines (421 loc) · 15.6 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connect
import (
"compress/gzip"
"context"
"io"
"net/http"
)
// A ClientOption configures a [Client].
//
// In addition to the client-only options grouped in the documentation below,
// remember that any [Option] is also a valid ClientOption, because Option
// embeds this interface.
type ClientOption interface {
// applyToClient records the option's setting on the supplied client
// configuration.
applyToClient(*clientConfig)
}
// WithAcceptCompression registers a compression algorithm with a client.
// Clients ask servers to compress responses using any of the registered
// algorithms; algorithms registered later are preferred over algorithms
// registered earlier.
//
// Servers ignore compression algorithms they don't support, so it's safe to
// use this option liberally. It only covers responses: to compress requests
// as well, pair it with [WithSendCompression]. Passing nil for both the
// decompressor and compressor constructors removes a previously-registered
// algorithm instead of adding one.
//
// Clients accept gzipped responses by default, using the standard library's
// [compress/gzip] package at the default compression level. Use
// [WithSendGzip] to compress requests with gzip.
func WithAcceptCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) ClientOption {
	option := &compressionOption{Name: name}
	// Only build a pool when at least one constructor was supplied. With both
	// nil the pool stays nil, which the client treats as a request to
	// unregister the named algorithm.
	if newDecompressor != nil || newCompressor != nil {
		option.CompressionPool = newCompressionPool(newDecompressor, newCompressor)
	}
	return option
}
// WithClientOptions bundles any number of ClientOptions into a single
// ClientOption. The bundled options are applied in the order given.
func WithClientOptions(options ...ClientOption) ClientOption {
	return &clientOptionsOption{options: options}
}
// WithGRPC switches a client to the HTTP/2 gRPC protocol.
func WithGRPC() ClientOption {
	// web=false selects plain gRPC rather than gRPC-Web.
	option := grpcOption{web: false}
	return &option
}
// WithGRPCWeb switches a client to the gRPC-Web protocol.
func WithGRPCWeb() ClientOption {
	// web=true selects gRPC-Web rather than plain gRPC.
	option := grpcOption{web: true}
	return &option
}
// WithProtoJSON makes a client send JSON-encoded data rather than binary
// Protobuf. Encoding follows the standard Protobuf JSON mapping as implemented
// by [google.golang.org/protobuf/encoding/protojson]: fields are named using
// lowerCamelCase, zero values are omitted, missing required fields are errors,
// enums are emitted as strings, etc.
func WithProtoJSON() ClientOption {
	jsonCodec := &protoJSONCodec{codecNameJSON}
	return WithCodec(jsonCodec)
}
// WithSendCompression selects the algorithm a client uses to compress request
// messages. The algorithm must also be registered with
// [WithAcceptCompression]; if it isn't, the client returns errors at runtime.
//
// Clients send uncompressed requests by default, because some servers don't
// support compression.
func WithSendCompression(name string) ClientOption {
	option := sendCompressionOption{Name: name}
	return &option
}
// WithSendGzip makes a client gzip its requests. Clients have a gzip
// compressor available by default, so WithSendGzip works without any extra
// registration; it is shorthand for [WithSendCompression] with the gzip
// algorithm name.
//
// Clients send uncompressed requests by default, because some servers don't
// support gzip.
func WithSendGzip() ClientOption {
	return &sendCompressionOption{Name: compressionGzip}
}
// A HandlerOption configures a [Handler].
//
// In addition to the handler-only options grouped in the documentation below,
// remember that any [Option] is also a valid HandlerOption, because Option
// embeds this interface.
type HandlerOption interface {
// applyToHandler records the option's setting on the supplied handler
// configuration.
applyToHandler(*handlerConfig)
}
// WithCompression adds support for a compression algorithm to a handler.
// Clients may then send messages compressed with that algorithm, request
// compressed responses, or both. The [Compressor] and [Decompressor] produced
// by the supplied constructors must use the same algorithm. Internally,
// Connect pools compressors and decompressors.
//
// Handlers support gzip by default, using the standard library's
// [compress/gzip] package at the default compression level.
//
// Calling WithCompression with an empty name or nil constructors is a no-op.
func WithCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) HandlerOption {
	pool := newCompressionPool(newDecompressor, newCompressor)
	return &compressionOption{Name: name, CompressionPool: pool}
}
// WithHandlerOptions bundles any number of HandlerOptions into a single
// HandlerOption. The bundled options are applied in the order given.
func WithHandlerOptions(options ...HandlerOption) HandlerOption {
	return &handlerOptionsOption{options: options}
}
// WithRecover installs an interceptor that recovers from panics. The supplied
// function is called with the context, [Spec], request headers, and the
// recovered value (which may be nil), and must return the error to send back
// to the client. It may also log the panic, emit metrics, or run other
// error-handling logic. The function must be safe to call concurrently.
//
// To stay compatible with [net/http]'s semantics, the interceptor doesn't
// handle panics with [http.ErrAbortHandler].
//
// Handlers don't recover from panics by default. The standard library's
// [http.Server] already recovers from panics, so this option usually isn't
// needed to prevent crashes. Its value is in letting servers collect
// RPC-specific data during panics and send a more detailed error to clients.
func WithRecover(handle func(context.Context, Spec, http.Header, any) error) HandlerOption {
	recoverer := &recoverHandlerInterceptor{handle: handle}
	return WithInterceptors(recoverer)
}
// WithRequireConnectProtocolHeader makes the Handler reject Connect RPC
// protocol requests that omit the Connect-Protocol-Version header. Requiring
// the header lets HTTP proxies and net/http middleware easily identify valid
// Connect requests, even when they use a common Content-Type like
// application/json. The trade-off is that ad-hoc requests with tools like cURL
// become more laborious.
//
// This option has no effect if the client uses the gRPC or gRPC-Web protocols.
func WithRequireConnectProtocolHeader() HandlerOption {
	return new(requireConnectProtocolHeaderOption)
}
// Option implements both [ClientOption] and [HandlerOption], so a single value
// can be applied both client-side and server-side.
type Option interface {
// Embedding both interfaces means every Option provides applyToClient and
// applyToHandler.
ClientOption
HandlerOption
}
// WithCodec registers a serialization method with a client or handler. A
// handler may have several codecs registered and uses whichever one the client
// chooses. A client has exactly one codec, so the most recently applied
// WithCodec wins.
//
// By default, handlers and clients support binary Protocol Buffer data using
// [google.golang.org/protobuf/proto]. Handlers also support JSON by default,
// using the standard Protobuf JSON mapping. Users with more specialized needs
// may override the default codecs by registering a new codec under the "proto"
// or "json" names. When supplying a custom "proto" codec, keep in mind that
// some unexported, protocol-specific messages are serialized using Protobuf:
// take care to fall back to the standard Protobuf implementation if
// necessary.
//
// Registering a nil codec, or a codec with an empty name, is a no-op.
func WithCodec(codec Codec) Option {
	option := new(codecOption)
	option.Codec = codec
	return option
}
// WithCompressMinBytes sets the smallest message size that is worth
// compressing. Messages smaller than the configured minimum are sent
// uncompressed, regardless of compressor configuration.
//
// The default minimum is zero. Raising it may improve overall performance,
// because the CPU cost of compressing very small messages usually isn't worth
// the small reduction in network I/O.
func WithCompressMinBytes(min int) Option {
	option := compressMinBytesOption{Min: min}
	return &option
}
// WithReadMaxBytes limits the performance impact of pathologically large
// messages sent by the other party. On a handler it caps the size of a message
// that the client can send; on a client it caps the size of a message that the
// server can respond with. The limit applies to each Protobuf message, not to
// the stream as a whole.
//
// Setting WithReadMaxBytes to zero allows any message size. Both clients and
// handlers default to allowing any message size.
//
// Handlers may also use [http.MaxBytesHandler] to limit the total size of the
// HTTP request stream (rather than the per-message size). Connect handles
// [http.MaxBytesError] specially, so clients still receive errors with the
// appropriate error code and informative messages.
func WithReadMaxBytes(max int) Option {
	option := readMaxBytesOption{Max: max}
	return &option
}
// WithSendMaxBytes prevents sending messages too large for the client/handler
// to handle without significant performance overhead. On a handler it caps the
// size of a message that the handler can respond with; on a client it caps the
// size of a message that the client can send. The limit applies to each
// message, not to the stream as a whole.
//
// Setting WithSendMaxBytes to zero allows any message size. Both clients and
// handlers default to allowing any message size.
func WithSendMaxBytes(max int) Option {
	option := sendMaxBytesOption{Max: max}
	return &option
}
// WithInterceptors adds interceptors to a client or handler's interceptor
// stack. Repeated WithInterceptors options are applied in order, so
//
//	WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)
//
// Unary interceptors compose like an onion. The first interceptor provided is
// the outermost layer of the onion: it acts first on the context and request,
// and last on the response and error.
//
// Stream interceptors also behave like an onion: the first interceptor
// provided is the outermost wrapper for the [StreamingClientConn] or
// [StreamingHandlerConn]. It's the first to see sent messages and the last to
// see received messages.
//
// Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:
//
//	 client.Send()       client.Receive()
//	       |                   ^
//	       v                   |
//	    A ---               --- A
//	    B ---               --- B
//	    : ...               ... :
//	    Y ---               --- Y
//	    Z ---               --- Z
//	       |                   ^
//	       v                   |
//	  = = = = = = = = = = = = = = = =
//	               network
//	  = = = = = = = = = = = = = = = =
//	       |                   ^
//	       v                   |
//	    A ---               --- A
//	    B ---               --- B
//	    : ...               ... :
//	    Y ---               --- Y
//	    Z ---               --- Z
//	       |                   ^
//	       v                   |
//	handler.Receive()   handler.Send()
//	       |                   ^
//	       |                   |
//	       '-> handler logic >-'
//
// Note that in clients, Send handles the request message(s) and Receive
// handles the response message(s). For handlers, it's the reverse. Depending
// on your interceptor's logic, you may need to wrap one method in clients and
// the other in handlers.
func WithInterceptors(interceptors ...Interceptor) Option {
	return &interceptorsOption{Interceptors: interceptors}
}
// WithOptions bundles any number of Options into a single Option. The bundled
// options are applied in the order given.
func WithOptions(options ...Option) Option {
	return &optionsOption{options: options}
}
// clientOptionsOption bundles several ClientOptions so they can be passed
// around as one.
type clientOptionsOption struct {
	options []ClientOption
}

// applyToClient applies every bundled option to cfg, in the order the options
// were supplied.
func (o *clientOptionsOption) applyToClient(cfg *clientConfig) {
	for i := range o.options {
		o.options[i].applyToClient(cfg)
	}
}
// codecOption registers a Codec with a client or handler.
type codecOption struct {
	Codec Codec
}

// applyToClient makes the codec the client's codec, replacing whichever codec
// was configured before. A nil codec or a codec with an empty name is ignored.
func (o *codecOption) applyToClient(cfg *clientConfig) {
	if codec := o.Codec; codec != nil && codec.Name() != "" {
		cfg.Codec = codec
	}
}

// applyToHandler registers the codec with the handler under the codec's name,
// replacing any codec already stored under that name. A nil codec or a codec
// with an empty name is ignored.
func (o *codecOption) applyToHandler(cfg *handlerConfig) {
	if codec := o.Codec; codec != nil && codec.Name() != "" {
		cfg.Codecs[codec.Name()] = codec
	}
}
// compressionOption registers a named compression algorithm with a client or
// handler. On clients, a nil CompressionPool unregisters the named algorithm
// instead.
type compressionOption struct {
	Name            string
	CompressionPool *compressionPool
}

// applyToClient updates the client's compression registry. An empty name is
// ignored. A nil pool removes the algorithm from both the pool map and the
// name list. Otherwise the pool is stored under the name and the name becomes
// the last entry of config.CompressionNames.
//
// Any earlier entry for the same name is dropped before the name is appended,
// so registering an algorithm more than once (for example, replacing an
// already-registered gzip pool) never leaves a duplicate in the name list.
func (o *compressionOption) applyToClient(config *clientConfig) {
	if o.Name == "" {
		return
	}
	config.CompressionNames = o.namesWithout(config.CompressionNames)
	if o.CompressionPool == nil {
		delete(config.CompressionPools, o.Name)
		return
	}
	config.CompressionPools[o.Name] = o.CompressionPool
	config.CompressionNames = append(config.CompressionNames, o.Name)
}

// applyToHandler registers the algorithm with the handler. An empty name or a
// nil pool is a no-op. As on the client, re-registering a name replaces its
// pool and moves the name to the end of config.CompressionNames rather than
// adding a duplicate.
func (o *compressionOption) applyToHandler(config *handlerConfig) {
	if o.Name == "" || o.CompressionPool == nil {
		return
	}
	config.CompressionPools[o.Name] = o.CompressionPool
	config.CompressionNames = append(o.namesWithout(config.CompressionNames), o.Name)
}

// namesWithout returns a copy of names with every occurrence of o.Name
// removed, preserving the order of the remaining entries. It builds a fresh
// slice so the caller's backing array is never modified; the result is nil
// when nothing remains.
func (o *compressionOption) namesWithout(names []string) []string {
	var kept []string
	for _, name := range names {
		if name != o.Name {
			kept = append(kept, name)
		}
	}
	return kept
}
// compressMinBytesOption carries the minimum message size, in bytes, at which
// compression is attempted.
type compressMinBytesOption struct {
	Min int
}

// applyToClient stores the threshold on the client configuration.
func (o *compressMinBytesOption) applyToClient(cfg *clientConfig) {
	cfg.CompressMinBytes = o.Min
}

// applyToHandler stores the threshold on the handler configuration.
func (o *compressMinBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.CompressMinBytes = o.Min
}
// readMaxBytesOption carries the per-message size limit for received
// messages.
type readMaxBytesOption struct {
	Max int
}

// applyToClient stores the read limit on the client configuration.
func (o *readMaxBytesOption) applyToClient(cfg *clientConfig) {
	cfg.ReadMaxBytes = o.Max
}

// applyToHandler stores the read limit on the handler configuration.
func (o *readMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.ReadMaxBytes = o.Max
}
// sendMaxBytesOption carries the per-message size limit for sent messages.
type sendMaxBytesOption struct {
	Max int
}

// applyToClient stores the send limit on the client configuration.
func (o *sendMaxBytesOption) applyToClient(cfg *clientConfig) {
	cfg.SendMaxBytes = o.Max
}

// applyToHandler stores the send limit on the handler configuration.
func (o *sendMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.SendMaxBytes = o.Max
}
// handlerOptionsOption bundles several HandlerOptions so they can be passed
// around as one.
type handlerOptionsOption struct {
	options []HandlerOption
}

// applyToHandler applies every bundled option to cfg, in the order the options
// were supplied.
func (o *handlerOptionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range o.options {
		o.options[i].applyToHandler(cfg)
	}
}
// requireConnectProtocolHeaderOption turns on the handler's
// RequireConnectProtocolHeader setting. It carries no data of its own.
type requireConnectProtocolHeaderOption struct{}

// applyToHandler sets RequireConnectProtocolHeader on the handler
// configuration.
func (o *requireConnectProtocolHeaderOption) applyToHandler(cfg *handlerConfig) {
	cfg.RequireConnectProtocolHeader = true
}
// grpcOption selects the gRPC protocol for a client. When web is true the
// gRPC-Web variant is used.
type grpcOption struct {
	web bool
}

// applyToClient replaces the client's protocol with a gRPC protocol value that
// carries the web flag.
func (o *grpcOption) applyToClient(cfg *clientConfig) {
	protocol := &protocolGRPC{web: o.web}
	cfg.Protocol = protocol
}
// interceptorsOption appends interceptors to a client or handler's existing
// interceptor stack.
type interceptorsOption struct {
	Interceptors []Interceptor
}

// applyToClient chains the option's interceptors after whatever interceptor
// the client already has.
func (o *interceptorsOption) applyToClient(cfg *clientConfig) {
	cfg.Interceptor = o.chainWith(cfg.Interceptor)
}

// applyToHandler chains the option's interceptors after whatever interceptor
// the handler already has.
func (o *interceptorsOption) applyToHandler(cfg *handlerConfig) {
	cfg.Interceptor = o.chainWith(cfg.Interceptor)
}

// chainWith combines current (which may be nil) with the option's
// interceptors, keeping current first. A chain is only built when there is
// more than one interceptor to combine: with nothing to add, current is
// returned unchanged, and a lone interceptor with no predecessor is returned
// as-is.
func (o *interceptorsOption) chainWith(current Interceptor) Interceptor {
	switch count := len(o.Interceptors); {
	case count == 0:
		return current
	case current != nil:
		// Copy into a fresh slice so the existing interceptor leads the chain
		// without touching the caller's slice.
		stack := make([]Interceptor, 0, count+1)
		stack = append(stack, current)
		stack = append(stack, o.Interceptors...)
		return newChain(stack)
	case count == 1:
		return o.Interceptors[0]
	default:
		return newChain(o.Interceptors)
	}
}
// optionsOption bundles several Options so they can be passed around as one.
type optionsOption struct {
	options []Option
}

// applyToClient applies every bundled option to the client configuration, in
// the order the options were supplied.
func (o *optionsOption) applyToClient(cfg *clientConfig) {
	for i := range o.options {
		o.options[i].applyToClient(cfg)
	}
}

// applyToHandler applies every bundled option to the handler configuration, in
// the order the options were supplied.
func (o *optionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range o.options {
		o.options[i].applyToHandler(cfg)
	}
}
// sendCompressionOption names the algorithm a client uses to compress
// requests.
type sendCompressionOption struct {
	Name string
}

// applyToClient stores the algorithm name as the client's request compression
// name. The name is stored as given, without checking it against the
// registered algorithms.
func (o *sendCompressionOption) applyToClient(cfg *clientConfig) {
	cfg.RequestCompressionName = o.Name
}
// withGzip returns the option that registers gzip support backed by the
// standard library's compress/gzip package. The writer constructor uses
// gzip.NewWriter, so compression runs at gzip's default level.
func withGzip() Option {
	newReader := func() Decompressor { return new(gzip.Reader) }
	// The writer starts out pointed at io.Discard.
	newWriter := func() Compressor { return gzip.NewWriter(io.Discard) }
	return &compressionOption{
		Name:            compressionGzip,
		CompressionPool: newCompressionPool(newReader, newWriter),
	}
}
// withProtoBinaryCodec returns the option that registers the binary Protobuf
// codec.
func withProtoBinaryCodec() Option {
	binaryCodec := new(protoBinaryCodec)
	return WithCodec(binaryCodec)
}
// withProtoJSONCodecs returns a handler option that registers the Protobuf
// JSON codec twice: once under codecNameJSON and once under
// codecNameJSONCharsetUTF8.
func withProtoJSONCodecs() HandlerOption {
	plain := WithCodec(&protoJSONCodec{codecNameJSON})
	withCharset := WithCodec(&protoJSONCodec{codecNameJSONCharsetUTF8})
	return &handlerOptionsOption{options: []HandlerOption{plain, withCharset}}
}