Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jul 21, 2024
1 parent 6836a7a commit 7413759
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 70 deletions.
78 changes: 74 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,85 @@
# nats
# NATS
NATS is an open-source, lightweight, and high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures. It supports multiple messaging patterns, including publish/subscribe, request/reply, and queuing. Key features include:
- <b>Simplicity</b>: Easy to set up and use with minimal configuration.
- <b>Performance</b>: Low latency and high throughput.
- <b>Scalability</b>: Capable of handling millions of messages per second.
- <b>Fault Tolerance</b>: Supports clustering for high availability.
### Libraries for Google Pub/Sub
- GO: [NATS](https://github.com/core-go/nats). Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample)
- nodejs: [NATS](https://github.com/core-ts/nats). Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)

#### A common flow to consume a message from a message queue
![A common flow to consume a message from a message queue](https://cdn-images-1.medium.com/max/800/1*Y4QUN6QnfmJgaKigcNHbQA.png)
- The libraries to implement this flow are:
- [mq](https://github.com/core-go/mq) for GOLANG. Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample)
- [mq-one](https://www.npmjs.com/package/mq-one) for nodejs. Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)

### Use Cases of NATS
#### Microservices Communication:
- <b>Scenario</b>: Facilitating communication between microservices in a distributed system.
- <b>Benefit</b>: Provides low-latency, reliable messaging, ensuring efficient inter-service communication.
![Microservice Architecture](https://cdn-images-1.medium.com/max/800/1*vKeePO_UC73i7tfymSmYNA.png)
#### Financial Services:
- <b>Scenario</b>: Enabling real-time transactions and data updates.
- <b>Benefit</b>: Provides reliable and fast message delivery critical for financial applications.
#### Real-Time Data Streaming:
- <b>Scenario</b>: Streaming data in real-time from various sources to data processing systems.
- <b>Benefit</b>: Low latency ensures real-time data processing and analytics.
![A typical micro service](https://cdn-images-1.medium.com/max/800/1*d9kyekAbQYBxH-C6w38XZQ.png)
#### Event-Driven Architectures:
- <b>Scenario</b>: Building applications based on event-driven paradigms.
- <b>Benefit</b>: Decouples services, allowing for scalable and maintainable architectures.
#### IoT Messaging:
- <b>Scenario</b>: Handling communication between numerous IoT devices.
- <b>Benefit</b>: Supports lightweight, scalable messaging suitable for IoT environments.
#### Edge Computing:
- <b>Scenario</b>: Managing communication between edge devices and cloud services.
- <b>Benefit</b>: Efficiently handles data transfer and command execution with minimal latency.

### Comparison of NATS, Kafka, and RabbitMQ
#### NATS:
- <b>Type</b>: Lightweight, high-performance messaging system.
- <b>Use Cases</b>: Microservices communication, IoT messaging, real-time data streaming.
- <b>Delivery Guarantees</b>: At-most-once (standard), at-least-once with JetStream.
- <b>Persistence</b>: Optional (JetStream for persistence).
- <b>Latency</b>: Very low, optimized for speed.
- <b>Scalability</b>: Highly scalable with clustering.
#### Apache Kafka:
- <b>Type</b>: Distributed event streaming platform.
- <b>Use Cases</b>: High-throughput messaging, event sourcing, log aggregation.
- <b>Delivery Guarantees</b>: Configurable (at-least-once, exactly-once).
- <b>Persistence</b>: Durable storage with configurable retention.
- <b>Latency</b>: Higher due to disk persistence.
- <b>Scalability</b>: Highly scalable with partitioned topics.
#### RabbitMQ:
- <b>Type</b>: Message broker.
- <b>Use Cases</b>: Decoupling applications, job queuing, asynchronous communication.
- <b>Delivery Guarantees</b>: At-least-once, exactly-once (with transactions).
- <b>Persistence</b>: Persistent storage of messages.
- <b>Latency</b>: Moderate, designed for reliability.
- <b>Scalability</b>: Scalable with clustering and federation.

### Key Differences:
- <b>Latency and Performance</b>: NATS offers the lowest latency, Kafka provides high throughput with persistence, RabbitMQ balances reliability and performance.
- <b>Persistence</b>: Kafka and RabbitMQ offer strong persistence guarantees, while NATS focuses on speed with optional persistence.
- <b>Scalability</b>: All three are scalable, but Kafka excels in handling high-throughput event streams, NATS in low-latency scenarios, and RabbitMQ in reliable message delivery.

### Use Case Suitability:
- <b>NATS</b>: Best for real-time, low-latency communication in microservices and IoT.
- <b>Kafka</b>: Ideal for high-throughput event streaming and log aggregation.
- <b>RabbitMQ</b>: Suitable for reliable message queuing and asynchronous task processing.


## Installation

Please make sure to initialize a Go module before installing common-go/nats:
Please make sure to initialize a Go module before installing core-go/nats:

```shell
go get -u github.com/common-go/nats
go get -u github.com/core-go/nats
```

Import:

```go
import "github.com/common-go/nats"
import "github.com/core-go/nats"
```
7 changes: 4 additions & 3 deletions health_checker.go → health/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nats

import (
"context"
"github.com/nats-io/nats.go"
"net"
"time"
)
Expand Down Expand Up @@ -46,17 +45,19 @@ func (s *HealthChecker) Check(ctx context.Context) (map[string]interface{}, erro
}
conn, err := opts.Connect()
if err != nil {
return nil, err
return res, err
}
conn.Close()
res["status"] = "success"
return res, nil
}

func (s *HealthChecker) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} {
if err == nil {
return data
}
if data == nil {
data = make(map[string]interface{}, 0)
}
data["error"] = err.Error()
return data
}
24 changes: 12 additions & 12 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ import (
)

type ConnConfig struct {
Url string `mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"`
Options nats.Option `mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"`
Retry RetryConfig `mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
Url string `yaml:"url" mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"`
Option nats.Option `yaml:"option" mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"`
Retry RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
}
type RetryConfig struct {
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
Retry1 int64 `yaml:"1" mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
Retry2 int64 `yaml:"2" mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
Retry3 int64 `yaml:"3" mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
Retry4 int64 `yaml:"4" mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
Retry5 int64 `yaml:"5" mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
Retry6 int64 `yaml:"6" mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
Retry7 int64 `yaml:"7" mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
Retry8 int64 `yaml:"8" mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
Retry9 int64 `yaml:"9" mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
}

func NewConn(retries []time.Duration, url string, options ...nats.Option) (*nats.Conn, error) {
Expand Down
26 changes: 17 additions & 9 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,46 @@ func NewPublisher(conn *nats.Conn, subject string) *Publisher {
}
func NewPublisherByConfig(p PublisherConfig) (*Publisher, error) {
if p.Connection.Retry.Retry1 <= 0 {
conn, err := nats.Connect(p.Connection.Url, p.Connection.Options)
conn, err := nats.Connect(p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewPublisher(conn, p.Subject), nil
} else {
durations := DurationsFromValue(p.Connection.Retry, "Retry", 9)
conn, err := NewConn(durations, p.Connection.Url, p.Connection.Options)
conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewPublisher(conn, p.Subject), nil
}
}
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
defer p.Conn.Flush()
if attributes == nil {
err := p.Conn.Publish(p.Subject, data)
return "", err
return p.Conn.Publish(p.Subject, data)
} else {
header := MapToHeader(attributes)
var msg = &nats.Msg{
Subject: p.Subject,
Data: data,
Reply: "",
Header: *header,
Header: nats.Header(*header),
}
err := p.Conn.PublishMsg(msg)
return "", err
return p.Conn.PublishMsg(msg)
}
}

func (p *Publisher) PublishData(ctx context.Context, data []byte) error {
defer p.Conn.Flush()
return p.Conn.Publish(p.Subject, data)
}
func (p *Publisher) PublishMsg(msg *nats.Msg) error {
if msg == nil {
return nil
}
defer p.Conn.Flush()
return p.Conn.PublishMsg(msg)
}
func MapToHeader(attributes map[string]string) *http.Header {
if attributes == nil || len(attributes) == 0 {
return nil
Expand Down
4 changes: 2 additions & 2 deletions publisher_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package nats

type PublisherConfig struct {
Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
}
53 changes: 53 additions & 0 deletions subject_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package nats

import (
"context"
"github.com/nats-io/nats.go"
)

type SubjectPublisher struct {
Conn *nats.Conn
}

func NewSubjectPublisher(conn *nats.Conn) *SubjectPublisher {
return &SubjectPublisher{conn}
}
func NewSubjectPublisherByConfig(p PublisherConfig) (*SubjectPublisher, error) {
if p.Connection.Retry.Retry1 <= 0 {
conn, err := nats.Connect(p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewSubjectPublisher(conn), nil
} else {
durations := DurationsFromValue(p.Connection.Retry, "Retry", 9)
conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewSubjectPublisher(conn), nil
}
}
func (p *SubjectPublisher) Publish(ctx context.Context, subject string, data []byte, attributes map[string]string) error {
defer p.Conn.Flush()
if attributes == nil {
return p.Conn.Publish(subject, data)
} else {
header := MapToHeader(attributes)
var msg = &nats.Msg{
Subject: subject,
Data: data,
Reply: "",
Header: nats.Header(*header),
}
return p.Conn.PublishMsg(msg)
}
}
func (p *SubjectPublisher) PublishData(ctx context.Context, subject string, data []byte) error {
defer p.Conn.Flush()
return p.Conn.Publish(subject, data)
}
func (p *SubjectPublisher) PublishMsg(subject string, data []byte) error {
defer p.Conn.Flush()
return p.Conn.Publish(subject, data)
}
69 changes: 32 additions & 37 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,62 @@ package nats

import (
"context"
"github.com/common-go/mq"
"github.com/nats-io/nats.go"
"net/http"
"runtime"
)

type Subscriber struct {
Conn *nats.Conn
Subject string
Header bool
Conn *nats.Conn
Subject string
LogError func(ctx context.Context, msg string)
}

func NewSubscriber(conn *nats.Conn, subject string, header bool) *Subscriber {
return &Subscriber{conn, subject, header}
func NewSubscriber(conn *nats.Conn, subject string, logError func(ctx context.Context, msg string)) *Subscriber {
return &Subscriber{conn, subject, logError}
}

func NewSubscriberByConfig(c SubscriberConfig) (*Subscriber, error) {
func NewSubscriberByConfig(c SubscriberConfig, logError func(ctx context.Context, msg string)) (*Subscriber, error) {
if c.Connection.Retry.Retry1 <= 0 {
conn, err := nats.Connect(c.Connection.Url, c.Connection.Options)
conn, err := nats.Connect(c.Connection.Url, c.Connection.Option)
if err != nil {
return nil, err
}
return NewSubscriber(conn, c.Subject, c.Header), nil
return NewSubscriber(conn, c.Subject, logError), nil
} else {
durations := DurationsFromValue(c.Connection.Retry, "Retry", 9)
conn, err := NewConn(durations, c.Connection.Url, c.Connection.Options)
conn, err := NewConn(durations, c.Connection.Url, c.Connection.Option)
if err != nil {
return nil, err
}
return NewSubscriber(conn, c.Subject, c.Header), nil
return NewSubscriber(conn, c.Subject, logError), nil
}
}

func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
if c.Header {
c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
attrs := HeaderToMap(msg.Header)
message := &mq.Message{
Data: msg.Data,
Attributes: attrs,
Raw: msg,
}
handle(ctx, message, nil)
})
c.Conn.Flush()
runtime.Goexit()
} else {
c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
message := &mq.Message{
Data: msg.Data,
Raw: msg,
}
handle(ctx, message, nil)
})
c.Conn.Flush()
runtime.Goexit()
}
func (c *Subscriber) SubscribeMsg(ctx context.Context, handle func(context.Context, *nats.Msg)) {
c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
handle(ctx, msg)
})
c.Conn.Flush()
runtime.Goexit()
}
func (c *Subscriber) SubscribeData(ctx context.Context, handle func(context.Context, []byte)) {
c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
handle(ctx, msg.Data)
})
c.Conn.Flush()
runtime.Goexit()
}
func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, []byte, map[string]string)) {
c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
attrs := HeaderToMap(http.Header(msg.Header))
handle(ctx, msg.Data, attrs)
})
c.Conn.Flush()
runtime.Goexit()
}

func HeaderToMap(header http.Header) map[string]string {
attributes := make(map[string]string, 0)
attributes := make(map[string]string)
for name, values := range header {
for _, value := range values {
attributes[name] = value
Expand Down
5 changes: 2 additions & 3 deletions subscriber_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nats

type SubscriberConfig struct {
Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
Header bool `mapstructure:"header" json:"header,omitempty" gorm:"column:header" bson:"header,omitempty" dynamodbav:"header,omitempty" firestore:"header,omitempty"`
Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
}

0 comments on commit 7413759

Please sign in to comment.