From 741375952b292cf5537e9d108fdd9a3c3cb5273a Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Sun, 21 Jul 2024 14:24:27 +0700 Subject: [PATCH] Refactor code --- README.md | 78 ++++++++++++++++++- health_checker.go => health/health_checker.go | 7 +- nats.go | 24 +++--- publisher.go | 26 ++++--- publisher_config.go | 4 +- subject_publisher.go | 53 +++++++++++++ subscriber.go | 69 ++++++++-------- subscriber_config.go | 5 +- 8 files changed, 196 insertions(+), 70 deletions(-) rename health_checker.go => health/health_checker.go (94%) create mode 100644 subject_publisher.go diff --git a/README.md b/README.md index 69ea207..969452c 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,85 @@ -# nats +# NATS +NATS is an open-source, lightweight, and high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures. It supports multiple messaging patterns, including publish/subscribe, request/reply, and queuing. Key features include: +- Simplicity: Easy to set up and use with minimal configuration. +- Performance: Low latency and high throughput. +- Scalability: Capable of handling millions of messages per second. +- Fault Tolerance: Supports clustering for high availability. +### Libraries for Google Pub/Sub +- GO: [NATS](https://github.com/core-go/nats). Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample) +- nodejs: [NATS](https://github.com/core-ts/nats). Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample) + +#### A common flow to consume a message from a message queue +![A common flow to consume a message from a message queue](https://cdn-images-1.medium.com/max/800/1*Y4QUN6QnfmJgaKigcNHbQA.png) +- The libraries to implement this flow are: + - [mq](https://github.com/core-go/mq) for GOLANG. Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample) + - [mq-one](https://www.npmjs.com/package/mq-one) for nodejs. Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample) + +### Use Cases of NATS +#### Microservices Communication: +- Scenario: Facilitating communication between microservices in a distributed system. +- Benefit: Provides low-latency, reliable messaging, ensuring efficient inter-service communication. + ![Microservice Architecture](https://cdn-images-1.medium.com/max/800/1*vKeePO_UC73i7tfymSmYNA.png) +#### Financial Services: +- Scenario: Enabling real-time transactions and data updates. +- Benefit: Provides reliable and fast message delivery critical for financial applications. +#### Real-Time Data Streaming: +- Scenario: Streaming data in real-time from various sources to data processing systems. +- Benefit: Low latency ensures real-time data processing and analytics. + ![A typical micro service](https://cdn-images-1.medium.com/max/800/1*d9kyekAbQYBxH-C6w38XZQ.png) +#### Event-Driven Architectures: +- Scenario: Building applications based on event-driven paradigms. +- Benefit: Decouples services, allowing for scalable and maintainable architectures. +#### IoT Messaging: +- Scenario: Handling communication between numerous IoT devices. +- Benefit: Supports lightweight, scalable messaging suitable for IoT environments. +#### Edge Computing: +- Scenario: Managing communication between edge devices and cloud services. +- Benefit: Efficiently handles data transfer and command execution with minimal latency. + +### Comparison of NATS, Kafka, and RabbitMQ +#### NATS: +- Type: Lightweight, high-performance messaging system. +- Use Cases: Microservices communication, IoT messaging, real-time data streaming. +- Delivery Guarantees: At-most-once (standard), at-least-once with JetStream. +- Persistence: Optional (JetStream for persistence). +- Latency: Very low, optimized for speed. +- Scalability: Highly scalable with clustering. +#### Apache Kafka: +- Type: Distributed event streaming platform. +- Use Cases: High-throughput messaging, event sourcing, log aggregation. +- Delivery Guarantees: Configurable (at-least-once, exactly-once). +- Persistence: Durable storage with configurable retention. +- Latency: Higher due to disk persistence. +- Scalability: Highly scalable with partitioned topics. +#### RabbitMQ: +- Type: Message broker. +- Use Cases: Decoupling applications, job queuing, asynchronous communication. +- Delivery Guarantees: At-least-once, exactly-once (with transactions). +- Persistence: Persistent storage of messages. +- Latency: Moderate, designed for reliability. +- Scalability: Scalable with clustering and federation. + +### Key Differences: +- Latency and Performance: NATS offers the lowest latency, Kafka provides high throughput with persistence, RabbitMQ balances reliability and performance. +- Persistence: Kafka and RabbitMQ offer strong persistence guarantees, while NATS focuses on speed with optional persistence. +- Scalability: All three are scalable, but Kafka excels in handling high-throughput event streams, NATS in low-latency scenarios, and RabbitMQ in reliable message delivery. + +### Use Case Suitability: +- NATS: Best for real-time, low-latency communication in microservices and IoT. +- Kafka: Ideal for high-throughput event streaming and log aggregation. +- RabbitMQ: Suitable for reliable message queuing and asynchronous task processing. + ## Installation -Please make sure to initialize a Go module before installing common-go/nats: +Please make sure to initialize a Go module before installing core-go/nats: ```shell -go get -u github.com/common-go/nats +go get -u github.com/core-go/nats ``` Import: ```go -import "github.com/common-go/nats" +import "github.com/core-go/nats" ``` diff --git a/health_checker.go b/health/health_checker.go similarity index 94% rename from health_checker.go rename to health/health_checker.go index 01fe5cf..fa2e8e7 100644 --- a/health_checker.go +++ b/health/health_checker.go @@ -2,7 +2,6 @@ package nats import ( "context" - "github.com/nats-io/nats.go" "net" "time" ) @@ -46,10 +45,9 @@ func (s *HealthChecker) Check(ctx context.Context) (map[string]interface{}, erro } conn, err := opts.Connect() if err != nil { - return nil, err + return res, err } conn.Close() - res["status"] = "success" return res, nil } @@ -57,6 +55,9 @@ func (s *HealthChecker) Build(ctx context.Context, data map[string]interface{}, if err == nil { return data } + if data == nil { + data = make(map[string]interface{}, 0) + } data["error"] = err.Error() return data } diff --git a/nats.go b/nats.go index 95d5042..b587229 100644 --- a/nats.go +++ b/nats.go @@ -10,20 +10,20 @@ import ( ) type ConnConfig struct { - Url string `mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"` - Options nats.Option `mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"` - Retry RetryConfig `mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"` + Url string `yaml:"url" mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"` + Option nats.Option `yaml:"option" mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"` + Retry RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"` } type RetryConfig struct { - Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"` - Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"` - Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"` - Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"` - Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"` - Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"` - Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"` - Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"` - Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"` + Retry1 int64 `yaml:"1" mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"` + Retry2 int64 `yaml:"2" mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"` + Retry3 int64 `yaml:"3" mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"` + Retry4 int64 `yaml:"4" mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"` + Retry5 int64 `yaml:"5" mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"` + Retry6 int64 `yaml:"6" mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"` + Retry7 int64 `yaml:"7" mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"` + Retry8 int64 `yaml:"8" mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"` + Retry9 int64 `yaml:"9" mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"` } func NewConn(retries []time.Duration, url string, options ...nats.Option) (*nats.Conn, error) { diff --git a/publisher.go b/publisher.go index bdd67a5..ab8676b 100644 --- a/publisher.go +++ b/publisher.go @@ -16,38 +16,46 @@ func NewPublisher(conn *nats.Conn, subject string) *Publisher { } func NewPublisherByConfig(p PublisherConfig) (*Publisher, error) { if p.Connection.Retry.Retry1 <= 0 { - conn, err := nats.Connect(p.Connection.Url, p.Connection.Options) + conn, err := nats.Connect(p.Connection.Url, p.Connection.Option) if err != nil { return nil, err } return NewPublisher(conn, p.Subject), nil } else { durations := DurationsFromValue(p.Connection.Retry, "Retry", 9) - conn, err := NewConn(durations, p.Connection.Url, p.Connection.Options) + conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option) if err != nil { return nil, err } return NewPublisher(conn, p.Subject), nil } } -func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) { +func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error { defer p.Conn.Flush() if attributes == nil { - err := p.Conn.Publish(p.Subject, data) - return "", err + return p.Conn.Publish(p.Subject, data) } else { header := MapToHeader(attributes) var msg = &nats.Msg{ Subject: p.Subject, Data: data, Reply: "", - Header: *header, + Header: nats.Header(*header), } - err := p.Conn.PublishMsg(msg) - return "", err + return p.Conn.PublishMsg(msg) } } - +func (p *Publisher) PublishData(ctx context.Context, data []byte) error { + defer p.Conn.Flush() + return p.Conn.Publish(p.Subject, data) +} +func (p *Publisher) PublishMsg(msg *nats.Msg) error { + if msg == nil { + return nil + } + defer p.Conn.Flush() + return p.Conn.PublishMsg(msg) +} func MapToHeader(attributes map[string]string) *http.Header { if attributes == nil || len(attributes) == 0 { return nil diff --git a/publisher_config.go b/publisher_config.go index d0ed9d0..e1188cd 100644 --- a/publisher_config.go +++ b/publisher_config.go @@ -1,6 +1,6 @@ package nats type PublisherConfig struct { - Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"` - Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"` + Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"` + Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"` } diff --git a/subject_publisher.go b/subject_publisher.go new file mode 100644 index 0000000..0752948 --- /dev/null +++ b/subject_publisher.go @@ -0,0 +1,53 @@ +package nats + +import ( + "context" + "github.com/nats-io/nats.go" +) + +type SubjectPublisher struct { + Conn *nats.Conn +} + +func NewSubjectPublisher(conn *nats.Conn) *SubjectPublisher { + return &SubjectPublisher{conn} +} +func NewSubjectPublisherByConfig(p PublisherConfig) (*SubjectPublisher, error) { + if p.Connection.Retry.Retry1 <= 0 { + conn, err := nats.Connect(p.Connection.Url, p.Connection.Option) + if err != nil { + return nil, err + } + return NewSubjectPublisher(conn), nil + } else { + durations := DurationsFromValue(p.Connection.Retry, "Retry", 9) + conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option) + if err != nil { + return nil, err + } + return NewSubjectPublisher(conn), nil + } +} +func (p *SubjectPublisher) Publish(ctx context.Context, subject string, data []byte, attributes map[string]string) error { + defer p.Conn.Flush() + if attributes == nil { + return p.Conn.Publish(subject, data) + } else { + header := MapToHeader(attributes) + var msg = &nats.Msg{ + Subject: subject, + Data: data, + Reply: "", + Header: nats.Header(*header), + } + return p.Conn.PublishMsg(msg) + } +} +func (p *SubjectPublisher) PublishData(ctx context.Context, subject string, data []byte) error { + defer p.Conn.Flush() + return p.Conn.Publish(subject, data) +} +func (p *SubjectPublisher) PublishMsg(subject string, data []byte) error { + defer p.Conn.Flush() + return p.Conn.Publish(subject, data) +} diff --git a/subscriber.go b/subscriber.go index 28e1dad..645b044 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,67 +2,62 @@ package nats import ( "context" - "github.com/common-go/mq" "github.com/nats-io/nats.go" "net/http" "runtime" ) type Subscriber struct { - Conn *nats.Conn - Subject string - Header bool + Conn *nats.Conn + Subject string + LogError func(ctx context.Context, msg string) } -func NewSubscriber(conn *nats.Conn, subject string, header bool) *Subscriber { - return &Subscriber{conn, subject, header} +func NewSubscriber(conn *nats.Conn, subject string, logError func(ctx context.Context, msg string)) *Subscriber { + return &Subscriber{conn, subject, logError} } -func NewSubscriberByConfig(c SubscriberConfig) (*Subscriber, error) { +func NewSubscriberByConfig(c SubscriberConfig, logError func(ctx context.Context, msg string)) (*Subscriber, error) { if c.Connection.Retry.Retry1 <= 0 { - conn, err := nats.Connect(c.Connection.Url, c.Connection.Options) + conn, err := nats.Connect(c.Connection.Url, c.Connection.Option) if err != nil { return nil, err } - return NewSubscriber(conn, c.Subject, c.Header), nil + return NewSubscriber(conn, c.Subject, logError), nil } else { durations := DurationsFromValue(c.Connection.Retry, "Retry", 9) - conn, err := NewConn(durations, c.Connection.Url, c.Connection.Options) + conn, err := NewConn(durations, c.Connection.Url, c.Connection.Option) if err != nil { return nil, err } - return NewSubscriber(conn, c.Subject, c.Header), nil + return NewSubscriber(conn, c.Subject, logError), nil } } - -func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, *mq.Message, error) error) { - if c.Header { - c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) { - attrs := HeaderToMap(msg.Header) - message := &mq.Message{ - Data: msg.Data, - Attributes: attrs, - Raw: msg, - } - handle(ctx, message, nil) - }) - c.Conn.Flush() - runtime.Goexit() - } else { - c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) { - message := &mq.Message{ - Data: msg.Data, - Raw: msg, - } - handle(ctx, message, nil) - }) - c.Conn.Flush() - runtime.Goexit() - } +func (c *Subscriber) SubscribeMsg(ctx context.Context, handle func(context.Context, *nats.Msg)) { + c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) { + handle(ctx, msg) + }) + c.Conn.Flush() + runtime.Goexit() +} +func (c *Subscriber) SubscribeData(ctx context.Context, handle func(context.Context, []byte)) { + c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) { + handle(ctx, msg.Data) + }) + c.Conn.Flush() + runtime.Goexit() +} +func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, []byte, map[string]string)) { + c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) { + attrs := HeaderToMap(http.Header(msg.Header)) + handle(ctx, msg.Data, attrs) + }) + c.Conn.Flush() + runtime.Goexit() } func HeaderToMap(header http.Header) map[string]string { - attributes := make(map[string]string, 0) + attributes := make(map[string]string) for name, values := range header { for _, value := range values { attributes[name] = value diff --git a/subscriber_config.go b/subscriber_config.go index 53b0b6e..8aed13f 100644 --- a/subscriber_config.go +++ b/subscriber_config.go @@ -1,7 +1,6 @@ package nats type SubscriberConfig struct { - Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"` - Header bool `mapstructure:"header" json:"header,omitempty" gorm:"column:header" bson:"header,omitempty" dynamodbav:"header,omitempty" firestore:"header,omitempty"` - Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"` + Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"` + Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"` }