forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka.go
100 lines (85 loc) · 3.04 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package kafka
import "github.com/segmentio/kafka-go/protocol"
// Broker represents a kafka broker in a kafka cluster.
type Broker struct {
Host string
Port int
ID int
Rack string
}
// Topic represents a topic in a kafka cluster.
type Topic struct {
// Name of the topic.
Name string
// True if the topic is internal.
Internal bool
// The list of partition currently available on this topic.
Partitions []Partition
// An error that may have occurred while attempting to read the topic
// metadata.
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker. Programs may use the standard errors.Is
// function to test the error against kafka error codes.
Error error
}
// Partition carries the metadata associated with a kafka partition.
type Partition struct {
// Name of the topic that the partition belongs to, and its index in the
// topic.
Topic string
ID int
// Leader, replicas, and ISR for the partition.
//
// When no physical host is known to be running a broker, the Host and Port
// fields will be set to the zero values. The logical broker ID is always
// set to the value known to the kafka cluster, even if the broker is not
// currently backed by a physical host.
Leader Broker
Replicas []Broker
Isr []Broker
// Available only with metadata API level >= 6:
OfflineReplicas []Broker
// An error that may have occurred while attempting to read the partition
// metadata.
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker. Programs may use the standard errors.Is
// function to test the error against kafka error codes.
Error error
}
// Marshal encodes v into a binary representation of the value in the kafka data
// format.
//
// If v is a, or contains struct types, the kafka struct fields are interpreted
// and may contain one of these values:
//
// nullable valid on bytes and strings, encodes as a nullable value
// compact valid on strings, encodes as a compact string
//
// The kafka struct tags should not contain min and max versions. If you need to
// encode types based on specific versions of kafka APIs, use the Version type
// instead.
func Marshal(v interface{}) ([]byte, error) {
return protocol.Marshal(-1, v)
}
// Unmarshal decodes a binary representation from b into v.
//
// See Marshal for details.
func Unmarshal(b []byte, v interface{}) error {
return protocol.Unmarshal(b, -1, v)
}
// Version represents a version number for kafka APIs.
type Version int16
// Marshal is like the top-level Marshal function, but will only encode struct
// fields for which n falls within the min and max versions specified on the
// struct tag.
func (n Version) Marshal(v interface{}) ([]byte, error) {
return protocol.Marshal(int16(n), v)
}
// Unmarshal is like the top-level Unmarshal function, but will only decode
// struct fields for which n falls within the min and max versions specified on
// the struct tag.
func (n Version) Unmarshal(b []byte, v interface{}) error {
return protocol.Unmarshal(b, int16(n), v)
}