Skip to content

Commit

Permalink
feat: kafka setup
Browse files Browse the repository at this point in the history
  • Loading branch information
CaioTeixeira95 committed Dec 11, 2023
1 parent cb24b67 commit cf5c1bf
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 16 deletions.
32 changes: 32 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/data"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler/jobs"
Expand Down Expand Up @@ -309,6 +310,29 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
FlagDefault: true,
Required: false,
},
{
Name: "brokers",
Usage: "List of Message Brokers Connection string comma separated.",
OptType: types.String,
ConfigKey: &serveOpts.Brokers,
CustomSetValue: cmdUtils.SetConfigOptionURLList,
Required: true,
},
{
Name: "topics",
Usage: "List of Message Brokers Consumer Topics comma separated.",
OptType: types.String,
ConfigKey: &serveOpts.Topics,
CustomSetValue: cmdUtils.SetConfigOptionStringList,
Required: true,
},
{
Name: "consumer-group-id",
Usage: "Message Broker Consumer Group ID.",
OptType: types.String,
ConfigKey: &serveOpts.ConsumerGroupID,
Required: true,
},
}

messengerOptions := message.MessengerOptions{}
Expand Down Expand Up @@ -424,6 +448,14 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
}
serveOpts.AnchorPlatformAPIService = apAPIService

// Kafka (background)
kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID)
kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{})

Check failure on line 453 in cmd/serve.go

View workflow job for this annotation

GitHub Actions / check

Error return value of `kafkaEventManager.RegisterEventHandler` is not checked (errcheck)
defer kafkaEventManager.Close()

go events.Consume(ctx, kafkaEventManager)

Check failure on line 456 in cmd/serve.go

View workflow job for this annotation

GitHub Actions / check

Error return value of `events.Consume` is not checked (errcheck)
serveOpts.EventProducer = kafkaEventManager

// Starting Scheduler Service (background job) if enabled
if serveOpts.EnableScheduler {
log.Ctx(ctx).Info("Starting Scheduler Service...")
Expand Down
12 changes: 12 additions & 0 deletions cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"bytes"
"context"
"strings"
"sync"
"testing"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
Expand Down Expand Up @@ -120,6 +122,9 @@ func Test_serve(t *testing.T) {
EnableReCAPTCHA: true,
EnableScheduler: true,
EnableMultiTenantDB: false,
Brokers: []string{"kafka:9092"},
Topics: []string{"my-topic"},
ConsumerGroupID: "group-id",
}
var err error
serveOpts.AnchorPlatformAPIService, err = anchorplatform.NewAnchorPlatformAPIService(httpclient.DefaultClient(), serveOpts.AnchorPlatformBasePlatformURL, serveOpts.AnchorPlatformOutgoingJWTSecret)
Expand All @@ -141,6 +146,10 @@ func Test_serve(t *testing.T) {
require.NoError(t, err)
serveOpts.SMSMessengerClient = smsMessengerClient

kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID)
kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{})

Check failure on line 150 in cmd/serve_test.go

View workflow job for this annotation

GitHub Actions / check

Error return value of `kafkaEventManager.RegisterEventHandler` is not checked (errcheck)
serveOpts.EventProducer = kafkaEventManager

metricOptions := monitor.MetricOptions{
MetricType: monitor.MetricTypePrometheus,
Environment: "test",
Expand Down Expand Up @@ -213,6 +222,9 @@ func Test_serve(t *testing.T) {
t.Setenv("INSTANCE_NAME", serveOpts.InstanceName)
t.Setenv("ENABLE_SCHEDULER", "true")
t.Setenv("ENABLE_MULTITENANT_DB", "false")
t.Setenv("BROKERS", strings.Join(serveOpts.Brokers, ","))
t.Setenv("TOPICS", strings.Join(serveOpts.Topics, ","))
t.Setenv("CONSUMER_GROUP_ID", serveOpts.ConsumerGroupID)

// test & assert
rootCmd.SetArgs([]string{"--environment", "test", "serve", "--metrics-type", "PROMETHEUS"})
Expand Down
46 changes: 46 additions & 0 deletions cmd/utils/custom_set_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,49 @@ func SetConfigOptionURLString(co *config.ConfigOption) error {

return nil
}

func SetConfigOptionURLList(co *config.ConfigOption) error {
urlsStr := viper.GetString(co.Name)

if urlsStr == "" {
return fmt.Errorf("url list cannot be empty")
}

urls := strings.Split(urlsStr, ",")
for _, u := range urls {
_, err := url.ParseRequestURI(strings.TrimSpace(u))
if err != nil {
return fmt.Errorf("error parsing url: %w", err)
}
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}
*key = urls

return nil
}

func SetConfigOptionStringList(co *config.ConfigOption) error {
listStr := viper.GetString(co.Name)

if listStr == "" {
return fmt.Errorf("cannot be empty")
}

list := strings.Split(listStr, ",")
for i, el := range list {
list[i] = strings.TrimSpace(el)
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}

*key = list

return nil
}
74 changes: 74 additions & 0 deletions cmd/utils/custom_set_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,77 @@ func Test_SetConfigOptionURLString(t *testing.T) {
})
}
}

func Test_SetConfigOptionURLList(t *testing.T) {
opts := struct{ brokers []string }{}

co := config.ConfigOption{
Name: "brokers",
OptType: types.String,
CustomSetValue: SetConfigOptionURLList,
ConfigKey: &opts.brokers,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--brokers", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--brokers", "kafka:9092,localhost:9093,kafka://broker:9092"},
wantResult: []string{"kafka:9092", "localhost:9093", "kafka://broker:9092"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "kafka:9092,localhost:9093",
wantResult: []string{"kafka:9092", "localhost:9093"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.brokers = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}

func Test_SetConfigOptionStringList(t *testing.T) {
opts := struct{ topics []string }{}

co := config.ConfigOption{
Name: "topics",
OptType: types.String,
CustomSetValue: SetConfigOptionStringList,
ConfigKey: &opts.topics,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--topics", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--topics", "topic1, topic2,topic3"},
wantResult: []string{"topic1", "topic2", "topic3"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "topic1, topic2",
wantResult: []string{"topic1", "topic2"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.topics = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}
20 changes: 20 additions & 0 deletions dev/docker-compose-sdp-anchor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,28 @@ services:
SECRET_SEP10_SIGNING_SEED: ${SEP10_SIGNING_PRIVATE_KEY}
SECRET_SEP24_INTERACTIVE_URL_JWT_SECRET: jwt_secret_1234567890
SECRET_SEP24_MORE_INFO_URL_JWT_SECRET: jwt_secret_1234567890

kafka:
image: docker.io/bitnami/kafka:3.6
ports:
- "9094:9094"
volumes:
- "kafka-data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

volumes:
postgres-db:
driver: local
postgres-ap-db:
driver: local
kafka-data:
driver: local
23 changes: 15 additions & 8 deletions go.list
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4
github.com/go-gorp/gorp/v3 v3.1.0
github.com/go-kit/log v0.2.1
github.com/go-logfmt/logfmt v0.5.1
github.com/go-logr/logr v1.2.3
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab
github.com/go-playground/locales v0.14.0
github.com/go-playground/universal-translator v0.18.0
Expand All @@ -93,7 +94,7 @@ github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/golang/snappy v0.0.4
github.com/google/btree v1.0.0
github.com/google/go-cmp v0.5.9
github.com/google/go-cmp v0.6.0
github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5
github.com/google/martian v2.1.0+incompatible
github.com/google/martian/v3 v3.1.0
Expand Down Expand Up @@ -144,7 +145,7 @@ github.com/kataras/pio v0.0.11
github.com/kataras/sitemap v0.0.6
github.com/kataras/tunnel v0.0.4
github.com/kisielk/gotool v1.0.0
github.com/klauspost/compress v1.16.0
github.com/klauspost/compress v1.16.7
github.com/konsorten/go-windows-terminal-sequences v1.0.1
github.com/kr/fs v0.1.0
github.com/kr/pretty v0.3.1
Expand Down Expand Up @@ -185,11 +186,13 @@ github.com/nelsam/hel/v2 v2.3.3
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e
github.com/nyaruka/phonenumbers v1.1.8
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/onsi/ginkgo v1.12.0
github.com/onsi/ginkgo/v2 v2.4.0
github.com/onsi/gomega v1.23.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pelletier/go-toml v1.9.0
github.com/pelletier/go-toml/v2 v2.0.9
github.com/pierrec/lz4/v4 v4.1.18
github.com/pingcap/errors v0.11.4
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.1
Expand All @@ -208,7 +211,8 @@ github.com/russross/blackfriday/v2 v2.1.0
github.com/sagikazarmark/crypt v0.10.0
github.com/schollz/closestmatch v2.1.0+incompatible
github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2
github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca
github.com/segmentio/kafka-go v0.4.46
github.com/sergi/go-diff v1.2.0
github.com/shopspring/decimal v1.3.1
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/sirupsen/logrus v1.9.3
Expand All @@ -235,10 +239,13 @@ github.com/valyala/fasthttp v1.40.0
github.com/valyala/fasttemplate v1.2.2
github.com/vmihailenco/msgpack/v5 v5.3.5
github.com/vmihailenco/tagparser/v2 v2.0.0
github.com/xdg-go/pbkdf2 v1.0.0
github.com/xdg-go/scram v1.1.2
github.com/xdg-go/stringprep v1.0.4
github.com/xdrpp/goxdr v0.1.1
github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076
github.com/xeipuuv/gojsonreference v0.0.0-20150808065054-e02fc20de94c
github.com/xeipuuv/gojsonschema v0.0.0-20161231055540-f06f290571ce
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415
github.com/xeipuuv/gojsonschema v1.2.0
github.com/xhit/go-str2duration/v2 v2.1.0
github.com/yalp/jsonpath v0.0.0-20150812003900-31a79c7593bb
github.com/yosssi/ace v0.0.5
Expand Down
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/rs/cors v1.9.0
github.com/rubenv/sql-migrate v1.5.2
github.com/segmentio/kafka-go v0.4.46
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.16.0
Expand All @@ -41,29 +42,37 @@ require (
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.23.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.9 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
Loading

0 comments on commit cf5c1bf

Please sign in to comment.