Skip to content

Commit

Permalink
Merge branch 'sdp-multitenant' into marcelo/SDP-1001-refactor-migrati…
Browse files Browse the repository at this point in the history
…on-commands
  • Loading branch information
marcelosalloum authored Jan 2, 2024
2 parents 13fed1e + 3382eec commit 14cf079
Show file tree
Hide file tree
Showing 37 changed files with 1,015 additions and 361 deletions.
54 changes: 53 additions & 1 deletion cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/data"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler/jobs"
Expand All @@ -24,6 +25,12 @@ import (
serveadmin "github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/serve"
)

var (
eventBrokerType events.EventBrokerType
brokers []string
consumerGroupID string
)

type ServeCommand struct{}

type ServerServiceInterface interface {
Expand Down Expand Up @@ -121,7 +128,7 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
Required: true,
},
{
Name: "admin-serve-port",
Name: "admin-port",
Usage: "Port where the admin tenant server will be listening on",
OptType: types.Int,
ConfigKey: &adminServeOpts.Port,
Expand Down Expand Up @@ -309,6 +316,30 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
FlagDefault: true,
Required: false,
},
{
Name: "event-broker-type",
Usage: `Event Broker type. Options: "KAFKA", "NONE"`,
OptType: types.String,
ConfigKey: &eventBrokerType,
CustomSetValue: cmdUtils.SetConfigOptionEventBrokerType,
FlagDefault: string(events.KafkaEventBrokerType),
Required: true,
},
{
Name: "brokers",
Usage: "List of Message Brokers Connection string comma separated.",
OptType: types.String,
ConfigKey: &brokers,
CustomSetValue: cmdUtils.SetConfigOptionURLList,
Required: false,
},
{
Name: "consumer-group-id",
Usage: "Message Broker Consumer Group ID.",
OptType: types.String,
ConfigKey: &consumerGroupID,
Required: false,
},
}

messengerOptions := message.MessengerOptions{}
Expand Down Expand Up @@ -424,6 +455,27 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
}
serveOpts.AnchorPlatformAPIService = apAPIService

// Kafka (background)
if eventBrokerType == events.KafkaEventBrokerType {
kafkaProducer, err := events.NewKafkaProducer(brokers)
if err != nil {
log.Ctx(ctx).Fatalf("error creating Kafka Producer: %v", err)
}
defer kafkaProducer.Close()
serveOpts.EventProducer = kafkaProducer

// TODO: remove this example when start implementing the actual consumers
pingPongConsumer, err := events.NewKafkaConsumer(brokers, "ping-pong", consumerGroupID, &events.PingPongEventHandler{})
if err != nil {
log.Ctx(ctx).Fatalf("error creating Kafka Consumer: %v", err)
}
defer pingPongConsumer.Close()

go events.Consume(ctx, pingPongConsumer, crashTrackerClient)
} else {
log.Ctx(ctx).Warn("Event Broker is NONE.")
}

// Starting Scheduler Service (background job) if enabled
if serveOpts.EnableScheduler {
log.Ctx(ctx).Info("Starting Scheduler Service...")
Expand Down
7 changes: 7 additions & 0 deletions cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/anchorplatform"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
di "github.com/stellar/stellar-disbursement-platform-backend/internal/dependencyinjection"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/scheduler"
Expand Down Expand Up @@ -141,6 +142,10 @@ func Test_serve(t *testing.T) {
require.NoError(t, err)
serveOpts.SMSMessengerClient = smsMessengerClient

kafkaEventManager, err := events.NewKafkaProducer([]string{"kafka:9092"})
require.NoError(t, err)
serveOpts.EventProducer = kafkaEventManager

metricOptions := monitor.MetricOptions{
MetricType: monitor.MetricTypePrometheus,
Environment: "test",
Expand Down Expand Up @@ -213,6 +218,8 @@ func Test_serve(t *testing.T) {
t.Setenv("INSTANCE_NAME", serveOpts.InstanceName)
t.Setenv("ENABLE_SCHEDULER", "true")
t.Setenv("ENABLE_MULTITENANT_DB", "false")
t.Setenv("BROKERS", "kafka:9092")
t.Setenv("CONSUMER_GROUP_ID", "group-id")

// test & assert
rootCmd.SetArgs([]string{"--environment", "test", "serve", "--metrics-type", "PROMETHEUS"})
Expand Down
59 changes: 59 additions & 0 deletions cmd/utils/custom_set_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
Expand Down Expand Up @@ -203,3 +204,61 @@ func SetConfigOptionURLString(co *config.ConfigOption) error {

return nil
}

func SetConfigOptionURLList(co *config.ConfigOption) error {
urlsStr := viper.GetString(co.Name)

if urlsStr == "" {
return fmt.Errorf("url list cannot be empty")
}

urls := strings.Split(urlsStr, ",")
for _, u := range urls {
_, err := url.ParseRequestURI(strings.TrimSpace(u))
if err != nil {
return fmt.Errorf("error parsing url: %w", err)
}
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}
*key = urls

return nil
}

func SetConfigOptionStringList(co *config.ConfigOption) error {
listStr := viper.GetString(co.Name)

if listStr == "" {
return fmt.Errorf("cannot be empty")
}

list := strings.Split(listStr, ",")
for i, el := range list {
list[i] = strings.TrimSpace(el)
}

key, ok := co.ConfigKey.(*[]string)
if !ok {
return fmt.Errorf("the expected type for this config key is a string slice, but got a %T instead", co.ConfigKey)
}

*key = list

return nil
}

func SetConfigOptionEventBrokerType(co *config.ConfigOption) error {
ebType := viper.GetString(co.Name)

ebTypeParsed, err := events.ParseEventBrokerType(ebType)
if err != nil {
return fmt.Errorf("couldn't parse event broker type: %w", err)
}

*(co.ConfigKey.(*events.EventBrokerType)) = ebTypeParsed
return nil
}
116 changes: 116 additions & 0 deletions cmd/utils/custom_set_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-disbursement-platform-backend/internal/crashtracker"
"github.com/stellar/stellar-disbursement-platform-backend/internal/events"
"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
"github.com/stellar/stellar-disbursement-platform-backend/internal/monitor"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
Expand Down Expand Up @@ -582,3 +583,118 @@ func Test_SetConfigOptionURLString(t *testing.T) {
})
}
}

func Test_SetConfigOptionURLList(t *testing.T) {
opts := struct{ brokers []string }{}

co := config.ConfigOption{
Name: "brokers",
OptType: types.String,
CustomSetValue: SetConfigOptionURLList,
ConfigKey: &opts.brokers,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--brokers", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--brokers", "kafka:9092,localhost:9093,kafka://broker:9092"},
wantResult: []string{"kafka:9092", "localhost:9093", "kafka://broker:9092"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "kafka:9092,localhost:9093",
wantResult: []string{"kafka:9092", "localhost:9093"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.brokers = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}

func Test_SetConfigOptionStringList(t *testing.T) {
opts := struct{ topics []string }{}

co := config.ConfigOption{
Name: "topics",
OptType: types.String,
CustomSetValue: SetConfigOptionStringList,
ConfigKey: &opts.topics,
Required: false,
}

testCases := []customSetterTestCase[[]string]{
{
name: "returns an error if the list is empty",
args: []string{"--topics", ""},
wantErrContains: "cannot be empty",
},
{
name: "🎉 handles string list successfully (from CLI args)",
args: []string{"--topics", "topic1, topic2,topic3"},
wantResult: []string{"topic1", "topic2", "topic3"},
},
{
name: "🎉 string list successfully (from ENV vars)",
envValue: "topic1, topic2",
wantResult: []string{"topic1", "topic2"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.topics = []string{}
customSetterTester[[]string](t, tc, co)
})
}
}

func Test_SetConfigOptionEventBrokerType(t *testing.T) {
opts := struct{ eventBrokerType events.EventBrokerType }{}

co := config.ConfigOption{
Name: "event-broker-type",
OptType: types.String,
CustomSetValue: SetConfigOptionEventBrokerType,
ConfigKey: &opts.eventBrokerType,
}

testCases := []customSetterTestCase[events.EventBrokerType]{
{
name: "returns an error if event broker type is empty",
args: []string{"--event-broker-type", ""},
wantErrContains: "couldn't parse event broker type: invalid event broker type",
},
{
name: "🎉 handles event broker type (through CLI args): KAFKA",
args: []string{"--event-broker-type", "kafka"},
wantResult: events.KafkaEventBrokerType,
},
{
name: "🎉 handles event broker type (through CLI args): NONE",
args: []string{"--event-broker-type", "NONE"},
wantResult: events.NoneEventBrokerType,
},
{
name: "returns an error if a invalid event broker type",
args: []string{"--event-broker-type", "invalid"},
wantErrContains: "couldn't parse event broker type: invalid event broker type",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts.eventBrokerType = ""
customSetterTester[events.EventBrokerType](t, tc, co)
})
}
}
6 changes: 3 additions & 3 deletions db/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ func TestMigrate_downApplyOne_Tenant_migrations(t *testing.T) {

n, err := Migrate(db.DSN, migrate.Up, 2, adminmigrations.FS, StellarAdminMigrationsTableName)
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 2, n)

n, err = Migrate(db.DSN, migrate.Down, 1, adminmigrations.FS, StellarAdminMigrationsTableName)
n, err = Migrate(db.DSN, migrate.Down, 2, adminmigrations.FS, StellarAdminMigrationsTableName)
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 2, n)

ids := []string{}
err = dbConnectionPool.SelectContext(ctx, &ids, fmt.Sprintf("SELECT id FROM %s", StellarAdminMigrationsTableName))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Drop unused cors allowed origins column.

-- +migrate Up

ALTER TABLE public.tenants
DROP COLUMN IF EXISTS cors_allowed_origins;


-- +migrate Down

ALTER TABLE public.tenants
ADD COLUMN cors_allowed_origins text[] NULL;
Loading

0 comments on commit 14cf079

Please sign in to comment.