Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDP-922] events/serve: kafka setup #119

Merged
merged 11 commits into from
Dec 19, 2023
52 changes: 52 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/data"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler/jobs"
Expand All @@ -24,6 +25,12 @@ import (
serveadmin "github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/serve"
)

var (
eventBrokerType events.EventBrokerType
brokers []string
consumerGroupID string
)

type ServeCommand struct{}

type ServerServiceInterface interface {
Expand Down Expand Up @@ -309,6 +316,30 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
FlagDefault: true,
Required: false,
},
{
Name: "event-broker-type",
Usage: `Event Broker type. Options: "KAFKA", "NONE"`,
OptType: types.String,
ConfigKey: &eventBrokerType,
CustomSetValue: cmdUtils.SetConfigOptionEventBrokerType,
FlagDefault: string(events.KafkaEventBrokerType),
Required: true,
},
{
Name: "brokers",
Usage: "List of Message Brokers Connection string comma separated.",
OptType: types.String,
ConfigKey: &brokers,
CustomSetValue: cmdUtils.SetConfigOptionURLList,
Required: false,
},
{
Name: "consumer-group-id",
Usage: "Message Broker Consumer Group ID.",
OptType: types.String,
ConfigKey: &consumerGroupID,
Required: false,
},
}

messengerOptions := message.MessengerOptions{}
Expand Down Expand Up @@ -424,6 +455,27 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
}
serveOpts.AnchorPlatformAPIService = apAPIService

// Kafka (background)
if eventBrokerType == events.KafkaEventBrokerType {
kafkaProducer, err := events.NewKafkaProducer(brokers)
if err != nil {
log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err)
}
defer kafkaProducer.Close()
serveOpts.EventProducer = kafkaProducer

// TODO: remove this example when start implementing the actual consumers
pingPongConsumer, err := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID, &events.PingPongEventHandler{})
if err != nil {
log.Ctx(ctx).Fatalf("error creating Kafka Consumer: %v", err)
}
defer pingPongConsumer.Close()

go events.Consume(ctx, pingPongConsumer, crashTrackerClient)
} else {
log.Ctx(ctx).Warn("Event Broker is NONE.")
}

// Starting Scheduler Service (background job) if enabled
if serveOpts.EnableScheduler {
log.Ctx(ctx).Info("Starting Scheduler Service...")
Expand Down
7 changes: 7 additions & 0 deletions cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
Expand Down Expand Up @@ -141,6 +142,10 @@ func Test_serve(t *testing.T) {
require.NoError(t, err)
serveOpts.SMSMessengerClient = smsMessengerClient

kafkaEventManager, err := events.NewKafkaProducer([]string{"kafka:9092"})
require.NoError(t, err)
serveOpts.EventProducer = kafkaEventManager

metricOptions := monitor.MetricOptions{
MetricType: monitor.MetricTypePrometheus,
Environment: "test",
Expand Down Expand Up @@ -213,6 +218,8 @@ func Test_serve(t *testing.T) {
t.Setenv("INSTANCE_NAME", serveOpts.InstanceName)
t.Setenv("ENABLE_SCHEDULER", "true")
t.Setenv("ENABLE_MULTITENANT_DB", "false")
t.Setenv("BROKERS", "kafka:9092")
t.Setenv("CONSUMER_GROUP_ID", "group-id")

// test & assert
rootCmd.SetArgs([]string{"--environment", "test", "serve", "--metrics-type", "PROMETHEUS"})
Expand Down
59 changes: 59 additions & 0 deletions cmd/utils/custom_set_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
Expand Down Expand Up @@ -203,3 +204,61 @@ func SetConfigOptionURLString(co *config.ConfigOption) error {

return nil
}

func SetConfigOptionURLList(co *config.ConfigOption) error {
urlsStr := viper.GetString(co.Name)

if urlsStr == "" {
return fmt.Errorf("url list cannot be empty")
}

urls := strings.Split(urlsStr, ",")
for _, u := range urls {
_, err := url.ParseRequestURI(strings.TrimSpace(u))
if err != nil {
return fmt.Errorf("error parsing url: %w", err)
}
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}
*key = urls

return nil
}

func SetConfigOptionStringList(co *config.ConfigOption) error {
listStr := viper.GetString(co.Name)

if listStr == "" {
return fmt.Errorf("cannot be empty")
}

list := strings.Split(listStr, ",")
for i, el := range list {
list[i] = strings.TrimSpace(el)
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}

*key = list

return nil
}

func SetConfigOptionEventBrokerType(co *config.ConfigOption) error {
ebType := viper.GetString(co.Name)

ebTypeParsed, err := events.ParseEventBrokerType(ebType)
if err != nil {
return fmt.Errorf("couldn't parse event broker type: %w", err)
}

*(co.ConfigKey.(*events.EventBrokerType)) = ebTypeParsed
return nil
}
116 changes: 116 additions & 0 deletions cmd/utils/custom_set_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
Expand Down Expand Up @@ -582,3 +583,118 @@ func Test_SetConfigOptionURLString(t *testing.T) {
})
}
}

func Test_SetConfigOptionURLList(t *testing.T) {
opts := struct{ brokers []string }{}

co := config.ConfigOption{
Name: "brokers",
OptType: types.String,
CustomSetValue: SetConfigOptionURLList,
ConfigKey: &opts.brokers,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--brokers", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--brokers", "kafka:9092,localhost:9093,kafka://broker:9092"},
wantResult: []string{"kafka:9092", "localhost:9093", "kafka://broker:9092"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "kafka:9092,localhost:9093",
wantResult: []string{"kafka:9092", "localhost:9093"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.brokers = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}

func Test_SetConfigOptionStringList(t *testing.T) {
opts := struct{ topics []string }{}

co := config.ConfigOption{
Name: "topics",
OptType: types.String,
CustomSetValue: SetConfigOptionStringList,
ConfigKey: &opts.topics,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--topics", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--topics", "topic1, topic2,topic3"},
wantResult: []string{"topic1", "topic2", "topic3"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "topic1, topic2",
wantResult: []string{"topic1", "topic2"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.topics = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}

func Test_SetConfigOptionEventBrokerType(t *testing.T) {
opts := struct{ eventBrokerType events.EventBrokerType }{}

co := config.ConfigOption{
Name: "event-broker-type",
OptType: types.String,
CustomSetValue: SetConfigOptionEventBrokerType,
ConfigKey: &opts.eventBrokerType,
}

testCases := []customSetterTestCase[events.EventBrokerType]{
{
name: "returns an error if event broker type is empty",
args: []string{"--event-broker-type", ""},
wantErrContains: "couldn't parse event broker type: invalid event broker type",
},
{
name: "🎉 handles event broker type (through CLI args): KAFKA",
args: []string{"--event-broker-type", "kafka"},
wantResult: events.KafkaEventBrokerType,
},
{
name: "🎉 handles event broker type (through CLI args): NONE",
args: []string{"--event-broker-type", "NONE"},
wantResult: events.NoneEventBrokerType,
},
{
name: "returns an error if a invalid event broker type",
args: []string{"--event-broker-type", "invalid"},
wantErrContains: "couldn't parse event broker type: invalid event broker type",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.eventBrokerType = ""
customSetterTester[events.EventBrokerType](t, tc, co)
})
}
}
64 changes: 64 additions & 0 deletions dev/docker-compose-sdp-anchor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ services:
CORS_ALLOWED_ORIGINS: "*"
ENABLE_MFA: "false"
ENABLE_RECAPTCHA: "false"
BROKERS: "kafka:9092"
CONSUMER_GROUP_ID: "group-id"

# multi-tenant
INSTANCE_NAME: "SDP Testnet on Docker"
Expand Down Expand Up @@ -74,6 +76,7 @@ services:
./stellar-disbursement-platform serve
depends_on:
- db
- kafka

db-anchor-platform:
container_name: anchor-platform-postgres-db
Expand Down Expand Up @@ -170,8 +173,69 @@ services:
SECRET_SEP10_SIGNING_SEED: ${SEP10_SIGNING_PRIVATE_KEY}
SECRET_SEP24_INTERACTIVE_URL_JWT_SECRET: jwt_secret_1234567890
SECRET_SEP24_MORE_INFO_URL_JWT_SECRET: jwt_secret_1234567890

kafka:
image: docker.io/bitnami/kafka:3.6
ports:
- "9094:9094"
volumes:
- "kafka-data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

db-conduktor:
image: postgres:14
hostname: postgresql
volumes:
- db-conduktor-data:/var/lib/postgresql/data
environment:
POSTGRES_DB: "conduktor-platform"
POSTGRES_USER: "conduktor"
POSTGRES_PASSWORD: "change_me"
POSTGRES_HOST_AUTH_METHOD: "scram-sha-256"

conduktor-platform:
image: conduktor/conduktor-platform:1.19.2
depends_on:
- db-conduktor
ports:
- "9090:8080"
volumes:
- conduktor-platform-data:/var/conduktor
environment:
CDK_DATABASE_URL: "postgresql://conduktor:change_me@db-conduktor:5432/conduktor-platform"
CDK_MONITORING_CORTEX-URL: http://conduktor-monitoring:9009/
CDK_MONITORING_ALERT-MANAGER-URL: http://conduktor-monitoring:9010/
CDK_MONITORING_CALLBACK-URL: http://conduktor-platform:9090/monitoring/api/
CDK_MONITORING_NOTIFICATIONS-CALLBACK-URL: http://localhost:9090
healthcheck:
test: curl -f http://localhost:9090/platform/api/modules/health/live || exit 1
interval: 10s
start_period: 10s
timeout: 5s
retries: 3

conduktor-monitoring:
image: conduktor/conduktor-platform-cortex:1.19.2
environment:
CDK_CONSOLE-URL: "http://conduktor-platform:9090"

volumes:
postgres-db:
driver: local
postgres-ap-db:
driver: local
kafka-data:
driver: local
db-conduktor-data:
driver: local
conduktor-platform-data:
driver: local
Loading
Loading