diff --git a/README.md b/README.md
index 69ea207..969452c 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,85 @@
-# nats
+# NATS
+NATS is an open-source, lightweight, and high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures. It supports multiple messaging patterns, including publish/subscribe, request/reply, and queuing. Key features include:
+- Simplicity: Easy to set up and use with minimal configuration.
+- Performance: Low latency and high throughput.
+- Scalability: Capable of handling millions of messages per second.
+- Fault Tolerance: Supports clustering for high availability.
+### Libraries for Google Pub/Sub
+- GO: [NATS](https://github.com/core-go/nats). Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample)
+- nodejs: [NATS](https://github.com/core-ts/nats). Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)
+
+#### A common flow to consume a message from a message queue
+![A common flow to consume a message from a message queue](https://cdn-images-1.medium.com/max/800/1*Y4QUN6QnfmJgaKigcNHbQA.png)
+- The libraries to implement this flow are:
+ - [mq](https://github.com/core-go/mq) for GOLANG. Example is at [go-nats-sample](https://github.com/project-samples/go-nats-sample)
+ - [mq-one](https://www.npmjs.com/package/mq-one) for nodejs. Example is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)
+
+### Use Cases of NATS
+#### Microservices Communication:
+- Scenario: Facilitating communication between microservices in a distributed system.
+- Benefit: Provides low-latency, reliable messaging, ensuring efficient inter-service communication.
+ ![Microservice Architecture](https://cdn-images-1.medium.com/max/800/1*vKeePO_UC73i7tfymSmYNA.png)
+#### Financial Services:
+- Scenario: Enabling real-time transactions and data updates.
+- Benefit: Provides reliable and fast message delivery critical for financial applications.
+#### Real-Time Data Streaming:
+- Scenario: Streaming data in real-time from various sources to data processing systems.
+- Benefit: Low latency ensures real-time data processing and analytics.
+ ![A typical micro service](https://cdn-images-1.medium.com/max/800/1*d9kyekAbQYBxH-C6w38XZQ.png)
+#### Event-Driven Architectures:
+- Scenario: Building applications based on event-driven paradigms.
+- Benefit: Decouples services, allowing for scalable and maintainable architectures.
+#### IoT Messaging:
+- Scenario: Handling communication between numerous IoT devices.
+- Benefit: Supports lightweight, scalable messaging suitable for IoT environments.
+#### Edge Computing:
+- Scenario: Managing communication between edge devices and cloud services.
+- Benefit: Efficiently handles data transfer and command execution with minimal latency.
+
+### Comparison of NATS, Kafka, and RabbitMQ
+#### NATS:
+- Type: Lightweight, high-performance messaging system.
+- Use Cases: Microservices communication, IoT messaging, real-time data streaming.
+- Delivery Guarantees: At-most-once (standard), at-least-once with JetStream.
+- Persistence: Optional (JetStream for persistence).
+- Latency: Very low, optimized for speed.
+- Scalability: Highly scalable with clustering.
+#### Apache Kafka:
+- Type: Distributed event streaming platform.
+- Use Cases: High-throughput messaging, event sourcing, log aggregation.
+- Delivery Guarantees: Configurable (at-least-once, exactly-once).
+- Persistence: Durable storage with configurable retention.
+- Latency: Higher due to disk persistence.
+- Scalability: Highly scalable with partitioned topics.
+#### RabbitMQ:
+- Type: Message broker.
+- Use Cases: Decoupling applications, job queuing, asynchronous communication.
+- Delivery Guarantees: At-least-once, exactly-once (with transactions).
+- Persistence: Persistent storage of messages.
+- Latency: Moderate, designed for reliability.
+- Scalability: Scalable with clustering and federation.
+
+### Key Differences:
+- Latency and Performance: NATS offers the lowest latency, Kafka provides high throughput with persistence, RabbitMQ balances reliability and performance.
+- Persistence: Kafka and RabbitMQ offer strong persistence guarantees, while NATS focuses on speed with optional persistence.
+- Scalability: All three are scalable, but Kafka excels in handling high-throughput event streams, NATS in low-latency scenarios, and RabbitMQ in reliable message delivery.
+
+### Use Case Suitability:
+- NATS: Best for real-time, low-latency communication in microservices and IoT.
+- Kafka: Ideal for high-throughput event streaming and log aggregation.
+- RabbitMQ: Suitable for reliable message queuing and asynchronous task processing.
+
## Installation
-Please make sure to initialize a Go module before installing common-go/nats:
+Please make sure to initialize a Go module before installing core-go/nats:
```shell
-go get -u github.com/common-go/nats
+go get -u github.com/core-go/nats
```
Import:
```go
-import "github.com/common-go/nats"
+import "github.com/core-go/nats"
```
diff --git a/health_checker.go b/health/health_checker.go
similarity index 94%
rename from health_checker.go
rename to health/health_checker.go
index 01fe5cf..fa2e8e7 100644
--- a/health_checker.go
+++ b/health/health_checker.go
@@ -2,7 +2,6 @@ package nats
import (
"context"
- "github.com/nats-io/nats.go"
"net"
"time"
)
@@ -46,10 +45,9 @@ func (s *HealthChecker) Check(ctx context.Context) (map[string]interface{}, erro
}
conn, err := opts.Connect()
if err != nil {
- return nil, err
+ return res, err
}
conn.Close()
- res["status"] = "success"
return res, nil
}
@@ -57,6 +55,9 @@ func (s *HealthChecker) Build(ctx context.Context, data map[string]interface{},
if err == nil {
return data
}
+ if data == nil {
+ data = make(map[string]interface{}, 0)
+ }
data["error"] = err.Error()
return data
}
diff --git a/nats.go b/nats.go
index 95d5042..b587229 100644
--- a/nats.go
+++ b/nats.go
@@ -10,20 +10,20 @@ import (
)
type ConnConfig struct {
- Url string `mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"`
- Options nats.Option `mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"`
- Retry RetryConfig `mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
+ Url string `yaml:"url" mapstructure:"url" json:"url,omitempty" gorm:"column:url" bson:"url,omitempty" dynamodbav:"url,omitempty" firestore:"url,omitempty"`
+ Option nats.Option `yaml:"option" mapstructure:"option" json:"option,omitempty" gorm:"column:option" bson:"option,omitempty" dynamodbav:"option,omitempty" firestore:"option,omitempty"`
+ Retry RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
}
type RetryConfig struct {
- Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
- Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
- Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
- Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
- Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
- Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
- Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
- Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
- Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
+ Retry1 int64 `yaml:"1" mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
+ Retry2 int64 `yaml:"2" mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
+ Retry3 int64 `yaml:"3" mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
+ Retry4 int64 `yaml:"4" mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
+ Retry5 int64 `yaml:"5" mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
+ Retry6 int64 `yaml:"6" mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
+ Retry7 int64 `yaml:"7" mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
+ Retry8 int64 `yaml:"8" mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
+ Retry9 int64 `yaml:"9" mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
}
func NewConn(retries []time.Duration, url string, options ...nats.Option) (*nats.Conn, error) {
diff --git a/publisher.go b/publisher.go
index bdd67a5..ab8676b 100644
--- a/publisher.go
+++ b/publisher.go
@@ -16,38 +16,46 @@ func NewPublisher(conn *nats.Conn, subject string) *Publisher {
}
func NewPublisherByConfig(p PublisherConfig) (*Publisher, error) {
if p.Connection.Retry.Retry1 <= 0 {
- conn, err := nats.Connect(p.Connection.Url, p.Connection.Options)
+ conn, err := nats.Connect(p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewPublisher(conn, p.Subject), nil
} else {
durations := DurationsFromValue(p.Connection.Retry, "Retry", 9)
- conn, err := NewConn(durations, p.Connection.Url, p.Connection.Options)
+ conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option)
if err != nil {
return nil, err
}
return NewPublisher(conn, p.Subject), nil
}
}
-func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
+func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
defer p.Conn.Flush()
if attributes == nil {
- err := p.Conn.Publish(p.Subject, data)
- return "", err
+ return p.Conn.Publish(p.Subject, data)
} else {
header := MapToHeader(attributes)
var msg = &nats.Msg{
Subject: p.Subject,
Data: data,
Reply: "",
- Header: *header,
+ Header: nats.Header(*header),
}
- err := p.Conn.PublishMsg(msg)
- return "", err
+ return p.Conn.PublishMsg(msg)
}
}
-
+func (p *Publisher) PublishData(ctx context.Context, data []byte) error {
+ defer p.Conn.Flush()
+ return p.Conn.Publish(p.Subject, data)
+}
+func (p *Publisher) PublishMsg(msg *nats.Msg) error {
+ if msg == nil {
+ return nil
+ }
+ defer p.Conn.Flush()
+ return p.Conn.PublishMsg(msg)
+}
func MapToHeader(attributes map[string]string) *http.Header {
if attributes == nil || len(attributes) == 0 {
return nil
diff --git a/publisher_config.go b/publisher_config.go
index d0ed9d0..e1188cd 100644
--- a/publisher_config.go
+++ b/publisher_config.go
@@ -1,6 +1,6 @@
package nats
type PublisherConfig struct {
- Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
- Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
+ Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
+ Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
}
diff --git a/subject_publisher.go b/subject_publisher.go
new file mode 100644
index 0000000..0752948
--- /dev/null
+++ b/subject_publisher.go
@@ -0,0 +1,53 @@
+package nats
+
+import (
+ "context"
+ "github.com/nats-io/nats.go"
+)
+
+type SubjectPublisher struct {
+ Conn *nats.Conn
+}
+
+func NewSubjectPublisher(conn *nats.Conn) *SubjectPublisher {
+ return &SubjectPublisher{conn}
+}
+func NewSubjectPublisherByConfig(p PublisherConfig) (*SubjectPublisher, error) {
+ if p.Connection.Retry.Retry1 <= 0 {
+ conn, err := nats.Connect(p.Connection.Url, p.Connection.Option)
+ if err != nil {
+ return nil, err
+ }
+ return NewSubjectPublisher(conn), nil
+ } else {
+ durations := DurationsFromValue(p.Connection.Retry, "Retry", 9)
+ conn, err := NewConn(durations, p.Connection.Url, p.Connection.Option)
+ if err != nil {
+ return nil, err
+ }
+ return NewSubjectPublisher(conn), nil
+ }
+}
+func (p *SubjectPublisher) Publish(ctx context.Context, subject string, data []byte, attributes map[string]string) error {
+ defer p.Conn.Flush()
+ if attributes == nil {
+ return p.Conn.Publish(subject, data)
+ } else {
+ header := MapToHeader(attributes)
+ var msg = &nats.Msg{
+ Subject: subject,
+ Data: data,
+ Reply: "",
+ Header: nats.Header(*header),
+ }
+ return p.Conn.PublishMsg(msg)
+ }
+}
+func (p *SubjectPublisher) PublishData(ctx context.Context, subject string, data []byte) error {
+ defer p.Conn.Flush()
+ return p.Conn.Publish(subject, data)
+}
+func (p *SubjectPublisher) PublishMsg(subject string, data []byte) error {
+ defer p.Conn.Flush()
+ return p.Conn.Publish(subject, data)
+}
diff --git a/subscriber.go b/subscriber.go
index 28e1dad..645b044 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -2,67 +2,62 @@ package nats
import (
"context"
- "github.com/common-go/mq"
"github.com/nats-io/nats.go"
"net/http"
"runtime"
)
type Subscriber struct {
- Conn *nats.Conn
- Subject string
- Header bool
+ Conn *nats.Conn
+ Subject string
+ LogError func(ctx context.Context, msg string)
}
-func NewSubscriber(conn *nats.Conn, subject string, header bool) *Subscriber {
- return &Subscriber{conn, subject, header}
+func NewSubscriber(conn *nats.Conn, subject string, logError func(ctx context.Context, msg string)) *Subscriber {
+ return &Subscriber{conn, subject, logError}
}
-func NewSubscriberByConfig(c SubscriberConfig) (*Subscriber, error) {
+func NewSubscriberByConfig(c SubscriberConfig, logError func(ctx context.Context, msg string)) (*Subscriber, error) {
if c.Connection.Retry.Retry1 <= 0 {
- conn, err := nats.Connect(c.Connection.Url, c.Connection.Options)
+ conn, err := nats.Connect(c.Connection.Url, c.Connection.Option)
if err != nil {
return nil, err
}
- return NewSubscriber(conn, c.Subject, c.Header), nil
+ return NewSubscriber(conn, c.Subject, logError), nil
} else {
durations := DurationsFromValue(c.Connection.Retry, "Retry", 9)
- conn, err := NewConn(durations, c.Connection.Url, c.Connection.Options)
+ conn, err := NewConn(durations, c.Connection.Url, c.Connection.Option)
if err != nil {
return nil, err
}
- return NewSubscriber(conn, c.Subject, c.Header), nil
+ return NewSubscriber(conn, c.Subject, logError), nil
}
}
-
-func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
- if c.Header {
- c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
- attrs := HeaderToMap(msg.Header)
- message := &mq.Message{
- Data: msg.Data,
- Attributes: attrs,
- Raw: msg,
- }
- handle(ctx, message, nil)
- })
- c.Conn.Flush()
- runtime.Goexit()
- } else {
- c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
- message := &mq.Message{
- Data: msg.Data,
- Raw: msg,
- }
- handle(ctx, message, nil)
- })
- c.Conn.Flush()
- runtime.Goexit()
- }
+func (c *Subscriber) SubscribeMsg(ctx context.Context, handle func(context.Context, *nats.Msg)) {
+ c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
+ handle(ctx, msg)
+ })
+ c.Conn.Flush()
+ runtime.Goexit()
+}
+func (c *Subscriber) SubscribeData(ctx context.Context, handle func(context.Context, []byte)) {
+ c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
+ handle(ctx, msg.Data)
+ })
+ c.Conn.Flush()
+ runtime.Goexit()
+}
+func (c *Subscriber) Subscribe(ctx context.Context, handle func(context.Context, []byte, map[string]string)) {
+ c.Conn.Subscribe(c.Subject, func(msg *nats.Msg) {
+ attrs := HeaderToMap(http.Header(msg.Header))
+ handle(ctx, msg.Data, attrs)
+ })
+ c.Conn.Flush()
+ runtime.Goexit()
}
func HeaderToMap(header http.Header) map[string]string {
- attributes := make(map[string]string, 0)
+ attributes := make(map[string]string)
for name, values := range header {
for _, value := range values {
attributes[name] = value
diff --git a/subscriber_config.go b/subscriber_config.go
index 53b0b6e..8aed13f 100644
--- a/subscriber_config.go
+++ b/subscriber_config.go
@@ -1,7 +1,6 @@
package nats
type SubscriberConfig struct {
- Subject string `mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
- Header bool `mapstructure:"header" json:"header,omitempty" gorm:"column:header" bson:"header,omitempty" dynamodbav:"header,omitempty" firestore:"header,omitempty"`
- Connection ConnConfig `mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
+ Subject string `yaml:"subject" mapstructure:"subject" json:"subject,omitempty" gorm:"column:subject" bson:"subject,omitempty" dynamodbav:"subject,omitempty" firestore:"subject,omitempty"`
+ Connection ConnConfig `yaml:"connection" mapstructure:"connection" json:"connection,omitempty" gorm:"column:connection" bson:"connection,omitempty" dynamodbav:"connection,omitempty" firestore:"connection,omitempty"`
}