Skip to content

Commit

Permalink
new field for secretsmanager instead of hacking existing password field
Browse files Browse the repository at this point in the history
  • Loading branch information
petedannemann committed Dec 29, 2023
1 parent 0df9585 commit d959bfe
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 84 deletions.
9 changes: 6 additions & 3 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ func applyTopic(
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
applyConfig.dryRun,
applyConfig.shared.saslUsername,
applyConfig.shared.saslPassword,
config.AdminClientOpts{
ReadOnly: applyConfig.dryRun,
UsernameOverride: applyConfig.shared.saslUsername,
PasswordOverride: applyConfig.shared.saslPassword,
SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn,
},
)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions cmd/topicctl/subcmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
adminClient, err := clusterConfig.NewAdminClient(
ctx,
nil,
true,
bootstrapConfig.shared.saslUsername,
bootstrapConfig.shared.saslPassword,
config.AdminClientOpts{
ReadOnly: true,
UsernameOverride: bootstrapConfig.shared.saslUsername,
PasswordOverride: bootstrapConfig.shared.saslPassword,
SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn,
},
)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions cmd/topicctl/subcmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,12 @@ func checkTopicFile(
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
true,
checkConfig.shared.saslUsername,
checkConfig.shared.saslPassword,
config.AdminClientOpts{
ReadOnly: true,
UsernameOverride: checkConfig.shared.saslUsername,
PasswordOverride: checkConfig.shared.saslPassword,
SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn,
},
)
if err != nil {
return false, err
Expand Down
9 changes: 6 additions & 3 deletions cmd/topicctl/subcmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ func createACL(
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
createConfig.dryRun,
createConfig.shared.saslUsername,
createConfig.shared.saslPassword,
config.AdminClientOpts{
ReadOnly: createConfig.dryRun,
UsernameOverride: createConfig.shared.saslUsername,
PasswordOverride: createConfig.shared.saslPassword,
SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn,
},
)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {

adminClient, err := clusterConfig.NewAdminClient(ctx,
nil,
rebalanceConfig.dryRun,
rebalanceConfig.shared.saslUsername,
rebalanceConfig.shared.saslPassword,
config.AdminClientOpts{
ReadOnly: rebalanceConfig.dryRun,
UsernameOverride: rebalanceConfig.shared.saslUsername,
PasswordOverride: rebalanceConfig.shared.saslPassword,
SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn,
},
)
if err != nil {
log.Fatal(err)
Expand Down
77 changes: 31 additions & 46 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ package subcmd
import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/hashicorp/go-multierror"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/config"
Expand All @@ -19,20 +14,21 @@ import (
)

type sharedOptions struct {
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
brokerAddr string
clusterConfig string
expandEnv bool
saslMechanism string
saslPassword string
saslUsername string
saslSecretsManagerArn string
tlsCACert string
tlsCert string
tlsEnabled bool
tlsKey string
tlsSkipVerify bool
tlsServerName string
zkAddr string
zkPrefix string
}

func (s sharedOptions) validate() error {
Expand Down Expand Up @@ -81,7 +77,7 @@ func (s sharedOptions) validate() error {
}

useTLS := s.tlsEnabled || s.tlsCACert != "" || s.tlsCert != "" || s.tlsKey != ""
useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != ""
useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != "" || s.saslSecretsManagerArn != ""

if useTLS && s.zkAddr != "" {
log.Warn("TLS flags are ignored accessing cluster via zookeeper")
Expand All @@ -100,6 +96,10 @@ func (s sharedOptions) validate() error {
(s.saslUsername != "" || s.saslPassword != "") {
log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM")
}

if s.saslUsername != "" || s.saslPassword != "" && s.saslSecretsManagerArn != "" {
err = multierror.Append(err, errors.New("Cannot set both sasl-username or sasl-password and sasl-secrets-manager-arn"))
}
}

return err
Expand All @@ -118,9 +118,12 @@ func (s sharedOptions) getAdminClient(
return clusterConfig.NewAdminClient(
ctx,
sess,
readOnly,
s.saslUsername,
s.saslPassword,
config.AdminClientOpts{
ReadOnly: readOnly,
UsernameOverride: s.saslUsername,
PasswordOverride: s.saslPassword,
SecretsManagerArnOverride: s.saslSecretsManagerArn,
},
)
} else if s.brokerAddr != "" {
tlsEnabled := (s.tlsEnabled ||
Expand All @@ -141,25 +144,6 @@ func (s sharedOptions) getAdminClient(
}
}

if strings.HasPrefix(s.saslPassword, "arn:aws:secretsmanager:") {
log.Debug("Fetching password from AWS secrets manager...")
sess := session.Must(session.NewSession())

svc := secretsmanager.New(sess, aws.NewConfig())

arn, err := arn.Parse(s.saslPassword)
if err != nil {
return nil, fmt.Errorf("failed to parse ARN from password: %v", err)
}

secretValue, err := svc.GetSecretValue(&secretsmanager.GetSecretValueInput{SecretId: aws.String(arn.Resource)})
if err != nil {
return nil, fmt.Errorf("failed to get secret value: %v", err)
}

s.saslPassword = *secretValue.SecretString
}

return admin.NewBrokerAdminClient(
ctx,
admin.BrokerAdminClientConfig{
Expand All @@ -174,10 +158,11 @@ func (s sharedOptions) getAdminClient(
SkipVerify: s.tlsSkipVerify,
},
SASL: admin.SASLConfig{
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
Enabled: saslEnabled,
Mechanism: saslMechanism,
Password: s.saslPassword,
Username: s.saslUsername,
SecretsManagerArn: s.saslSecretsManagerArn,
},
},
ReadOnly: readOnly,
Expand Down
75 changes: 65 additions & 10 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package admin
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/aws/aws-sdk-go/service/secretsmanager"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
Expand Down Expand Up @@ -48,10 +51,11 @@ type TLSConfig struct {

// SASLConfig stores the SASL-related configuration for a connection.
type SASLConfig struct {
Enabled bool
Mechanism SASLMechanism
Username string
Password string
Enabled bool
Mechanism SASLMechanism
Username string
Password string
SecretsManagerArn string
}

// Connector is a wrapper around the low-level, kafka-go dialer and client.
Expand All @@ -72,6 +76,19 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
var err error

if config.SASL.Enabled {
saslUsername := config.SASL.Username
saslPassword := config.SASL.Password

if config.SASL.SecretsManagerArn != "" {
creds, err := fetchCredentialsFromSecretsManager(config.SASL.SecretsManagerArn)
if err != nil {
return nil, err
}

saslUsername = creds.Username
saslPassword = creds.Password
}

switch config.SASL.Mechanism {
case SASLMechanismAWSMSKIAM:
sess := session.Must(session.NewSession())
Expand All @@ -84,23 +101,23 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
}
case SASLMechanismPlain:
mechanismClient = plain.Mechanism{
Username: config.SASL.Username,
Password: config.SASL.Password,
Username: saslUsername,
Password: saslPassword,
}
case SASLMechanismScramSHA256:
mechanismClient, err = scram.Mechanism(
scram.SHA256,
config.SASL.Username,
config.SASL.Password,
saslUsername,
saslPassword,
)
if err != nil {
return nil, err
}
case SASLMechanismScramSHA512:
mechanismClient, err = scram.Mechanism(
scram.SHA512,
config.SASL.Username,
config.SASL.Password,
saslUsername,
saslPassword,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -193,3 +210,41 @@ func SASLNameToMechanism(name string) (SASLMechanism, error) {
)
}
}

type credentials struct {
Username string `json:"username"`
Password string `json:"password"`
}

func fetchCredentialsFromSecretsManager(secretArn string) (credentials, error) {
log.Debugf("Fetching credentials from Secrets Manager for secret: %s", secretArn)
var creds credentials

sess := session.Must(session.NewSession())
svc := secretsmanager.New(sess)

arn, err := arn.Parse(secretArn)
if err != nil {
return creds, err
}
// Remove "secret:" from the resource to get the secret name
secretName := strings.Split(arn.Resource, ":")[1]
// Strip the six random characters at the end of the arn to get the secret name
// https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html
secretNameNoSuffix := secretName[:len(secretName)-7]

log.Debugf("Fetching secret value for secret name: %s", secretNameNoSuffix)

input := &secretsmanager.GetSecretValueInput{
SecretId: aws.String(secretNameNoSuffix),
}

result, err := svc.GetSecretValue(input)
if err != nil {
return creds, err
}

json.Unmarshal([]byte(*result.SecretString), &creds)

return creds, nil
}
Loading

0 comments on commit d959bfe

Please sign in to comment.