From cf5c1bf1ffece3da4bf641b9945f4f37120636c5 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Mon, 11 Dec 2023 10:08:27 -0300 Subject: [PATCH 01/11] feat: kafka setup --- cmd/serve.go | 32 ++++++++ cmd/serve_test.go | 12 +++ cmd/utils/custom_set_value.go | 46 ++++++++++++ cmd/utils/custom_set_value_test.go | 74 +++++++++++++++++++ dev/docker-compose-sdp-anchor.yml | 20 +++++ go.list | 23 ++++-- go.mod | 9 +++ go.sum | 66 +++++++++++++++-- internal/events/events.go | 74 +++++++++++++++++++ internal/events/events_test.go | 95 ++++++++++++++++++++++++ internal/events/handler.go | 39 ++++++++++ internal/events/kafka.go | 115 +++++++++++++++++++++++++++++ internal/events/kafka_test.go | 52 +++++++++++++ internal/serve/serve.go | 5 ++ 14 files changed, 646 insertions(+), 16 deletions(-) create mode 100644 internal/events/events.go create mode 100644 internal/events/events_test.go create mode 100644 internal/events/handler.go create mode 100644 internal/events/kafka.go create mode 100644 internal/events/kafka_test.go diff --git a/cmd/serve.go b/cmd/serve.go index d21e6d2b4..0455e0a84 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" "github.com/stellar/stellar-disbursement-platform-backend/internal/data" di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler" "github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler/jobs" @@ -309,6 +310,29 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ FlagDefault: true, Required: false, }, + { + Name: "brokers", + Usage: "List of Message Brokers Connection string comma separated.", + OptType: types.String, + ConfigKey: &serveOpts.Brokers, + CustomSetValue: cmdUtils.SetConfigOptionURLList, + Required: true, + }, + { + Name: "topics", + Usage: "List of Message Brokers Consumer Topics comma separated.", + OptType: types.String, + ConfigKey: &serveOpts.Topics, + CustomSetValue: cmdUtils.SetConfigOptionStringList, + Required: true, + }, + { + Name: "consumer-group-id", + Usage: "Message Broker Consumer Group ID.", + OptType: types.String, + ConfigKey: &serveOpts.ConsumerGroupID, + Required: true, + }, } messengerOptions := message.MessengerOptions{} @@ -424,6 +448,14 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ } serveOpts.AnchorPlatformAPIService = apAPIService + // Kafka (background) + kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) + kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + defer kafkaEventManager.Close() + + go events.Consume(ctx, kafkaEventManager) + serveOpts.EventProducer = kafkaEventManager + // Starting Scheduler Service (background job) if enabled if serveOpts.EnableScheduler { log.Ctx(ctx).Info("Starting Scheduler Service...") diff --git a/cmd/serve_test.go b/cmd/serve_test.go index a163ff8f8..9461875ab 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -3,6 +3,7 @@ package cmd import ( "bytes" "context" + "strings" "sync" "testing" @@ -13,6 +14,7 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler" @@ -120,6 +122,9 @@ func Test_serve(t *testing.T) { EnableReCAPTCHA: true, EnableScheduler: true, EnableMultiTenantDB: false, + Brokers: []string{"kafka:9092"}, + Topics: []string{"my-topic"}, + ConsumerGroupID: "group-id", } var err error serveOpts.AnchorPlatformAPIService, err = anchorplatform.NewAnchorPlatformAPIService(httpclient.DefaultClient(), serveOpts.AnchorPlatformBasePlatformURL, serveOpts.AnchorPlatformOutgoingJWTSecret) @@ -141,6 +146,10 @@ func Test_serve(t *testing.T) { require.NoError(t, err) serveOpts.SMSMessengerClient = smsMessengerClient + kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) + kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + serveOpts.EventProducer = kafkaEventManager + metricOptions := monitor.MetricOptions{ MetricType: monitor.MetricTypePrometheus, Environment: "test", @@ -213,6 +222,9 @@ func Test_serve(t *testing.T) { t.Setenv("INSTANCE_NAME", serveOpts.InstanceName) t.Setenv("ENABLE_SCHEDULER", "true") t.Setenv("ENABLE_MULTITENANT_DB", "false") + t.Setenv("BROKERS", strings.Join(serveOpts.Brokers, ",")) + t.Setenv("TOPICS", strings.Join(serveOpts.Topics, ",")) + t.Setenv("CONSUMER_GROUP_ID", serveOpts.ConsumerGroupID) // test & assert rootCmd.SetArgs([]string{"--environment", "test", "serve", "--metrics-type", "PROMETHEUS"}) diff --git a/cmd/utils/custom_set_value.go b/cmd/utils/custom_set_value.go index 2841f83fd..32796cf83 100644 --- a/cmd/utils/custom_set_value.go +++ b/cmd/utils/custom_set_value.go @@ -203,3 +203,49 @@ func SetConfigOptionURLString(co *config.ConfigOption) error { return nil } + +func SetConfigOptionURLList(co *config.ConfigOption) error { + urlsStr := viper.GetString(co.Name) + + if urlsStr == "" { + return fmt.Errorf("url list cannot be empty") + } + + urls := strings.Split(urlsStr, ",") + for _, u := range urls { + _, err := url.ParseRequestURI(strings.TrimSpace(u)) + if err != nil { + return fmt.Errorf("error parsing url: %w", err) + } + } + + key, ok := co.ConfigKey.(*[]string) + if !ok { + return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey) + } + *key = urls + + return nil +} + +func SetConfigOptionStringList(co *config.ConfigOption) error { + listStr := viper.GetString(co.Name) + + if listStr == "" { + return fmt.Errorf("cannot be empty") + } + + list := strings.Split(listStr, ",") + for i, el := range list { + list[i] = strings.TrimSpace(el) + } + + key, ok := co.ConfigKey.(*[]string) + if !ok { + return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey) + } + + *key = list + + return nil +} diff --git a/cmd/utils/custom_set_value_test.go b/cmd/utils/custom_set_value_test.go index 9d7d62438..1c55c9d3b 100644 --- a/cmd/utils/custom_set_value_test.go +++ b/cmd/utils/custom_set_value_test.go @@ -582,3 +582,77 @@ func Test_SetConfigOptionURLString(t *testing.T) { }) } } + +func Test_SetConfigOptionURLList(t *testing.T) { + opts := struct{ brokers []string }{} + + co := config.ConfigOption{ + Name: "brokers", + OptType: types.String, + CustomSetValue: SetConfigOptionURLList, + ConfigKey: &opts.brokers, + Required: false, + } + + testCases := []customSetterTestCase[[]string]{ + { + name: "returns an error if the list is empty", + args: []string{"--brokers", ""}, + wantErrContains: "cannot be empty", + }, + { + name: "🎉 handles string list successfully (from CLI args)", + args: []string{"--brokers", "kafka:9092,localhost:9093,kafka://broker:9092"}, + wantResult: []string{"kafka:9092", "localhost:9093", "kafka://broker:9092"}, + }, + { + name: "🎉 string list successfully (from ENV vars)", + envValue: "kafka:9092,localhost:9093", + wantResult: []string{"kafka:9092", "localhost:9093"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts.brokers = []string{} + customSetterTester[[]string](t, tc, co) + }) + } +} + +func Test_SetConfigOptionStringList(t *testing.T) { + opts := struct{ topics []string }{} + + co := config.ConfigOption{ + Name: "topics", + OptType: types.String, + CustomSetValue: SetConfigOptionStringList, + ConfigKey: &opts.topics, + Required: false, + } + + testCases := []customSetterTestCase[[]string]{ + { + name: "returns an error if the list is empty", + args: []string{"--topics", ""}, + wantErrContains: "cannot be empty", + }, + { + name: "🎉 handles string list successfully (from CLI args)", + args: []string{"--topics", "topic1, topic2,topic3"}, + wantResult: []string{"topic1", "topic2", "topic3"}, + }, + { + name: "🎉 string list successfully (from ENV vars)", + envValue: "topic1, topic2", + wantResult: []string{"topic1", "topic2"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts.topics = []string{} + customSetterTester[[]string](t, tc, co) + }) + } +} diff --git a/dev/docker-compose-sdp-anchor.yml b/dev/docker-compose-sdp-anchor.yml index eded1744b..3f2caf0e7 100644 --- a/dev/docker-compose-sdp-anchor.yml +++ b/dev/docker-compose-sdp-anchor.yml @@ -170,8 +170,28 @@ services: SECRET_SEP10_SIGNING_SEED: ${SEP10_SIGNING_PRIVATE_KEY} SECRET_SEP24_INTERACTIVE_URL_JWT_SECRET: jwt_secret_1234567890 SECRET_SEP24_MORE_INFO_URL_JWT_SECRET: jwt_secret_1234567890 + + kafka: + image: docker.io/bitnami/kafka:3.6 + ports: + - "9094:9094" + volumes: + - "kafka-data:/bitnami" + environment: + # KRaft settings + - KAFKA_CFG_NODE_ID=0 + - KAFKA_CFG_PROCESS_ROLES=controller,broker + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 + # Listeners + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + volumes: postgres-db: driver: local postgres-ap-db: driver: local + kafka-data: + driver: local diff --git a/go.list b/go.list index 104adb63c..af2d8fd22 100644 --- a/go.list +++ b/go.list @@ -71,6 +71,7 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4 github.com/go-gorp/gorp/v3 v3.1.0 github.com/go-kit/log v0.2.1 github.com/go-logfmt/logfmt v0.5.1 +github.com/go-logr/logr v1.2.3 github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab github.com/go-playground/locales v0.14.0 github.com/go-playground/universal-translator v0.18.0 @@ -93,7 +94,7 @@ github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.3 github.com/golang/snappy v0.0.4 github.com/google/btree v1.0.0 -github.com/google/go-cmp v0.5.9 +github.com/google/go-cmp v0.6.0 github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 github.com/google/martian v2.1.0+incompatible github.com/google/martian/v3 v3.1.0 @@ -144,7 +145,7 @@ github.com/kataras/pio v0.0.11 github.com/kataras/sitemap v0.0.6 github.com/kataras/tunnel v0.0.4 github.com/kisielk/gotool v1.0.0 -github.com/klauspost/compress v1.16.0 +github.com/klauspost/compress v1.16.7 github.com/konsorten/go-windows-terminal-sequences v1.0.1 github.com/kr/fs v0.1.0 github.com/kr/pretty v0.3.1 @@ -185,11 +186,13 @@ github.com/nelsam/hel/v2 v2.3.3 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e github.com/nyaruka/phonenumbers v1.1.8 github.com/olekukonko/tablewriter v0.0.5 -github.com/onsi/ginkgo v1.7.0 -github.com/onsi/gomega v1.4.3 +github.com/onsi/ginkgo v1.12.0 +github.com/onsi/ginkgo/v2 v2.4.0 +github.com/onsi/gomega v1.23.0 github.com/opentracing/opentracing-go v1.1.0 github.com/pelletier/go-toml v1.9.0 github.com/pelletier/go-toml/v2 v2.0.9 +github.com/pierrec/lz4/v4 v4.1.18 github.com/pingcap/errors v0.11.4 github.com/pkg/errors v0.9.1 github.com/pkg/sftp v1.13.1 @@ -208,7 +211,8 @@ github.com/russross/blackfriday/v2 v2.1.0 github.com/sagikazarmark/crypt v0.10.0 github.com/schollz/closestmatch v2.1.0+incompatible github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 -github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca +github.com/segmentio/kafka-go v0.4.46 +github.com/sergi/go-diff v1.2.0 github.com/shopspring/decimal v1.3.1 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 github.com/sirupsen/logrus v1.9.3 @@ -235,10 +239,13 @@ github.com/valyala/fasthttp v1.40.0 github.com/valyala/fasttemplate v1.2.2 github.com/vmihailenco/msgpack/v5 v5.3.5 github.com/vmihailenco/tagparser/v2 v2.0.0 +github.com/xdg-go/pbkdf2 v1.0.0 +github.com/xdg-go/scram v1.1.2 +github.com/xdg-go/stringprep v1.0.4 github.com/xdrpp/goxdr v0.1.1 -github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 -github.com/xeipuuv/gojsonreference v0.0.0-20150808065054-e02fc20de94c -github.com/xeipuuv/gojsonschema v0.0.0-20161231055540-f06f290571ce +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 +github.com/xeipuuv/gojsonschema v1.2.0 github.com/xhit/go-str2duration/v2 v2.1.0 github.com/yalp/jsonpath v0.0.0-20150812003900-31a79c7593bb github.com/yosssi/ace v0.0.5 diff --git a/go.mod b/go.mod index 4d7e3b6a5..a7c799416 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/rs/cors v1.9.0 github.com/rubenv/sql-migrate v1.5.2 + github.com/segmentio/kafka-go v0.4.46 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/spf13/viper v1.16.0 @@ -41,22 +42,28 @@ require ( github.com/go-gorp/gorp/v3 v3.1.0 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/onsi/ginkgo v1.12.0 // indirect + github.com/onsi/gomega v1.23.0 // indirect github.com/pelletier/go-toml/v2 v2.0.9 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 // indirect + github.com/sergi/go-diff v1.2.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -64,6 +71,8 @@ require ( github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee // indirect github.com/stretchr/objx v0.5.1 // indirect github.com/subosito/gotenv v1.4.2 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index bca470715..d1ccf6e7f 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gavv/monotime v0.0.0-20161010190848-47d58efa6955 h1:gmtGRvSexPU4B1T/yYo0sLOKzER1YT+b4kPxPpm0Ty4= @@ -149,7 +150,8 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 h1:oERTZ1buOUYlpmKaqlO5fYmz8cZ1rYu5DieJzF4ZVmU= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -178,6 +180,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk= @@ -194,7 +197,9 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -230,10 +235,17 @@ github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db h1:eZgFHVkk9uOTaOQL github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nyaruka/phonenumbers v1.1.8 h1:mjFu85FeoH2Wy18aOMUvxqi1GgAqiQSJsa/cCC5yu2s= github.com/nyaruka/phonenumbers v1.1.8/go.mod h1:DC7jZd321FqUe+qWSNcHi10tyIyGNXGcNbfkPvdp1Vs= -github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= -github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= +github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -259,7 +271,10 @@ github.com/rubenv/sql-migrate v1.5.2/go.mod h1:H38GW8Vqf8F0Su5XignRyaRcbXbJunSWx github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 h1:S4OC0+OBKz6mJnzuHioeEat74PuQ4Sgvbf8eus695sc= github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2/go.mod h1:8zLRYR5npGjaOXgPSKat5+oOh+UHd8OdbS18iqX9F6Y= -github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca h1:oR/RycYTFTVXzND5r4FdsvbnBn0HJXSVeNAnwaTXRwk= +github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI= +github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= +github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= @@ -284,6 +299,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0= github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -298,10 +314,20 @@ github.com/twilio/twilio-go v1.11.0 h1:ixO2DfAV4c0Yza0Tom5F5ZZB8WUbigiFc9wD84vbY github.com/twilio/twilio-go v1.11.0/go.mod h1:tdnfQ5TjbewoAu4lf9bMsGvfuJ/QU9gYuv9yx3TSIXU= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xdrpp/goxdr v0.1.1 h1:E1B2c6E8eYhOVyd7yEpOyopzTPirUeF6mVOfXfGyJyc= -github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 h1:KM4T3G70MiR+JtqplcYkNVoNz7pDwYaBxWBXQK804So= -github.com/xeipuuv/gojsonreference v0.0.0-20150808065054-e02fc20de94c h1:XZWnr3bsDQWAZg4Ne+cPoXRPILrNlPNQfxBuwLl43is= -github.com/xeipuuv/gojsonschema v0.0.0-20161231055540-f06f290571ce h1:cVSRGH8cOveJNwFEEZLXtB+XMnRqKLjUP6V/ZFYQCXI= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/yalp/jsonpath v0.0.0-20150812003900-31a79c7593bb h1:06WAhQa+mYv7BiOk13B/ywyTlkoE/S7uu6TBKU6FHnE= github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d h1:yJIizrfO599ot2kQ6Af1enICnwBD3XoxgX3MrMwot2M= github.com/yudai/golcs v0.0.0-20150405163532-d1c525dea8ce h1:888GrqRxabUce7lj4OaoShPxodm3kXOMpSa85wdYzfY= @@ -325,6 +351,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -364,8 +391,10 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -399,6 +428,9 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -422,7 +454,9 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -433,6 +467,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -466,11 +501,17 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.14.0 h1:LGK9IlZ8T9jvdy6cTdfKUCltatMFOehAQo9SRC46UQ8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -480,7 +521,11 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -535,6 +580,7 @@ golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -633,17 +679,21 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tylerb/graceful.v1 v1.2.15 h1:1JmOyhKqAyX3BgTXMI84LwT6FOJ4tP2N9e2kwTCM0nQ= gopkg.in/tylerb/graceful.v1 v1.2.15/go.mod h1:yBhekWvR20ACXVObSSdD3u6S9DeSylanL2PAbAC/uJ8= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 000000000..2e58d7719 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,74 @@ +package events + +import ( + "context" + "errors" + "fmt" + + "github.com/stellar/go/support/log" +) + +var ( + ErrTopicRequired = errors.New("message topic is required") + ErrKeyRequired = errors.New("message key is required") + ErrTenantIDRequired = errors.New("message tenant ID is required") + ErrTypeRequired = errors.New("message type is required") + ErrDataRequired = errors.New("message data is required") +) + +type Message struct { + Topic string `json:"topic"` + Key string `json:"key"` + TenantID string `json:"tenant_id"` + Type string `json:"type"` + Data any `json:"data"` +} + +func (m Message) String() string { + return fmt.Sprintf("Topic: %s - Key: %s - Type: %s - Tenant ID: %s", m.Topic, m.Key, m.Type, m.TenantID) +} + +func (m Message) Validate() error { + if m.Topic == "" { + return ErrTopicRequired + } + + if m.Key == "" { + return ErrKeyRequired + } + + if m.TenantID == "" { + return ErrTenantIDRequired + } + + if m.Type == "" { + return ErrTypeRequired + } + + if m.Data == nil { + return ErrDataRequired + } + + return nil +} + +type Producer interface { + WriteMessages(ctx context.Context, messages ...Message) error + Close() error +} + +type Consumer interface { + RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error + ReadMessage(ctx context.Context) error + Close() error +} + +func Consume(ctx context.Context, consumer Consumer) error { + log.Ctx(ctx).Info("starting consuming messages...") + for { + if err := consumer.ReadMessage(ctx); err != nil { + log.Errorf("error consuming: %s", err.Error()) + return fmt.Errorf("consuming messages: %w", err) + } + } +} diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 000000000..6754910d5 --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,95 @@ +package events + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type MockConsumer struct { + mock.Mock +} + +var _ Consumer = new(MockConsumer) + +func (c *MockConsumer) ReadMessage(ctx context.Context) error { + args := c.Called(ctx) + return args.Error(0) +} + +func (c *MockConsumer) RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error { + args := c.Called(ctx, eventHandlers) + return args.Error(0) +} + +func (c *MockConsumer) Close() error { + args := c.Called() + return args.Error(0) +} + +func TestConsume(t *testing.T) { + ctx := context.Background() + consumer := &MockConsumer{} + + consumer. + On("ReadMessage", ctx). + Return(errors.New("unexpected error")). + Once(). + On("ReadMessage", ctx). + Return(nil) + + err := Consume(ctx, consumer) + assert.EqualError(t, err, "consuming messages: unexpected error") + + tick := time.Tick(time.Second * 1) + go func() { + err := Consume(ctx, consumer) + require.NoError(t, err) + }() + + <-tick + + consumer.AssertExpectations(t) +} + +func Test_Message_Validate(t *testing.T) { + m := Message{} + + err := m.Validate() + assert.ErrorIs(t, err, ErrTopicRequired) + + m.Topic = "test-topic" + err = m.Validate() + assert.ErrorIs(t, err, ErrKeyRequired) + + m.Key = "test-key" + err = m.Validate() + assert.ErrorIs(t, err, ErrTenantIDRequired) + + m.TenantID = "tenant-ID" + err = m.Validate() + assert.ErrorIs(t, err, ErrTypeRequired) + + m.Type = "test-type" + err = m.Validate() + assert.ErrorIs(t, err, ErrDataRequired) + + m.Data = "test" + err = m.Validate() + assert.NoError(t, err) + + m.Data = nil + m.Data = map[string]string{"test": "test"} + err = m.Validate() + assert.NoError(t, err) + + m.Data = nil + m.Data = struct{ Name string }{Name: "test"} + err = m.Validate() + assert.NoError(t, err) +} diff --git a/internal/events/handler.go b/internal/events/handler.go new file mode 100644 index 000000000..719960a39 --- /dev/null +++ b/internal/events/handler.go @@ -0,0 +1,39 @@ +package events + +import ( + "context" + "fmt" +) + +type EventHandler interface { + Name() string + CanHandleMessage(ctx context.Context, message *Message) bool + Handle(ctx context.Context, message *Message) error +} + +type PingPongRequest struct { + Message string `json:"message"` +} + +// PingPongEventHandler is a example of event handler +type PingPongEventHandler struct{} + +var _ EventHandler = new(PingPongEventHandler) + +func (h *PingPongEventHandler) Name() string { + return "PingPong.EventHandler" +} + +func (h *PingPongEventHandler) CanHandleMessage(ctx context.Context, message *Message) bool { + return message.Topic == "ping-pong" +} + +func (h *PingPongEventHandler) Handle(ctx context.Context, message *Message) error { + if message.Type == "ping" { + fmt.Println("pong") + } else { + fmt.Println("ping") + } + + return nil +} diff --git a/internal/events/kafka.go b/internal/events/kafka.go new file mode 100644 index 000000000..92759c6ec --- /dev/null +++ b/internal/events/kafka.go @@ -0,0 +1,115 @@ +package events + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/segmentio/kafka-go" + "github.com/stellar/go/support/log" + "golang.org/x/exp/maps" +) + +type KafkaEventManager struct { + handlers []EventHandler + writer *kafka.Writer + reader *kafka.Reader +} + +func NewKafkaEventManager(brokers []string, consumerTopics []string, consumerGroupID string) (*KafkaEventManager, error) { + k := KafkaEventManager{} + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: brokers, + Balancer: &kafka.RoundRobin{}, + RequiredAcks: -1, + }) + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + GroupID: consumerGroupID, + GroupTopics: consumerTopics, + }) + + k.writer = writer + k.reader = reader + + return &k, nil +} + +// Implements Producer interface +var _ Producer = new(KafkaEventManager) + +// Implements Consumer interface +var _ Consumer = new(KafkaEventManager) + +func (k *KafkaEventManager) WriteMessages(ctx context.Context, messages ...Message) error { + kafkaMessages := make([]kafka.Message, 0, len(messages)) + for _, msg := range messages { + msgJSON, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("marshalling message: %w", err) + } + + kafkaMessages = append(kafkaMessages, kafka.Message{ + Topic: msg.Topic, + Key: []byte(msg.Key), + Value: msgJSON, + }) + } + + if err := k.writer.WriteMessages(ctx, kafkaMessages...); err != nil { + log.Ctx(ctx).Errorf("writing message on kafka: %s", err.Error()) + return fmt.Errorf("writing message on kafka: %w", err) + } + + return nil +} + +func (k *KafkaEventManager) RegisterEventHandler(ctx context.Context, handlers ...EventHandler) error { + ehMap := make(map[string]EventHandler, len(handlers)) + for _, handler := range handlers { + log.Ctx(ctx).Infof("registering event handler %s", handler.Name()) + ehMap[handler.Name()] = handler + } + k.handlers = maps.Values(ehMap) + return nil +} + +func (k *KafkaEventManager) ReadMessage(ctx context.Context) error { + log.Ctx(ctx).Info("fetching messages from kafka") + kafkaMessage, err := k.reader.FetchMessage(ctx) + if err != nil { + log.Ctx(ctx).Errorf("fetching message from kafka: %s", err.Error()) + return fmt.Errorf("fetching message from kafka: %w", err) + } + + log.Ctx(ctx).Info("unmarshalling new message") + var msg Message + if err = json.Unmarshal(kafkaMessage.Value, &msg); err != nil { + return fmt.Errorf("unmarshaling message: %w", err) + } + + log.Ctx(ctx).Infof("new message being processed: %s", msg.String()) + for _, handler := range k.handlers { + if handler.CanHandleMessage(ctx, &msg) { + if err = handler.Handle(ctx, &msg); err != nil { + return fmt.Errorf("handler %s errored when handling message: %w", handler.Name(), err) + } + } + } + + // Acknowledgement + if err = k.reader.CommitMessages(ctx, kafkaMessage); err != nil { + return fmt.Errorf("committing message: %w", err) + } + + return nil +} + +func (k *KafkaEventManager) Close() error { + log.Info("closing kafka producer and consumer") + defer k.writer.Close() + defer k.reader.Close() + return nil +} diff --git a/internal/events/kafka_test.go b/internal/events/kafka_test.go new file mode 100644 index 000000000..cc2c48f73 --- /dev/null +++ b/internal/events/kafka_test.go @@ -0,0 +1,52 @@ +package events + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type MockEventHandler struct { + mock.Mock +} + +var _ EventHandler = new(MockEventHandler) + +func (h *MockEventHandler) Name() string { + return "MockEventHandler" +} + +func (h *MockEventHandler) CanHandleMessage(ctx context.Context, message *Message) bool { + args := h.Called(ctx, message) + return args.Bool(0) +} + +func (h *MockEventHandler) Handle(ctx context.Context, message *Message) error { + args := h.Called(ctx, message) + return args.Error(0) +} + +func Test_KafkaEventManager_RegisterEventHandler(t *testing.T) { + ctx := context.Background() + + t.Run("register handler successfully", func(t *testing.T) { + k := KafkaEventManager{} + assert.Empty(t, k.handlers) + eh := MockEventHandler{} + err := k.RegisterEventHandler(ctx, &eh) + require.NoError(t, err) + assert.Equal(t, []EventHandler{&eh}, k.handlers) + }) + + t.Run("no handler duplicated", func(t *testing.T) { + k := KafkaEventManager{} + assert.Empty(t, k.handlers) + eh := MockEventHandler{} + err := k.RegisterEventHandler(ctx, &eh, &eh) + require.NoError(t, err) + assert.Equal(t, []EventHandler{&eh}, k.handlers) + }) +} diff --git a/internal/serve/serve.go b/internal/serve/serve.go index 0ab823f8e..1d51728c9 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -20,6 +20,7 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" "github.com/stellar/stellar-disbursement-platform-backend/internal/data" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/serve/httpclient" @@ -90,6 +91,10 @@ type ServeOptions struct { EnableMultiTenantDB bool tenantManager tenant.ManagerInterface tenantRouter db.DataSourceRouter + EventProducer events.Producer + Brokers []string + Topics []string + ConsumerGroupID string } // SetupDependencies uses the serve options to setup the dependencies for the server. From 6b951c13289e766f3239d1353ef2b00a1359d81e Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Mon, 11 Dec 2023 10:32:15 -0300 Subject: [PATCH 02/11] lint: fix lint errors --- cmd/serve.go | 18 +++++++++++++++--- cmd/serve_test.go | 6 ++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 0455e0a84..eae8487d5 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -449,11 +449,23 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.AnchorPlatformAPIService = apAPIService // Kafka (background) - kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) - kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + kafkaEventManager, err := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) + if err != nil { + log.Ctx(ctx).Fatalf("error creating Kafka Event Manager: %v", err) + } defer kafkaEventManager.Close() - go events.Consume(ctx, kafkaEventManager) + err = kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + if err != nil { + log.Ctx(ctx).Fatalf("error creating registering handlers: %v", err) + } + + go func() { + err := events.Consume(ctx, kafkaEventManager) + if err != nil { + log.Ctx(ctx).Fatalf("error consuming events: %v", err) + } + }() serveOpts.EventProducer = kafkaEventManager // Starting Scheduler Service (background job) if enabled diff --git a/cmd/serve_test.go b/cmd/serve_test.go index 9461875ab..bc5769932 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -146,8 +146,10 @@ func Test_serve(t *testing.T) { require.NoError(t, err) serveOpts.SMSMessengerClient = smsMessengerClient - kafkaEventManager, _ := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) - kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + kafkaEventManager, err := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) + require.NoError(t, err) + err = kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + require.NoError(t, err) serveOpts.EventProducer = kafkaEventManager metricOptions := monitor.MetricOptions{ From 2042d20c09a7c1178b0eaf63f04d160146df5ea9 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Mon, 11 Dec 2023 15:42:02 -0300 Subject: [PATCH 03/11] feat: create a new dependency injection for kafka --- cmd/serve.go | 7 +- cmd/serve_test.go | 4 +- internal/dependencyinjection/event_manager.go | 48 ++++++++++ .../dependencyinjection/event_manager_test.go | 95 +++++++++++++++++++ internal/events/events.go | 6 ++ 5 files changed, 151 insertions(+), 9 deletions(-) create mode 100644 internal/dependencyinjection/event_manager.go create mode 100644 internal/dependencyinjection/event_manager_test.go diff --git a/cmd/serve.go b/cmd/serve.go index eae8487d5..760fea059 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -449,17 +449,12 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.AnchorPlatformAPIService = apAPIService // Kafka (background) - kafkaEventManager, err := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) + kafkaEventManager, err := di.NewKafkaEventManager(ctx, serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID, &events.PingPongEventHandler{}) if err != nil { log.Ctx(ctx).Fatalf("error creating Kafka Event Manager: %v", err) } defer kafkaEventManager.Close() - err = kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) - if err != nil { - log.Ctx(ctx).Fatalf("error creating registering handlers: %v", err) - } - go func() { err := events.Consume(ctx, kafkaEventManager) if err != nil { diff --git a/cmd/serve_test.go b/cmd/serve_test.go index bc5769932..d8cf689ca 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -146,9 +146,7 @@ func Test_serve(t *testing.T) { require.NoError(t, err) serveOpts.SMSMessengerClient = smsMessengerClient - kafkaEventManager, err := events.NewKafkaEventManager(serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID) - require.NoError(t, err) - err = kafkaEventManager.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + kafkaEventManager, err := di.NewKafkaEventManager(ctx, serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID, &events.PingPongEventHandler{}) require.NoError(t, err) serveOpts.EventProducer = kafkaEventManager diff --git a/internal/dependencyinjection/event_manager.go b/internal/dependencyinjection/event_manager.go new file mode 100644 index 000000000..4729abb9d --- /dev/null +++ b/internal/dependencyinjection/event_manager.go @@ -0,0 +1,48 @@ +package dependencyinjection + +import ( + "context" + "fmt" + + "github.com/stellar/go/support/log" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" +) + +const kafkaEventManagerInstanceName = "kafka_event_manager_instance_name" + +func NewKafkaEventManager(ctx context.Context, brokers []string, consumerTopics []string, consumerGroupID string, eventHandlers ...events.EventHandler) (*events.KafkaEventManager, error) { + if len(brokers) == 0 { + return nil, fmt.Errorf("brokers cannot be empty") + } + + if len(consumerTopics) == 0 { + return nil, fmt.Errorf("consumer topics cannot be empty") + } + + if consumerGroupID == "" { + return nil, fmt.Errorf("consumer group ID cannot be empty") + } + + if instance, ok := dependenciesStoreMap[kafkaEventManagerInstanceName]; ok { + if kafkaEventManager, ok := instance.(*events.KafkaEventManager); ok { + return kafkaEventManager, nil + } + return nil, fmt.Errorf("trying to cast pre-existing Kafka Event Manager for dependency injection") + } + + // Setup Kafka Event Manager + log.Infof("⚙️ Setting Kafka Event Manager") + kafkaEventManager, err := events.NewKafkaEventManager(brokers, consumerTopics, consumerGroupID) + if err != nil { + return nil, fmt.Errorf("creating Kafka Event Manager: %w", err) + } + + err = kafkaEventManager.RegisterEventHandler(ctx, eventHandlers...) + if err != nil { + return nil, fmt.Errorf("registering event handlers: %w", err) + } + + setInstance(kafkaEventManagerInstanceName, kafkaEventManager) + + return kafkaEventManager, nil +} diff --git a/internal/dependencyinjection/event_manager_test.go b/internal/dependencyinjection/event_manager_test.go new file mode 100644 index 000000000..c8eb95c3d --- /dev/null +++ b/internal/dependencyinjection/event_manager_test.go @@ -0,0 +1,95 @@ +package dependencyinjection + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_NewKafkaEventManager(t *testing.T) { + testingCases := []struct { + name string + brokers []string + topics []string + consumerGroupID string + wantErrContains string + }{ + { + name: "return an error if brokers is empty", + brokers: []string{}, + topics: []string{}, + consumerGroupID: "", + wantErrContains: "brokers cannot be empty", + }, + { + name: "return an error if consumer topics is empty", + brokers: []string{"kafka:9092"}, + topics: []string{}, + consumerGroupID: "", + wantErrContains: "consumer topics cannot be empty", + }, + { + name: "return an error if consumer group ID is empty", + brokers: []string{"kafka:9092"}, + topics: []string{"my-topic"}, + consumerGroupID: "", + wantErrContains: "consumer group ID cannot be empty", + }, + { + name: "🎉 successfully creates a new instance if none exist before", + brokers: []string{"kafka:9092"}, + topics: []string{"my-topic"}, + consumerGroupID: "group-id", + wantErrContains: "", + }, + } + ctx := context.Background() + + for _, tc := range testingCases { + t.Run(tc.name, func(t *testing.T) { + defer ClearInstancesTestHelper(t) + + gotResult, err := NewKafkaEventManager(ctx, tc.brokers, tc.topics, tc.consumerGroupID) + if tc.wantErrContains != "" { + require.ErrorContains(t, err, tc.wantErrContains) + require.Nil(t, gotResult) + } else { + require.NoError(t, err) + require.NotNil(t, gotResult) + } + }) + } +} + +func Test_NewKafkaEventManager_existingInstanceIsReturned(t *testing.T) { + ctx := context.Background() + brokers := []string{"kafka:9092"} + topics := []string{"my-topic"} + consumerGroupID := "group-id" + + defer ClearInstancesTestHelper(t) + + // STEP 1: assert that the instance is nil + _, ok := dependenciesStoreMap[kafkaEventManagerInstanceName] + require.False(t, ok) + + // STEP 2: create a new instance + kafkaEventManager1, err := NewKafkaEventManager(ctx, brokers, topics, consumerGroupID) + require.NoError(t, err) + require.NotNil(t, kafkaEventManager1) + + // STEP 3: assert that the instance is not nil + storedKafkaEventManager, ok := dependenciesStoreMap[kafkaEventManagerInstanceName] + require.True(t, ok) + require.NotNil(t, storedKafkaEventManager) + require.Same(t, kafkaEventManager1, storedKafkaEventManager) + + // STEP 4: create a new instance + kafkaEventManager2, err := NewKafkaEventManager(ctx, brokers, topics, consumerGroupID) + require.NoError(t, err) + require.NotNil(t, kafkaEventManager2) + + // STEP 5: assert that the returned object is the same as the stored one + require.Same(t, kafkaEventManager1, kafkaEventManager2) +} diff --git a/internal/events/events.go b/internal/events/events.go index 2e58d7719..21aa5df95 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "github.com/stellar/go/support/log" ) @@ -67,8 +68,13 @@ func Consume(ctx context.Context, consumer Consumer) error { log.Ctx(ctx).Info("starting consuming messages...") for { if err := consumer.ReadMessage(ctx); err != nil { + if errors.Is(err, io.EOF) { + log.Ctx(ctx).Warn("message broker returned EOF") + break + } log.Errorf("error consuming: %s", err.Error()) return fmt.Errorf("consuming messages: %w", err) } } + return nil } From 90c18a35c2383bd3bc700f326c21b0e3d33c6fd6 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Wed, 13 Dec 2023 10:15:14 -0300 Subject: [PATCH 04/11] refactor: improve error logging and error reporting --- internal/events/events.go | 9 ++++----- internal/events/events_test.go | 22 ++++++++++++++++------ internal/events/handler.go | 6 ++---- internal/events/kafka.go | 5 +---- internal/events/kafka_test.go | 5 ++--- 5 files changed, 25 insertions(+), 22 deletions(-) diff --git a/internal/events/events.go b/internal/events/events.go index 21aa5df95..81abafb8f 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -7,6 +7,7 @@ import ( "io" "github.com/stellar/go/support/log" + "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" ) var ( @@ -64,17 +65,15 @@ type Consumer interface { Close() error } -func Consume(ctx context.Context, consumer Consumer) error { +func Consume(ctx context.Context, consumer Consumer, crashTracker crashtracker.CrashTrackerClient) { log.Ctx(ctx).Info("starting consuming messages...") for { if err := consumer.ReadMessage(ctx); err != nil { if errors.Is(err, io.EOF) { - log.Ctx(ctx).Warn("message broker returned EOF") + log.Ctx(ctx).Warn("message broker returned EOF") // This is an end state break } - log.Errorf("error consuming: %s", err.Error()) - return fmt.Errorf("consuming messages: %w", err) + crashTracker.LogAndReportErrors(ctx, err, "consuming messages") } } - return nil } diff --git a/internal/events/events_test.go b/internal/events/events_test.go index 6754910d5..b067722a5 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -3,12 +3,13 @@ package events import ( "context" "errors" + "io" "testing" "time" + "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) type MockConsumer struct { @@ -35,26 +36,35 @@ func (c *MockConsumer) Close() error { func TestConsume(t *testing.T) { ctx := context.Background() consumer := &MockConsumer{} + crashTracker := &crashtracker.MockCrashTrackerClient{} + unexpectedErr := errors.New("unexpected error") consumer. On("ReadMessage", ctx). - Return(errors.New("unexpected error")). + Return(unexpectedErr). + Once(). + On("ReadMessage", ctx). + Return(io.EOF). Once(). On("ReadMessage", ctx). Return(nil) - err := Consume(ctx, consumer) - assert.EqualError(t, err, "consuming messages: unexpected error") + crashTracker. + On("LogAndReportErrors", ctx, unexpectedErr, "consuming messages"). + Return(). + Once() + + Consume(ctx, consumer, crashTracker) tick := time.Tick(time.Second * 1) go func() { - err := Consume(ctx, consumer) - require.NoError(t, err) + Consume(ctx, consumer, crashTracker) }() <-tick consumer.AssertExpectations(t) + crashTracker.AssertExpectations(t) } func Test_Message_Validate(t *testing.T) { diff --git a/internal/events/handler.go b/internal/events/handler.go index 719960a39..b14fccd42 100644 --- a/internal/events/handler.go +++ b/internal/events/handler.go @@ -8,7 +8,7 @@ import ( type EventHandler interface { Name() string CanHandleMessage(ctx context.Context, message *Message) bool - Handle(ctx context.Context, message *Message) error + Handle(ctx context.Context, message *Message) } type PingPongRequest struct { @@ -28,12 +28,10 @@ func (h *PingPongEventHandler) CanHandleMessage(ctx context.Context, message *Me return message.Topic == "ping-pong" } -func (h *PingPongEventHandler) Handle(ctx context.Context, message *Message) error { +func (h *PingPongEventHandler) Handle(ctx context.Context, message *Message) { if message.Type == "ping" { fmt.Println("pong") } else { fmt.Println("ping") } - - return nil } diff --git a/internal/events/kafka.go b/internal/events/kafka.go index 92759c6ec..99e9e1edd 100644 --- a/internal/events/kafka.go +++ b/internal/events/kafka.go @@ -80,7 +80,6 @@ func (k *KafkaEventManager) ReadMessage(ctx context.Context) error { log.Ctx(ctx).Info("fetching messages from kafka") kafkaMessage, err := k.reader.FetchMessage(ctx) if err != nil { - log.Ctx(ctx).Errorf("fetching message from kafka: %s", err.Error()) return fmt.Errorf("fetching message from kafka: %w", err) } @@ -93,9 +92,7 @@ func (k *KafkaEventManager) ReadMessage(ctx context.Context) error { log.Ctx(ctx).Infof("new message being processed: %s", msg.String()) for _, handler := range k.handlers { if handler.CanHandleMessage(ctx, &msg) { - if err = handler.Handle(ctx, &msg); err != nil { - return fmt.Errorf("handler %s errored when handling message: %w", handler.Name(), err) - } + handler.Handle(ctx, &msg) } } diff --git a/internal/events/kafka_test.go b/internal/events/kafka_test.go index cc2c48f73..fb56a587a 100644 --- a/internal/events/kafka_test.go +++ b/internal/events/kafka_test.go @@ -24,9 +24,8 @@ func (h *MockEventHandler) CanHandleMessage(ctx context.Context, message *Messag return args.Bool(0) } -func (h *MockEventHandler) Handle(ctx context.Context, message *Message) error { - args := h.Called(ctx, message) - return args.Error(0) +func (h *MockEventHandler) Handle(ctx context.Context, message *Message) { + h.Called(ctx, message) } func Test_KafkaEventManager_RegisterEventHandler(t *testing.T) { From ff7e406d71f5ace0de5dc945def2384674a42a45 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Wed, 13 Dec 2023 10:16:33 -0300 Subject: [PATCH 05/11] feat: add conduktor platform to facilitate kakfa tests --- dev/docker-compose-sdp-anchor.yml | 45 +++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/dev/docker-compose-sdp-anchor.yml b/dev/docker-compose-sdp-anchor.yml index 3f2caf0e7..100d556c6 100644 --- a/dev/docker-compose-sdp-anchor.yml +++ b/dev/docker-compose-sdp-anchor.yml @@ -43,6 +43,9 @@ services: CORS_ALLOWED_ORIGINS: "*" ENABLE_MFA: "false" ENABLE_RECAPTCHA: "false" + BROKERS: "kafka:9092" + TOPICS: "ping-pong" + CONSUMER_GROUP_ID: "group-id" # multi-tenant INSTANCE_NAME: "SDP Testnet on Docker" @@ -74,6 +77,7 @@ services: ./stellar-disbursement-platform serve depends_on: - db + - kafka db-anchor-platform: container_name: anchor-platform-postgres-db @@ -188,6 +192,43 @@ services: - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER + db-conduktor: + image: postgres:14 + hostname: postgresql + volumes: + - db-conduktor-data:/var/lib/postgresql/data + environment: + POSTGRES_DB: "conduktor-platform" + POSTGRES_USER: "conduktor" + POSTGRES_PASSWORD: "change_me" + POSTGRES_HOST_AUTH_METHOD: "scram-sha-256" + + conduktor-platform: + image: conduktor/conduktor-platform:1.19.2 + depends_on: + - db-conduktor + ports: + - "9090:8080" + volumes: + - conduktor-platform-data:/var/conduktor + environment: + CDK_DATABASE_URL: "postgresql://conduktor:change_me@db-conduktor:5432/conduktor-platform" + CDK_MONITORING_CORTEX-URL: http://conduktor-monitoring:9009/ + CDK_MONITORING_ALERT-MANAGER-URL: http://conduktor-monitoring:9010/ + CDK_MONITORING_CALLBACK-URL: http://conduktor-platform:9090/monitoring/api/ + CDK_MONITORING_NOTIFICATIONS-CALLBACK-URL: http://localhost:9090 + healthcheck: + test: curl -f http://localhost:9090/platform/api/modules/health/live || exit 1 + interval: 10s + start_period: 10s + timeout: 5s + retries: 3 + + conduktor-monitoring: + image: conduktor/conduktor-platform-cortex:1.19.2 + environment: + CDK_CONSOLE-URL: "http://conduktor-platform:9090" + volumes: postgres-db: driver: local @@ -195,3 +236,7 @@ volumes: driver: local kafka-data: driver: local + db-conduktor-data: + driver: local + conduktor-platform-data: + driver: local From f32b7586fea76ccaf53b7ec2ac42e3c79c2357fd Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Wed, 13 Dec 2023 11:58:47 -0300 Subject: [PATCH 06/11] fix: kafka writer instantiation --- internal/events/kafka.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/events/kafka.go b/internal/events/kafka.go index 99e9e1edd..1f4f330c6 100644 --- a/internal/events/kafka.go +++ b/internal/events/kafka.go @@ -19,11 +19,11 @@ type KafkaEventManager struct { func NewKafkaEventManager(brokers []string, consumerTopics []string, consumerGroupID string) (*KafkaEventManager, error) { k := KafkaEventManager{} - writer := kafka.NewWriter(kafka.WriterConfig{ - Brokers: brokers, + writer := kafka.Writer{ + Addr: kafka.TCP(brokers...), Balancer: &kafka.RoundRobin{}, RequiredAcks: -1, - }) + } reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, @@ -31,7 +31,7 @@ func NewKafkaEventManager(brokers []string, consumerTopics []string, consumerGro GroupTopics: consumerTopics, }) - k.writer = writer + k.writer = &writer k.reader = reader return &k, nil From 87c250d9d3feb26893f037c7735c336056f96743 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Wed, 13 Dec 2023 17:24:43 -0300 Subject: [PATCH 07/11] refactor: separate kafka consumer from kafka producer --- cmd/serve.go | 35 ++++----- cmd/serve_test.go | 12 +--- internal/dependencyinjection/event_manager.go | 38 +++------- .../dependencyinjection/event_manager_test.go | 46 ++++-------- internal/events/kafka.go | 71 +++++++++++-------- internal/events/kafka_test.go | 6 +- internal/serve/serve.go | 3 - 7 files changed, 83 insertions(+), 128 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index 760fea059..1d7352a27 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -25,6 +25,11 @@ import ( serveadmin "github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/serve" ) +var ( + brokers []string + consumerGroupID string +) + type ServeCommand struct{} type ServerServiceInterface interface { @@ -314,23 +319,15 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ Name: "brokers", Usage: "List of Message Brokers Connection string comma separated.", OptType: types.String, - ConfigKey: &serveOpts.Brokers, + ConfigKey: &brokers, CustomSetValue: cmdUtils.SetConfigOptionURLList, Required: true, }, - { - Name: "topics", - Usage: "List of Message Brokers Consumer Topics comma separated.", - OptType: types.String, - ConfigKey: &serveOpts.Topics, - CustomSetValue: cmdUtils.SetConfigOptionStringList, - Required: true, - }, { Name: "consumer-group-id", Usage: "Message Broker Consumer Group ID.", OptType: types.String, - ConfigKey: &serveOpts.ConsumerGroupID, + ConfigKey: &consumerGroupID, Required: true, }, } @@ -449,19 +446,17 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.AnchorPlatformAPIService = apAPIService // Kafka (background) - kafkaEventManager, err := di.NewKafkaEventManager(ctx, serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID, &events.PingPongEventHandler{}) + kafkaProducer, err := di.NewKafkaProducer(ctx, brokers) if err != nil { - log.Ctx(ctx).Fatalf("error creating Kafka Event Manager: %v", err) + log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err) } - defer kafkaEventManager.Close() + defer kafkaProducer.Close() + serveOpts.EventProducer = kafkaProducer - go func() { - err := events.Consume(ctx, kafkaEventManager) - if err != nil { - log.Ctx(ctx).Fatalf("error consuming events: %v", err) - } - }() - serveOpts.EventProducer = kafkaEventManager + // TODO: remove this example when start implementing the actual consumers + pingPongConsumer := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID) + defer pingPongConsumer.Close() + go events.Consume(ctx, pingPongConsumer, crashTrackerClient) // Starting Scheduler Service (background job) if enabled if serveOpts.EnableScheduler { diff --git a/cmd/serve_test.go b/cmd/serve_test.go index d8cf689ca..4814cf67d 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -3,7 +3,6 @@ package cmd import ( "bytes" "context" - "strings" "sync" "testing" @@ -14,7 +13,6 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection" - "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler" @@ -122,9 +120,6 @@ func Test_serve(t *testing.T) { EnableReCAPTCHA: true, EnableScheduler: true, EnableMultiTenantDB: false, - Brokers: []string{"kafka:9092"}, - Topics: []string{"my-topic"}, - ConsumerGroupID: "group-id", } var err error serveOpts.AnchorPlatformAPIService, err = anchorplatform.NewAnchorPlatformAPIService(httpclient.DefaultClient(), serveOpts.AnchorPlatformBasePlatformURL, serveOpts.AnchorPlatformOutgoingJWTSecret) @@ -146,7 +141,7 @@ func Test_serve(t *testing.T) { require.NoError(t, err) serveOpts.SMSMessengerClient = smsMessengerClient - kafkaEventManager, err := di.NewKafkaEventManager(ctx, serveOpts.Brokers, serveOpts.Topics, serveOpts.ConsumerGroupID, &events.PingPongEventHandler{}) + kafkaEventManager, err := di.NewKafkaProducer(ctx, []string{"kafka:9092"}) require.NoError(t, err) serveOpts.EventProducer = kafkaEventManager @@ -222,9 +217,8 @@ func Test_serve(t *testing.T) { t.Setenv("INSTANCE_NAME", serveOpts.InstanceName) t.Setenv("ENABLE_SCHEDULER", "true") t.Setenv("ENABLE_MULTITENANT_DB", "false") - t.Setenv("BROKERS", strings.Join(serveOpts.Brokers, ",")) - t.Setenv("TOPICS", strings.Join(serveOpts.Topics, ",")) - t.Setenv("CONSUMER_GROUP_ID", serveOpts.ConsumerGroupID) + t.Setenv("BROKERS", "kafka:9092") + t.Setenv("CONSUMER_GROUP_ID", "group-id") // test & assert rootCmd.SetArgs([]string{"--environment", "test", "serve", "--metrics-type", "PROMETHEUS"}) diff --git a/internal/dependencyinjection/event_manager.go b/internal/dependencyinjection/event_manager.go index 4729abb9d..d6249fc3a 100644 --- a/internal/dependencyinjection/event_manager.go +++ b/internal/dependencyinjection/event_manager.go @@ -8,41 +8,23 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/events" ) -const kafkaEventManagerInstanceName = "kafka_event_manager_instance_name" +const kafkaProducerInstanceName = "kafka_producer_instance_name" -func NewKafkaEventManager(ctx context.Context, brokers []string, consumerTopics []string, consumerGroupID string, eventHandlers ...events.EventHandler) (*events.KafkaEventManager, error) { +func NewKafkaProducer(ctx context.Context, brokers []string) (*events.KafkaProducer, error) { if len(brokers) == 0 { return nil, fmt.Errorf("brokers cannot be empty") } - if len(consumerTopics) == 0 { - return nil, fmt.Errorf("consumer topics cannot be empty") - } - - if consumerGroupID == "" { - return nil, fmt.Errorf("consumer group ID cannot be empty") - } - - if instance, ok := dependenciesStoreMap[kafkaEventManagerInstanceName]; ok { - if kafkaEventManager, ok := instance.(*events.KafkaEventManager); ok { - return kafkaEventManager, nil + if instance, ok := dependenciesStoreMap[kafkaProducerInstanceName]; ok { + if kafkaProducer, ok := instance.(*events.KafkaProducer); ok { + return kafkaProducer, nil } - return nil, fmt.Errorf("trying to cast pre-existing Kafka Event Manager for dependency injection") + return nil, fmt.Errorf("trying to cast pre-existing Kafka Producer for dependency injection") } // Setup Kafka Event Manager - log.Infof("⚙️ Setting Kafka Event Manager") - kafkaEventManager, err := events.NewKafkaEventManager(brokers, consumerTopics, consumerGroupID) - if err != nil { - return nil, fmt.Errorf("creating Kafka Event Manager: %w", err) - } - - err = kafkaEventManager.RegisterEventHandler(ctx, eventHandlers...) - if err != nil { - return nil, fmt.Errorf("registering event handlers: %w", err) - } - - setInstance(kafkaEventManagerInstanceName, kafkaEventManager) - - return kafkaEventManager, nil + log.Infof("⚙️ Setting Kafka Producer") + kafkaProducer := events.NewKafkaProducer(brokers) + setInstance(kafkaProducerInstanceName, kafkaProducer) + return kafkaProducer, nil } diff --git a/internal/dependencyinjection/event_manager_test.go b/internal/dependencyinjection/event_manager_test.go index c8eb95c3d..71711cc01 100644 --- a/internal/dependencyinjection/event_manager_test.go +++ b/internal/dependencyinjection/event_manager_test.go @@ -7,40 +7,20 @@ import ( "github.com/stretchr/testify/require" ) -func Test_NewKafkaEventManager(t *testing.T) { +func Test_NewKafkaProducer(t *testing.T) { testingCases := []struct { name string brokers []string - topics []string - consumerGroupID string wantErrContains string }{ { name: "return an error if brokers is empty", brokers: []string{}, - topics: []string{}, - consumerGroupID: "", wantErrContains: "brokers cannot be empty", }, - { - name: "return an error if consumer topics is empty", - brokers: []string{"kafka:9092"}, - topics: []string{}, - consumerGroupID: "", - wantErrContains: "consumer topics cannot be empty", - }, - { - name: "return an error if consumer group ID is empty", - brokers: []string{"kafka:9092"}, - topics: []string{"my-topic"}, - consumerGroupID: "", - wantErrContains: "consumer group ID cannot be empty", - }, { name: "🎉 successfully creates a new instance if none exist before", brokers: []string{"kafka:9092"}, - topics: []string{"my-topic"}, - consumerGroupID: "group-id", wantErrContains: "", }, } @@ -50,7 +30,7 @@ func Test_NewKafkaEventManager(t *testing.T) { t.Run(tc.name, func(t *testing.T) { defer ClearInstancesTestHelper(t) - gotResult, err := NewKafkaEventManager(ctx, tc.brokers, tc.topics, tc.consumerGroupID) + gotResult, err := NewKafkaProducer(ctx, tc.brokers) if tc.wantErrContains != "" { require.ErrorContains(t, err, tc.wantErrContains) require.Nil(t, gotResult) @@ -62,34 +42,32 @@ func Test_NewKafkaEventManager(t *testing.T) { } } -func Test_NewKafkaEventManager_existingInstanceIsReturned(t *testing.T) { +func Test_NewKafkaProducer_existingInstanceIsReturned(t *testing.T) { ctx := context.Background() brokers := []string{"kafka:9092"} - topics := []string{"my-topic"} - consumerGroupID := "group-id" defer ClearInstancesTestHelper(t) // STEP 1: assert that the instance is nil - _, ok := dependenciesStoreMap[kafkaEventManagerInstanceName] + _, ok := dependenciesStoreMap[kafkaProducerInstanceName] require.False(t, ok) // STEP 2: create a new instance - kafkaEventManager1, err := NewKafkaEventManager(ctx, brokers, topics, consumerGroupID) + kafkaProducer1, err := NewKafkaProducer(ctx, brokers) require.NoError(t, err) - require.NotNil(t, kafkaEventManager1) + require.NotNil(t, kafkaProducer1) // STEP 3: assert that the instance is not nil - storedKafkaEventManager, ok := dependenciesStoreMap[kafkaEventManagerInstanceName] + storedKafkaProducer, ok := dependenciesStoreMap[kafkaProducerInstanceName] require.True(t, ok) - require.NotNil(t, storedKafkaEventManager) - require.Same(t, kafkaEventManager1, storedKafkaEventManager) + require.NotNil(t, storedKafkaProducer) + require.Same(t, kafkaProducer1, storedKafkaProducer) // STEP 4: create a new instance - kafkaEventManager2, err := NewKafkaEventManager(ctx, brokers, topics, consumerGroupID) + kafkaProducer2, err := NewKafkaProducer(ctx, brokers) require.NoError(t, err) - require.NotNil(t, kafkaEventManager2) + require.NotNil(t, kafkaProducer2) // STEP 5: assert that the returned object is the same as the stored one - require.Same(t, kafkaEventManager1, kafkaEventManager2) + require.Same(t, kafkaProducer1, kafkaProducer2) } diff --git a/internal/events/kafka.go b/internal/events/kafka.go index 1f4f330c6..b5d3e8986 100644 --- a/internal/events/kafka.go +++ b/internal/events/kafka.go @@ -10,40 +10,26 @@ import ( "golang.org/x/exp/maps" ) -type KafkaEventManager struct { - handlers []EventHandler - writer *kafka.Writer - reader *kafka.Reader +type KafkaProducer struct { + writer *kafka.Writer } -func NewKafkaEventManager(brokers []string, consumerTopics []string, consumerGroupID string) (*KafkaEventManager, error) { - k := KafkaEventManager{} +// Implements Producer interface +var _ Producer = new(KafkaProducer) + +func NewKafkaProducer(brokers []string) *KafkaProducer { + k := KafkaProducer{} - writer := kafka.Writer{ + k.writer = &kafka.Writer{ Addr: kafka.TCP(brokers...), Balancer: &kafka.RoundRobin{}, RequiredAcks: -1, } - reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokers, - GroupID: consumerGroupID, - GroupTopics: consumerTopics, - }) - - k.writer = &writer - k.reader = reader - - return &k, nil + return &k } -// Implements Producer interface -var _ Producer = new(KafkaEventManager) - -// Implements Consumer interface -var _ Consumer = new(KafkaEventManager) - -func (k *KafkaEventManager) WriteMessages(ctx context.Context, messages ...Message) error { +func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for _, msg := range messages { msgJSON, err := json.Marshal(msg) @@ -66,7 +52,32 @@ func (k *KafkaEventManager) WriteMessages(ctx context.Context, messages ...Messa return nil } -func (k *KafkaEventManager) RegisterEventHandler(ctx context.Context, handlers ...EventHandler) error { +func (k *KafkaProducer) Close() error { + log.Info("closing kafka producer") + return k.writer.Close() +} + +type KafkaConsumer struct { + handlers []EventHandler + reader *kafka.Reader +} + +// Implements Consumer interface +var _ Consumer = new(KafkaConsumer) + +func NewKafkaConsumer(brokers []string, topic string, consumerGroupID string) *KafkaConsumer { + k := KafkaConsumer{} + + k.reader = kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + Topic: topic, + GroupID: consumerGroupID, + }) + + return &k +} + +func (k *KafkaConsumer) RegisterEventHandler(ctx context.Context, handlers ...EventHandler) error { ehMap := make(map[string]EventHandler, len(handlers)) for _, handler := range handlers { log.Ctx(ctx).Infof("registering event handler %s", handler.Name()) @@ -76,7 +87,7 @@ func (k *KafkaEventManager) RegisterEventHandler(ctx context.Context, handlers . return nil } -func (k *KafkaEventManager) ReadMessage(ctx context.Context) error { +func (k *KafkaConsumer) ReadMessage(ctx context.Context) error { log.Ctx(ctx).Info("fetching messages from kafka") kafkaMessage, err := k.reader.FetchMessage(ctx) if err != nil { @@ -104,9 +115,7 @@ func (k *KafkaEventManager) ReadMessage(ctx context.Context) error { return nil } -func (k *KafkaEventManager) Close() error { - log.Info("closing kafka producer and consumer") - defer k.writer.Close() - defer k.reader.Close() - return nil +func (k *KafkaConsumer) Close() error { + log.Info("closing kafka consumer") + return k.reader.Close() } diff --git a/internal/events/kafka_test.go b/internal/events/kafka_test.go index fb56a587a..b0e698923 100644 --- a/internal/events/kafka_test.go +++ b/internal/events/kafka_test.go @@ -28,11 +28,11 @@ func (h *MockEventHandler) Handle(ctx context.Context, message *Message) { h.Called(ctx, message) } -func Test_KafkaEventManager_RegisterEventHandler(t *testing.T) { +func Test_KafkaConsumer_RegisterEventHandler(t *testing.T) { ctx := context.Background() t.Run("register handler successfully", func(t *testing.T) { - k := KafkaEventManager{} + k := KafkaConsumer{} assert.Empty(t, k.handlers) eh := MockEventHandler{} err := k.RegisterEventHandler(ctx, &eh) @@ -41,7 +41,7 @@ func Test_KafkaEventManager_RegisterEventHandler(t *testing.T) { }) t.Run("no handler duplicated", func(t *testing.T) { - k := KafkaEventManager{} + k := KafkaConsumer{} assert.Empty(t, k.handlers) eh := MockEventHandler{} err := k.RegisterEventHandler(ctx, &eh, &eh) diff --git a/internal/serve/serve.go b/internal/serve/serve.go index 1d51728c9..3cde628ad 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -92,9 +92,6 @@ type ServeOptions struct { tenantManager tenant.ManagerInterface tenantRouter db.DataSourceRouter EventProducer events.Producer - Brokers []string - Topics []string - ConsumerGroupID string } // SetupDependencies uses the serve options to setup the dependencies for the server. From e3e7307805afd96364e340a49ba58919bec502c4 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Thu, 14 Dec 2023 15:02:05 -0300 Subject: [PATCH 08/11] refactor: use context cancellation to stop consuming message --- internal/events/events.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/internal/events/events.go b/internal/events/events.go index 81abafb8f..12c33642c 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "io" + "os" + "os/signal" + "syscall" "github.com/stellar/go/support/log" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" @@ -67,13 +70,28 @@ type Consumer interface { func Consume(ctx context.Context, consumer Consumer, crashTracker crashtracker.CrashTrackerClient) { log.Ctx(ctx).Info("starting consuming messages...") + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + for { - if err := consumer.ReadMessage(ctx); err != nil { - if errors.Is(err, io.EOF) { - log.Ctx(ctx).Warn("message broker returned EOF") // This is an end state - break + select { + case <-ctx.Done(): + log.Ctx(ctx).Infof("Stopping consuming messages due to context cancellation...") + return + + case sig := <-signalChan: + log.Ctx(ctx).Infof("Stopping consuming messages due to OS signal '%+v'", sig) + return + + default: + if err := consumer.ReadMessage(ctx); err != nil { + if errors.Is(err, io.EOF) { + log.Ctx(ctx).Warn("message broker returned EOF") // This is an end state + return + } + crashTracker.LogAndReportErrors(ctx, err, "consuming messages") } - crashTracker.LogAndReportErrors(ctx, err, "consuming messages") } } } From b79912a0c82148c8667e41fbe7da283abae76bc2 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Thu, 14 Dec 2023 15:02:34 -0300 Subject: [PATCH 09/11] fix: register PingPongHandler --- cmd/serve.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/serve.go b/cmd/serve.go index 1d7352a27..f383168ad 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -456,6 +456,12 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ // TODO: remove this example when start implementing the actual consumers pingPongConsumer := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID) defer pingPongConsumer.Close() + + err = pingPongConsumer.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + if err != nil { + log.Ctx(ctx).Fatalf("error registering handler: %v", err) + } + go events.Consume(ctx, pingPongConsumer, crashTrackerClient) // Starting Scheduler Service (background job) if enabled From 9ee7d22a1d0d32495233dfe75be43cc790dba533 Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Thu, 14 Dec 2023 17:42:34 -0300 Subject: [PATCH 10/11] refactor: structure improvements --- cmd/serve.go | 10 +-- cmd/serve_test.go | 3 +- internal/dependencyinjection/event_manager.go | 30 -------- .../dependencyinjection/event_manager_test.go | 73 ------------------- internal/events/events.go | 1 - internal/events/kafka.go | 23 +++--- internal/events/kafka_test.go | 51 ------------- 7 files changed, 20 insertions(+), 171 deletions(-) delete mode 100644 internal/dependencyinjection/event_manager.go delete mode 100644 internal/dependencyinjection/event_manager_test.go delete mode 100644 internal/events/kafka_test.go diff --git a/cmd/serve.go b/cmd/serve.go index f383168ad..e2bfb7587 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -446,7 +446,7 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.AnchorPlatformAPIService = apAPIService // Kafka (background) - kafkaProducer, err := di.NewKafkaProducer(ctx, brokers) + kafkaProducer, err := events.NewKafkaProducer(brokers) if err != nil { log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err) } @@ -454,13 +454,11 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.EventProducer = kafkaProducer // TODO: remove this example when start implementing the actual consumers - pingPongConsumer := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID) - defer pingPongConsumer.Close() - - err = pingPongConsumer.RegisterEventHandler(ctx, &events.PingPongEventHandler{}) + pingPongConsumer, err := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID, &events.PingPongEventHandler{}) if err != nil { - log.Ctx(ctx).Fatalf("error registering handler: %v", err) + log.Ctx(ctx).Fatalf("error creating Kafka Consumer: %v", err) } + defer pingPongConsumer.Close() go events.Consume(ctx, pingPongConsumer, crashTrackerClient) diff --git a/cmd/serve_test.go b/cmd/serve_test.go index 4814cf67d..b773267b6 100644 --- a/cmd/serve_test.go +++ b/cmd/serve_test.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler" @@ -141,7 +142,7 @@ func Test_serve(t *testing.T) { require.NoError(t, err) serveOpts.SMSMessengerClient = smsMessengerClient - kafkaEventManager, err := di.NewKafkaProducer(ctx, []string{"kafka:9092"}) + kafkaEventManager, err := events.NewKafkaProducer([]string{"kafka:9092"}) require.NoError(t, err) serveOpts.EventProducer = kafkaEventManager diff --git a/internal/dependencyinjection/event_manager.go b/internal/dependencyinjection/event_manager.go deleted file mode 100644 index d6249fc3a..000000000 --- a/internal/dependencyinjection/event_manager.go +++ /dev/null @@ -1,30 +0,0 @@ -package dependencyinjection - -import ( - "context" - "fmt" - - "github.com/stellar/go/support/log" - "github.com/stellar/stellar-disbursement-platform-backend/internal/events" -) - -const kafkaProducerInstanceName = "kafka_producer_instance_name" - -func NewKafkaProducer(ctx context.Context, brokers []string) (*events.KafkaProducer, error) { - if len(brokers) == 0 { - return nil, fmt.Errorf("brokers cannot be empty") - } - - if instance, ok := dependenciesStoreMap[kafkaProducerInstanceName]; ok { - if kafkaProducer, ok := instance.(*events.KafkaProducer); ok { - return kafkaProducer, nil - } - return nil, fmt.Errorf("trying to cast pre-existing Kafka Producer for dependency injection") - } - - // Setup Kafka Event Manager - log.Infof("⚙️ Setting Kafka Producer") - kafkaProducer := events.NewKafkaProducer(brokers) - setInstance(kafkaProducerInstanceName, kafkaProducer) - return kafkaProducer, nil -} diff --git a/internal/dependencyinjection/event_manager_test.go b/internal/dependencyinjection/event_manager_test.go deleted file mode 100644 index 71711cc01..000000000 --- a/internal/dependencyinjection/event_manager_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package dependencyinjection - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" -) - -func Test_NewKafkaProducer(t *testing.T) { - testingCases := []struct { - name string - brokers []string - wantErrContains string - }{ - { - name: "return an error if brokers is empty", - brokers: []string{}, - wantErrContains: "brokers cannot be empty", - }, - { - name: "🎉 successfully creates a new instance if none exist before", - brokers: []string{"kafka:9092"}, - wantErrContains: "", - }, - } - ctx := context.Background() - - for _, tc := range testingCases { - t.Run(tc.name, func(t *testing.T) { - defer ClearInstancesTestHelper(t) - - gotResult, err := NewKafkaProducer(ctx, tc.brokers) - if tc.wantErrContains != "" { - require.ErrorContains(t, err, tc.wantErrContains) - require.Nil(t, gotResult) - } else { - require.NoError(t, err) - require.NotNil(t, gotResult) - } - }) - } -} - -func Test_NewKafkaProducer_existingInstanceIsReturned(t *testing.T) { - ctx := context.Background() - brokers := []string{"kafka:9092"} - - defer ClearInstancesTestHelper(t) - - // STEP 1: assert that the instance is nil - _, ok := dependenciesStoreMap[kafkaProducerInstanceName] - require.False(t, ok) - - // STEP 2: create a new instance - kafkaProducer1, err := NewKafkaProducer(ctx, brokers) - require.NoError(t, err) - require.NotNil(t, kafkaProducer1) - - // STEP 3: assert that the instance is not nil - storedKafkaProducer, ok := dependenciesStoreMap[kafkaProducerInstanceName] - require.True(t, ok) - require.NotNil(t, storedKafkaProducer) - require.Same(t, kafkaProducer1, storedKafkaProducer) - - // STEP 4: create a new instance - kafkaProducer2, err := NewKafkaProducer(ctx, brokers) - require.NoError(t, err) - require.NotNil(t, kafkaProducer2) - - // STEP 5: assert that the returned object is the same as the stored one - require.Same(t, kafkaProducer1, kafkaProducer2) -} diff --git a/internal/events/events.go b/internal/events/events.go index 12c33642c..2d8beb713 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -63,7 +63,6 @@ type Producer interface { } type Consumer interface { - RegisterEventHandler(ctx context.Context, eventHandlers ...EventHandler) error ReadMessage(ctx context.Context) error Close() error } diff --git a/internal/events/kafka.go b/internal/events/kafka.go index b5d3e8986..abb6e87cc 100644 --- a/internal/events/kafka.go +++ b/internal/events/kafka.go @@ -17,16 +17,20 @@ type KafkaProducer struct { // Implements Producer interface var _ Producer = new(KafkaProducer) -func NewKafkaProducer(brokers []string) *KafkaProducer { +func NewKafkaProducer(brokers []string) (*KafkaProducer, error) { k := KafkaProducer{} + if len(brokers) == 0 { + return nil, fmt.Errorf("brokers cannot be empty") + } + k.writer = &kafka.Writer{ Addr: kafka.TCP(brokers...), Balancer: &kafka.RoundRobin{}, RequiredAcks: -1, } - return &k + return &k, nil } func (k *KafkaProducer) WriteMessages(ctx context.Context, messages ...Message) error { @@ -65,7 +69,7 @@ type KafkaConsumer struct { // Implements Consumer interface var _ Consumer = new(KafkaConsumer) -func NewKafkaConsumer(brokers []string, topic string, consumerGroupID string) *KafkaConsumer { +func NewKafkaConsumer(brokers []string, topic string, consumerGroupID string, handlers ...EventHandler) (*KafkaConsumer, error) { k := KafkaConsumer{} k.reader = kafka.NewReader(kafka.ReaderConfig{ @@ -74,17 +78,18 @@ func NewKafkaConsumer(brokers []string, topic string, consumerGroupID string) *K GroupID: consumerGroupID, }) - return &k -} + if len(handlers) == 0 { + return nil, fmt.Errorf("handlers cannot be empty") + } -func (k *KafkaConsumer) RegisterEventHandler(ctx context.Context, handlers ...EventHandler) error { - ehMap := make(map[string]EventHandler, len(handlers)) + ehMap := make(map[string]EventHandler) for _, handler := range handlers { - log.Ctx(ctx).Infof("registering event handler %s", handler.Name()) + log.Infof("registering event handler %s for topic %s", handler.Name(), topic) ehMap[handler.Name()] = handler } k.handlers = maps.Values(ehMap) - return nil + + return &k, nil } func (k *KafkaConsumer) ReadMessage(ctx context.Context) error { diff --git a/internal/events/kafka_test.go b/internal/events/kafka_test.go deleted file mode 100644 index b0e698923..000000000 --- a/internal/events/kafka_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package events - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -type MockEventHandler struct { - mock.Mock -} - -var _ EventHandler = new(MockEventHandler) - -func (h *MockEventHandler) Name() string { - return "MockEventHandler" -} - -func (h *MockEventHandler) CanHandleMessage(ctx context.Context, message *Message) bool { - args := h.Called(ctx, message) - return args.Bool(0) -} - -func (h *MockEventHandler) Handle(ctx context.Context, message *Message) { - h.Called(ctx, message) -} - -func Test_KafkaConsumer_RegisterEventHandler(t *testing.T) { - ctx := context.Background() - - t.Run("register handler successfully", func(t *testing.T) { - k := KafkaConsumer{} - assert.Empty(t, k.handlers) - eh := MockEventHandler{} - err := k.RegisterEventHandler(ctx, &eh) - require.NoError(t, err) - assert.Equal(t, []EventHandler{&eh}, k.handlers) - }) - - t.Run("no handler duplicated", func(t *testing.T) { - k := KafkaConsumer{} - assert.Empty(t, k.handlers) - eh := MockEventHandler{} - err := k.RegisterEventHandler(ctx, &eh, &eh) - require.NoError(t, err) - assert.Equal(t, []EventHandler{&eh}, k.handlers) - }) -} From a80a6b06ca0468dd08a97aa39d1f8e72605a8bce Mon Sep 17 00:00:00 2001 From: Caio Teixeira Date: Mon, 18 Dec 2023 14:46:04 -0300 Subject: [PATCH 11/11] feat: add event broker type flag --- cmd/serve.go | 44 ++++++++++++++++++++---------- cmd/utils/custom_set_value.go | 13 +++++++++ cmd/utils/custom_set_value_test.go | 42 ++++++++++++++++++++++++++++ dev/docker-compose-sdp-anchor.yml | 1 - internal/events/types.go | 25 +++++++++++++++++ 5 files changed, 109 insertions(+), 16 deletions(-) create mode 100644 internal/events/types.go diff --git a/cmd/serve.go b/cmd/serve.go index e2bfb7587..2ff7d38e9 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -26,6 +26,7 @@ import ( ) var ( + eventBrokerType events.EventBrokerType brokers []string consumerGroupID string ) @@ -315,20 +316,29 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ FlagDefault: true, Required: false, }, + { + Name: "event-broker-type", + Usage: `Event Broker type. Options: "KAFKA", "NONE"`, + OptType: types.String, + ConfigKey: &eventBrokerType, + CustomSetValue: cmdUtils.SetConfigOptionEventBrokerType, + FlagDefault: string(events.KafkaEventBrokerType), + Required: true, + }, { Name: "brokers", Usage: "List of Message Brokers Connection string comma separated.", OptType: types.String, ConfigKey: &brokers, CustomSetValue: cmdUtils.SetConfigOptionURLList, - Required: true, + Required: false, }, { Name: "consumer-group-id", Usage: "Message Broker Consumer Group ID.", OptType: types.String, ConfigKey: &consumerGroupID, - Required: true, + Required: false, }, } @@ -446,21 +456,25 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ serveOpts.AnchorPlatformAPIService = apAPIService // Kafka (background) - kafkaProducer, err := events.NewKafkaProducer(brokers) - if err != nil { - log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err) - } - defer kafkaProducer.Close() - serveOpts.EventProducer = kafkaProducer + if eventBrokerType == events.KafkaEventBrokerType { + kafkaProducer, err := events.NewKafkaProducer(brokers) + if err != nil { + log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err) + } + defer kafkaProducer.Close() + serveOpts.EventProducer = kafkaProducer - // TODO: remove this example when start implementing the actual consumers - pingPongConsumer, err := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID, &events.PingPongEventHandler{}) - if err != nil { - log.Ctx(ctx).Fatalf("error creating Kafka Consumer: %v", err) - } - defer pingPongConsumer.Close() + // TODO: remove this example when start implementing the actual consumers + pingPongConsumer, err := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID, &events.PingPongEventHandler{}) + if err != nil { + log.Ctx(ctx).Fatalf("error creating Kafka Consumer: %v", err) + } + defer pingPongConsumer.Close() - go events.Consume(ctx, pingPongConsumer, crashTrackerClient) + go events.Consume(ctx, pingPongConsumer, crashTrackerClient) + } else { + log.Ctx(ctx).Warn("Event Broker is NONE.") + } // Starting Scheduler Service (background job) if enabled if serveOpts.EnableScheduler { diff --git a/cmd/utils/custom_set_value.go b/cmd/utils/custom_set_value.go index 32796cf83..0ed811af5 100644 --- a/cmd/utils/custom_set_value.go +++ b/cmd/utils/custom_set_value.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/support/config" "github.com/stellar/go/support/log" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/utils" @@ -249,3 +250,15 @@ func SetConfigOptionStringList(co *config.ConfigOption) error { return nil } + +func SetConfigOptionEventBrokerType(co *config.ConfigOption) error { + ebType := viper.GetString(co.Name) + + ebTypeParsed, err := events.ParseEventBrokerType(ebType) + if err != nil { + return fmt.Errorf("couldn't parse event broker type: %w", err) + } + + *(co.ConfigKey.(*events.EventBrokerType)) = ebTypeParsed + return nil +} diff --git a/cmd/utils/custom_set_value_test.go b/cmd/utils/custom_set_value_test.go index 1c55c9d3b..bc961b3af 100644 --- a/cmd/utils/custom_set_value_test.go +++ b/cmd/utils/custom_set_value_test.go @@ -10,6 +10,7 @@ import ( "github.com/stellar/go/support/config" "github.com/stellar/go/support/log" "github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker" + "github.com/stellar/stellar-disbursement-platform-backend/internal/events" "github.com/stellar/stellar-disbursement-platform-backend/internal/message" "github.com/stellar/stellar-disbursement-platform-backend/internal/monitor" "github.com/stellar/stellar-disbursement-platform-backend/internal/utils" @@ -656,3 +657,44 @@ func Test_SetConfigOptionStringList(t *testing.T) { }) } } + +func Test_SetConfigOptionEventBrokerType(t *testing.T) { + opts := struct{ eventBrokerType events.EventBrokerType }{} + + co := config.ConfigOption{ + Name: "event-broker-type", + OptType: types.String, + CustomSetValue: SetConfigOptionEventBrokerType, + ConfigKey: &opts.eventBrokerType, + } + + testCases := []customSetterTestCase[events.EventBrokerType]{ + { + name: "returns an error if event broker type is empty", + args: []string{"--event-broker-type", ""}, + wantErrContains: "couldn't parse event broker type: invalid event broker type", + }, + { + name: "🎉 handles event broker type (through CLI args): KAFKA", + args: []string{"--event-broker-type", "kafka"}, + wantResult: events.KafkaEventBrokerType, + }, + { + name: "🎉 handles event broker type (through CLI args): NONE", + args: []string{"--event-broker-type", "NONE"}, + wantResult: events.NoneEventBrokerType, + }, + { + name: "returns an error if a invalid event broker type", + args: []string{"--event-broker-type", "invalid"}, + wantErrContains: "couldn't parse event broker type: invalid event broker type", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts.eventBrokerType = "" + customSetterTester[events.EventBrokerType](t, tc, co) + }) + } +} diff --git a/dev/docker-compose-sdp-anchor.yml b/dev/docker-compose-sdp-anchor.yml index 100d556c6..6bd9129a8 100644 --- a/dev/docker-compose-sdp-anchor.yml +++ b/dev/docker-compose-sdp-anchor.yml @@ -44,7 +44,6 @@ services: ENABLE_MFA: "false" ENABLE_RECAPTCHA: "false" BROKERS: "kafka:9092" - TOPICS: "ping-pong" CONSUMER_GROUP_ID: "group-id" # multi-tenant diff --git a/internal/events/types.go b/internal/events/types.go new file mode 100644 index 000000000..49fb75f5f --- /dev/null +++ b/internal/events/types.go @@ -0,0 +1,25 @@ +package events + +import ( + "fmt" + "strings" +) + +type EventBrokerType string + +const ( + KafkaEventBrokerType EventBrokerType = "KAFKA" + // NoneEventBrokerType means that no event broker was chosen. + NoneEventBrokerType EventBrokerType = "NONE" +) + +func ParseEventBrokerType(ebType string) (EventBrokerType, error) { + switch EventBrokerType(strings.ToUpper(ebType)) { + case KafkaEventBrokerType: + return KafkaEventBrokerType, nil + case NoneEventBrokerType: + return NoneEventBrokerType, nil + default: + return "", fmt.Errorf("invalid event broker type") + } +}