Skip to content

Commit

Permalink
[SDP-1317] Add MessageDispatcher to SDP (#391)
Browse files Browse the repository at this point in the history
  • Loading branch information
marwen-abid authored Aug 15, 2024
1 parent 99cbb5d commit 3f16719
Show file tree
Hide file tree
Showing 19 changed files with 597 additions and 96 deletions.
14 changes: 9 additions & 5 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *ServerService) GetSchedulerJobRegistrars(
scheduler.WithPatchAnchorPlatformTransactionsCompletionJobOption(schedulerOptions.PaymentJobIntervalSeconds, apAPIService, models),
scheduler.WithSendReceiverWalletsSMSInvitationJobOption(jobs.SendReceiverWalletsSMSInvitationJobOptions{
Models: models,
MessengerClient: serveOpts.SMSMessengerClient,
MessageDispatcher: serveOpts.MessageDispatcher,
MaxInvitationSMSResendAttempts: int64(serveOpts.MaxInvitationSMSResendAttempts),
Sep10SigningPrivateKey: serveOpts.Sep10SigningPrivateKey,
CrashTrackerClient: serveOpts.CrashTrackerClient.Clone(),
Expand Down Expand Up @@ -147,7 +147,7 @@ func (s *ServerService) SetupConsumers(ctx context.Context, o SetupConsumersOpti
MtnDBConnectionPool: o.ServeOpts.MtnDBConnectionPool,
AdminDBConnectionPool: o.ServeOpts.AdminDBConnectionPool,
AnchorPlatformBaseSepURL: o.ServeOpts.AnchorPlatformBasePlatformURL,
MessengerClient: o.ServeOpts.SMSMessengerClient,
MessageDispatcher: o.ServeOpts.MessageDispatcher,
MaxInvitationSMSResendAttempts: int64(o.ServeOpts.MaxInvitationSMSResendAttempts),
Sep10SigningPrivateKey: o.ServeOpts.Sep10SigningPrivateKey,
}),
Expand Down Expand Up @@ -582,10 +582,14 @@ func (c *ServeCommand) Command(serverService ServerServiceInterface, monitorServ
serveOpts.EmailMessengerClient = emailMessengerClient
adminServeOpts.EmailMessengerClient = emailMessengerClient

// Setup the SMS client
serveOpts.SMSMessengerClient, err = di.NewSMSClient(smsOpts)
// Setup the Message Dispatcher
messageDispatcherOpts := di.MessageDispatcherOpts{
EmailOpts: &emailOpts,
SMSOpts: &smsOpts,
}
serveOpts.MessageDispatcher, err = di.NewMessageDispatcher(ctx, messageDispatcherOpts)
if err != nil {
log.Ctx(ctx).Fatalf("error creating SMS client: %s", err.Error())
log.Ctx(ctx).Fatalf("error creating message dispatcher: %s", err.Error())
}

// Setup the AP Auth enforcer
Expand Down
15 changes: 11 additions & 4 deletions cmd/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,18 @@ func Test_serve(t *testing.T) {
require.NoError(t, err)
serveOpts.CrashTrackerClient = crashTrackerClient

messengerClient, err := di.NewEmailClient(di.EmailClientOptions{EmailType: message.MessengerTypeDryRun})
emailOpts := di.EmailClientOptions{EmailType: message.MessengerTypeDryRun}
emailClient, err := di.NewEmailClient(emailOpts)
require.NoError(t, err)
serveOpts.EmailMessengerClient = messengerClient
serveOpts.EmailMessengerClient = emailClient

serveOpts.SMSMessengerClient, err = di.NewSMSClient(di.SMSClientOptions{SMSType: message.MessengerTypeDryRun})
smsOpts := di.SMSClientOptions{SMSType: message.MessengerTypeDryRun}

messageDispatcherOpts := di.MessageDispatcherOpts{
EmailOpts: &emailOpts,
SMSOpts: &smsOpts,
}
serveOpts.MessageDispatcher, err = di.NewMessageDispatcher(ctx, messageDispatcherOpts)
require.NoError(t, err)

kafkaConfig := events.KafkaConfig{
Expand All @@ -220,7 +227,7 @@ func Test_serve(t *testing.T) {

serveTenantOpts := serveadmin.ServeOptions{
Environment: "test",
EmailMessengerClient: messengerClient,
EmailMessengerClient: emailClient,
AdminDBConnectionPool: dbConnectionPool,
MTNDBConnectionPool: dbConnectionPool,
CrashTrackerClient: crashTrackerClient,
Expand Down
45 changes: 45 additions & 0 deletions internal/dependencyinjection/message_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dependencyinjection

import (
"context"
"fmt"

"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
)

const MessageDispatcherInstanceName = "message_dispatcher_instance"

type MessageDispatcherOpts struct {
EmailOpts *EmailClientOptions
SMSOpts *SMSClientOptions
}

func NewMessageDispatcher(ctx context.Context, opts MessageDispatcherOpts) (*message.MessageDispatcher, error) {
if instance, ok := GetInstance(MessageDispatcherInstanceName); ok {
if dispatcherInstance, ok := instance.(*message.MessageDispatcher); ok {
return dispatcherInstance, nil
}
return nil, fmt.Errorf("trying to cast pre-existing MessageDispatcher for dependency injection")
}

dispatcher := message.NewMessageDispatcher()

if opts.EmailOpts != nil {
emailClient, err := NewEmailClient(*opts.EmailOpts)
if err != nil {
return nil, fmt.Errorf("creating email client: %w", err)
}
dispatcher.RegisterClient(ctx, message.MessageChannelEmail, emailClient)
}

if opts.SMSOpts != nil {
smsClient, err := NewSMSClient(*opts.SMSOpts)
if err != nil {
return nil, fmt.Errorf("creating SMS client: %w", err)
}
dispatcher.RegisterClient(ctx, message.MessageChannelSMS, smsClient)
}

SetInstance(MessageDispatcherInstanceName, dispatcher)
return dispatcher, nil
}
116 changes: 116 additions & 0 deletions internal/dependencyinjection/message_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package dependencyinjection

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/stellar-disbursement-platform-backend/internal/message"
)

func Test_NewMessageDispatcher(t *testing.T) {
ctx := context.Background()
t.Run("should return the same instance when called twice", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

opts := MessageDispatcherOpts{
EmailOpts: &EmailClientOptions{
EmailType: message.MessengerTypeDryRun,
},
SMSOpts: &SMSClientOptions{
SMSType: message.MessengerTypeDryRun,
},
}

dispatcher1, err := NewMessageDispatcher(ctx, opts)
require.NoError(t, err)
dispatcher2, err := NewMessageDispatcher(ctx, opts)
require.NoError(t, err)
assert.Equal(t, dispatcher1, dispatcher2)
})

t.Run("should create dispatcher with email client only", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

opts := MessageDispatcherOpts{
EmailOpts: &EmailClientOptions{
EmailType: message.MessengerTypeDryRun,
},
}

dispatcher, err := NewMessageDispatcher(ctx, opts)
require.NoError(t, err)

emailClient, err := dispatcher.GetClient(message.MessageChannelEmail)
require.NoError(t, err)
assert.NotNil(t, emailClient)

smsClient, err := dispatcher.GetClient(message.MessageChannelSMS)
assert.EqualError(t, err, "no client registered for channel \"SMS\"")
assert.Nil(t, smsClient)
})

t.Run("should create dispatcher with SMS client only", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

opts := MessageDispatcherOpts{
SMSOpts: &SMSClientOptions{
SMSType: message.MessengerTypeDryRun,
},
}

dispatcher, err := NewMessageDispatcher(ctx, opts)
require.NoError(t, err)

smsClient, err := dispatcher.GetClient(message.MessageChannelSMS)
require.NoError(t, err)
assert.NotNil(t, smsClient)

emailClient, err := dispatcher.GetClient(message.MessageChannelEmail)
assert.EqualError(t, err, "no client registered for channel \"EMAIL\"")
assert.Nil(t, emailClient)
})

t.Run("should return an error on invalid email client creation", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

opts := MessageDispatcherOpts{
EmailOpts: &EmailClientOptions{
EmailType: "invalid-type",
},
}

dispatcher, err := NewMessageDispatcher(ctx, opts)
assert.ErrorContains(t, err, `trying to create a Email client with a non-supported Email type: "invalid-type"`)
assert.Nil(t, dispatcher)
})

t.Run("should return an error on invalid SMS client creation", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

opts := MessageDispatcherOpts{
SMSOpts: &SMSClientOptions{
SMSType: "invalid-type",
},
}

dispatcher, err := NewMessageDispatcher(ctx, opts)
assert.ErrorContains(t, err, `trying to create a SMS client with a non-supported SMS type: "invalid-type"`)
assert.Nil(t, dispatcher)
})

t.Run("should return an error on invalid pre-existing instance", func(t *testing.T) {
defer ClearInstancesTestHelper(t)

preExistingDispatcherWithInvalidType := struct{}{}
SetInstance(MessageDispatcherInstanceName, preExistingDispatcherWithInvalidType)

opts := MessageDispatcherOpts{}

gotDispatcher, err := NewMessageDispatcher(ctx, opts)
assert.Nil(t, gotDispatcher)
assert.EqualError(t, err, "trying to cast pre-existing MessageDispatcher for dependency injection")
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type SendReceiverWalletsSMSInvitationEventHandlerOptions struct {
AdminDBConnectionPool db.DBConnectionPool
MtnDBConnectionPool db.DBConnectionPool
AnchorPlatformBaseSepURL string
MessengerClient message.MessengerClient
MessageDispatcher message.MessageDispatcherInterface
MaxInvitationSMSResendAttempts int64
Sep10SigningPrivateKey string
CrashTrackerClient crashtracker.CrashTrackerClient
Expand All @@ -45,7 +45,7 @@ func NewSendReceiverWalletsSMSInvitationEventHandler(options SendReceiverWallets

s, err := services.NewSendReceiverWalletInviteService(
models,
options.MessengerClient,
options.MessageDispatcher,
options.Sep10SigningPrivateKey,
options.MaxInvitationSMSResendAttempts,
options.CrashTrackerClient,
Expand Down
57 changes: 57 additions & 0 deletions internal/message/message_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package message

import (
"context"
"fmt"

"github.com/stellar/go/support/log"
)

type MessageChannel string

const (
MessageChannelEmail MessageChannel = "EMAIL"
MessageChannelSMS MessageChannel = "SMS"
)

//go:generate mockery --name MessageDispatcherInterface --case=underscore --structname=MockMessageDispatcher --inpackage
type MessageDispatcherInterface interface {
RegisterClient(ctx context.Context, channel MessageChannel, client MessengerClient)
SendMessage(message Message, channel MessageChannel) error
GetClient(channel MessageChannel) (MessengerClient, error)
}

type MessageDispatcher struct {
clients map[MessageChannel]MessengerClient
}

func NewMessageDispatcher() *MessageDispatcher {
return &MessageDispatcher{
clients: make(map[MessageChannel]MessengerClient),
}
}

func (d *MessageDispatcher) RegisterClient(ctx context.Context, channel MessageChannel, client MessengerClient) {
log.Ctx(ctx).Infof("📡 [MessageDispatcher] Registering client %s for channel %s", client.MessengerType(), channel)
d.clients[channel] = client
}

func (d *MessageDispatcher) SendMessage(message Message, channel MessageChannel) error {
client, err := d.GetClient(channel)
if err != nil {
return fmt.Errorf("getting client for channel: %w", err)
}

return client.SendMessage(message)
}

func (d *MessageDispatcher) GetClient(channel MessageChannel) (MessengerClient, error) {
client, ok := d.clients[channel]
if !ok {
return nil, fmt.Errorf("no client registered for channel %q", channel)
}

return client, nil
}

var _ MessageDispatcherInterface = &MessageDispatcher{}
Loading

0 comments on commit 3f16719

Please sign in to comment.