From d959bfe3dbdac2dd27f3d6b2c908f829100a3835 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Fri, 29 Dec 2023 11:10:44 -0500 Subject: [PATCH] new field for secretsmanager instead of hacking existing password field --- cmd/topicctl/subcmd/apply.go | 9 ++-- cmd/topicctl/subcmd/bootstrap.go | 9 ++-- cmd/topicctl/subcmd/check.go | 9 ++-- cmd/topicctl/subcmd/create.go | 9 ++-- cmd/topicctl/subcmd/rebalance.go | 9 ++-- cmd/topicctl/subcmd/shared.go | 77 +++++++++++++------------------- pkg/admin/connector.go | 75 ++++++++++++++++++++++++++----- pkg/config/cluster.go | 57 +++++++++++++++++------ pkg/config/cluster_test.go | 44 ++++++++++++++++++ 9 files changed, 214 insertions(+), 84 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 8bed22d6..14b22d80 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -197,9 +197,12 @@ func applyTopic( adminClient, err = clusterConfig.NewAdminClient( ctx, nil, - applyConfig.dryRun, - applyConfig.shared.saslUsername, - applyConfig.shared.saslPassword, + config.AdminClientOpts{ + ReadOnly: applyConfig.dryRun, + UsernameOverride: applyConfig.shared.saslUsername, + PasswordOverride: applyConfig.shared.saslPassword, + SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn, + }, ) if err != nil { return err diff --git a/cmd/topicctl/subcmd/bootstrap.go b/cmd/topicctl/subcmd/bootstrap.go index 14502aca..18c3f04d 100644 --- a/cmd/topicctl/subcmd/bootstrap.go +++ b/cmd/topicctl/subcmd/bootstrap.go @@ -72,9 +72,12 @@ func bootstrapRun(cmd *cobra.Command, args []string) error { adminClient, err := clusterConfig.NewAdminClient( ctx, nil, - true, - bootstrapConfig.shared.saslUsername, - bootstrapConfig.shared.saslPassword, + config.AdminClientOpts{ + ReadOnly: true, + UsernameOverride: bootstrapConfig.shared.saslUsername, + PasswordOverride: bootstrapConfig.shared.saslPassword, + SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn, + }, ) if err != nil { return err diff --git a/cmd/topicctl/subcmd/check.go b/cmd/topicctl/subcmd/check.go index 624cfe28..12b4175f 100644 --- a/cmd/topicctl/subcmd/check.go +++ b/cmd/topicctl/subcmd/check.go @@ -137,9 +137,12 @@ func checkTopicFile( adminClient, err = clusterConfig.NewAdminClient( ctx, nil, - true, - checkConfig.shared.saslUsername, - checkConfig.shared.saslPassword, + config.AdminClientOpts{ + ReadOnly: true, + UsernameOverride: checkConfig.shared.saslUsername, + PasswordOverride: checkConfig.shared.saslPassword, + SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn, + }, ) if err != nil { return false, err diff --git a/cmd/topicctl/subcmd/create.go b/cmd/topicctl/subcmd/create.go index 7bef8883..2286fb7d 100644 --- a/cmd/topicctl/subcmd/create.go +++ b/cmd/topicctl/subcmd/create.go @@ -150,9 +150,12 @@ func createACL( adminClient, err = clusterConfig.NewAdminClient( ctx, nil, - createConfig.dryRun, - createConfig.shared.saslUsername, - createConfig.shared.saslPassword, + config.AdminClientOpts{ + ReadOnly: createConfig.dryRun, + UsernameOverride: createConfig.shared.saslUsername, + PasswordOverride: createConfig.shared.saslPassword, + SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn, + }, ) if err != nil { return err diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index a85150d6..7823a251 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -125,9 +125,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { adminClient, err := clusterConfig.NewAdminClient(ctx, nil, - rebalanceConfig.dryRun, - rebalanceConfig.shared.saslUsername, - rebalanceConfig.shared.saslPassword, + config.AdminClientOpts{ + ReadOnly: rebalanceConfig.dryRun, + UsernameOverride: rebalanceConfig.shared.saslUsername, + PasswordOverride: rebalanceConfig.shared.saslPassword, + SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn, + }, ) if err != nil { log.Fatal(err) diff --git a/cmd/topicctl/subcmd/shared.go b/cmd/topicctl/subcmd/shared.go index 4abb9db8..d5aa5ccb 100644 --- a/cmd/topicctl/subcmd/shared.go +++ b/cmd/topicctl/subcmd/shared.go @@ -3,14 +3,9 @@ package subcmd import ( "context" "errors" - "fmt" "os" - "strings" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/secretsmanager" "github.com/hashicorp/go-multierror" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/config" @@ -19,20 +14,21 @@ import ( ) type sharedOptions struct { - brokerAddr string - clusterConfig string - expandEnv bool - saslMechanism string - saslPassword string - saslUsername string - tlsCACert string - tlsCert string - tlsEnabled bool - tlsKey string - tlsSkipVerify bool - tlsServerName string - zkAddr string - zkPrefix string + brokerAddr string + clusterConfig string + expandEnv bool + saslMechanism string + saslPassword string + saslUsername string + saslSecretsManagerArn string + tlsCACert string + tlsCert string + tlsEnabled bool + tlsKey string + tlsSkipVerify bool + tlsServerName string + zkAddr string + zkPrefix string } func (s sharedOptions) validate() error { @@ -81,7 +77,7 @@ func (s sharedOptions) validate() error { } useTLS := s.tlsEnabled || s.tlsCACert != "" || s.tlsCert != "" || s.tlsKey != "" - useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != "" + useSASL := s.saslMechanism != "" || s.saslPassword != "" || s.saslUsername != "" || s.saslSecretsManagerArn != "" if useTLS && s.zkAddr != "" { log.Warn("TLS flags are ignored accessing cluster via zookeeper") @@ -100,6 +96,10 @@ func (s sharedOptions) validate() error { (s.saslUsername != "" || s.saslPassword != "") { log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM") } + + if s.saslUsername != "" || s.saslPassword != "" && s.saslSecretsManagerArn != "" { + err = multierror.Append(err, errors.New("Cannot set both sasl-username or sasl-password and sasl-secrets-manager-arn")) + } } return err @@ -118,9 +118,12 @@ func (s sharedOptions) getAdminClient( return clusterConfig.NewAdminClient( ctx, sess, - readOnly, - s.saslUsername, - s.saslPassword, + config.AdminClientOpts{ + ReadOnly: readOnly, + UsernameOverride: s.saslUsername, + PasswordOverride: s.saslPassword, + SecretsManagerArnOverride: s.saslSecretsManagerArn, + }, ) } else if s.brokerAddr != "" { tlsEnabled := (s.tlsEnabled || @@ -141,25 +144,6 @@ func (s sharedOptions) getAdminClient( } } - if strings.HasPrefix(s.saslPassword, "arn:aws:secretsmanager:") { - log.Debug("Fetching password from AWS secrets manager...") - sess := session.Must(session.NewSession()) - - svc := secretsmanager.New(sess, aws.NewConfig()) - - arn, err := arn.Parse(s.saslPassword) - if err != nil { - return nil, fmt.Errorf("failed to parse ARN from password: %v", err) - } - - secretValue, err := svc.GetSecretValue(&secretsmanager.GetSecretValueInput{SecretId: aws.String(arn.Resource)}) - if err != nil { - return nil, fmt.Errorf("failed to get secret value: %v", err) - } - - s.saslPassword = *secretValue.SecretString - } - return admin.NewBrokerAdminClient( ctx, admin.BrokerAdminClientConfig{ @@ -174,10 +158,11 @@ func (s sharedOptions) getAdminClient( SkipVerify: s.tlsSkipVerify, }, SASL: admin.SASLConfig{ - Enabled: saslEnabled, - Mechanism: saslMechanism, - Password: s.saslPassword, - Username: s.saslUsername, + Enabled: saslEnabled, + Mechanism: saslMechanism, + Password: s.saslPassword, + Username: s.saslUsername, + SecretsManagerArn: s.saslSecretsManagerArn, }, }, ReadOnly: readOnly, diff --git a/pkg/admin/connector.go b/pkg/admin/connector.go index 319d9633..0c97555d 100644 --- a/pkg/admin/connector.go +++ b/pkg/admin/connector.go @@ -3,14 +3,17 @@ package admin import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "os" "strings" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/aws/session" sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/aws/aws-sdk-go/service/secretsmanager" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/aws_msk_iam" @@ -48,10 +51,11 @@ type TLSConfig struct { // SASLConfig stores the SASL-related configuration for a connection. type SASLConfig struct { - Enabled bool - Mechanism SASLMechanism - Username string - Password string + Enabled bool + Mechanism SASLMechanism + Username string + Password string + SecretsManagerArn string } // Connector is a wrapper around the low-level, kafka-go dialer and client. @@ -72,6 +76,19 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { var err error if config.SASL.Enabled { + saslUsername := config.SASL.Username + saslPassword := config.SASL.Password + + if config.SASL.SecretsManagerArn != "" { + creds, err := fetchCredentialsFromSecretsManager(config.SASL.SecretsManagerArn) + if err != nil { + return nil, err + } + + saslUsername = creds.Username + saslPassword = creds.Password + } + switch config.SASL.Mechanism { case SASLMechanismAWSMSKIAM: sess := session.Must(session.NewSession()) @@ -84,14 +101,14 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { } case SASLMechanismPlain: mechanismClient = plain.Mechanism{ - Username: config.SASL.Username, - Password: config.SASL.Password, + Username: saslUsername, + Password: saslPassword, } case SASLMechanismScramSHA256: mechanismClient, err = scram.Mechanism( scram.SHA256, - config.SASL.Username, - config.SASL.Password, + saslUsername, + saslPassword, ) if err != nil { return nil, err @@ -99,8 +116,8 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { case SASLMechanismScramSHA512: mechanismClient, err = scram.Mechanism( scram.SHA512, - config.SASL.Username, - config.SASL.Password, + saslUsername, + saslPassword, ) if err != nil { return nil, err @@ -193,3 +210,41 @@ func SASLNameToMechanism(name string) (SASLMechanism, error) { ) } } + +type credentials struct { + Username string `json:"username"` + Password string `json:"password"` +} + +func fetchCredentialsFromSecretsManager(secretArn string) (credentials, error) { + log.Debugf("Fetching credentials from Secrets Manager for secret: %s", secretArn) + var creds credentials + + sess := session.Must(session.NewSession()) + svc := secretsmanager.New(sess) + + arn, err := arn.Parse(secretArn) + if err != nil { + return creds, err + } + // Remove "secret:" from the resource to get the secret name + secretName := strings.Split(arn.Resource, ":")[1] + // Strip the six random characters at the end of the arn to get the secret name + // https://docs.aws.amazon.com/secretsmanager/latest/userguide/getting-started.html + secretNameNoSuffix := secretName[:len(secretName)-7] + + log.Debugf("Fetching secret value for secret name: %s", secretNameNoSuffix) + + input := &secretsmanager.GetSecretValueInput{ + SecretId: aws.String(secretNameNoSuffix), + } + + result, err := svc.GetSecretValue(input) + if err != nil { + return creds, err + } + + json.Unmarshal([]byte(*result.SecretString), &creds) + + return creds, nil +} diff --git a/pkg/config/cluster.go b/pkg/config/cluster.go index a5ea4fb8..47458cdf 100644 --- a/pkg/config/cluster.go +++ b/pkg/config/cluster.go @@ -111,6 +111,10 @@ type SASLConfig struct { // Password is the SASL password. Ignored if mechanism is AWS-MSK-IAM. Password string `json:"password"` + + // SecretsManagerArn is the ARN of the AWS Secrets Manager secret containing the SASL credentials. + // Ignored if mechanism is AWS-MSK-IAM. Username and Password will be ignored if this is set. + SecretsManagerArn string `json:"secretsManagerArn"` } // Validate evaluates whether the cluster config is valid. @@ -165,6 +169,19 @@ func (c ClusterConfig) Validate() error { (c.Spec.SASL.Username != "" || c.Spec.SASL.Password != "") { log.Warn("Username and password are ignored if using SASL AWS-MSK-IAM") } + + if saslMechanism == admin.SASLMechanismAWSMSKIAM && + c.Spec.SASL.SecretsManagerArn != "" { + log.Warn("SecretsManagerArn is ignored if using SASL AWS-MSK-IAM") + } + + if c.Spec.SASL.SecretsManagerArn != "" && + (c.Spec.SASL.Username != "" || c.Spec.SASL.Password != "") { + err = multierror.Append( + err, + errors.New("Username and password are ignored if using SecretsManagerArn"), + ) + } } return err @@ -180,33 +197,46 @@ func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, err return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr) } +type AdminClientOpts struct { + ReadOnly bool + UsernameOverride string + PasswordOverride string + SecretsManagerArnOverride string +} + // NewAdminClient returns a new admin client using the parameters in the current cluster config. func (c ClusterConfig) NewAdminClient( ctx context.Context, sess *session.Session, - readOnly bool, - usernameOverride string, - passwordOverride string, + opts AdminClientOpts, ) (admin.Client, error) { if len(c.Spec.ZKAddrs) == 0 { log.Debug("No ZK addresses provided, using broker admin client") var saslUsername string var saslPassword string - if usernameOverride != "" { + var secretsManagerArn string + if opts.UsernameOverride != "" { log.Debugf("Setting SASL username from override value") - saslUsername = usernameOverride + saslUsername = opts.UsernameOverride } else { saslUsername = c.Spec.SASL.Username } - if passwordOverride != "" { + if opts.PasswordOverride != "" { log.Debugf("Setting SASL password from override value") - saslPassword = passwordOverride + saslPassword = opts.PasswordOverride } else { saslPassword = c.Spec.SASL.Password } + if opts.SecretsManagerArnOverride != "" { + log.Debugf("Setting SASL SecretsManagerArn from override value") + secretsManagerArn = opts.SecretsManagerArnOverride + } else { + secretsManagerArn = c.Spec.SASL.SecretsManagerArn + } + var saslMechanism admin.SASLMechanism var err error @@ -231,14 +261,15 @@ func (c ClusterConfig) NewAdminClient( SkipVerify: c.Spec.TLS.SkipVerify, }, SASL: admin.SASLConfig{ - Enabled: c.Spec.SASL.Enabled, - Mechanism: saslMechanism, - Username: saslUsername, - Password: saslPassword, + Enabled: c.Spec.SASL.Enabled, + Mechanism: saslMechanism, + Username: saslUsername, + Password: saslPassword, + SecretsManagerArn: secretsManagerArn, }, }, ExpectedClusterID: c.Spec.ClusterID, - ReadOnly: readOnly, + ReadOnly: opts.ReadOnly, }, ) } else { @@ -251,7 +282,7 @@ func (c ClusterConfig) NewAdminClient( BootstrapAddrs: c.Spec.BootstrapAddrs, ExpectedClusterID: c.Spec.ClusterID, Sess: sess, - ReadOnly: readOnly, + ReadOnly: opts.ReadOnly, }, ) } diff --git a/pkg/config/cluster_test.go b/pkg/config/cluster_test.go index c0987983..b8ea3845 100644 --- a/pkg/config/cluster_test.go +++ b/pkg/config/cluster_test.go @@ -92,6 +92,50 @@ func TestClusterValidate(t *testing.T) { }, expError: true, }, + { + description: "secrets manager set", + clusterConfig: ClusterConfig{ + Meta: ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + Description: "test-description", + }, + Spec: ClusterSpec{ + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + SASL: SASLConfig{ + Enabled: true, + Mechanism: "plain", + SecretsManagerArn: "arn:aws:secretsmanager:::secret:SecretName-xxxxxx", + }, + }, + }, + expError: true, + }, + { + description: "secrets manager cannot be set with username and password", + clusterConfig: ClusterConfig{ + Meta: ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + Description: "test-description", + }, + Spec: ClusterSpec{ + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + SASL: SASLConfig{ + Enabled: true, + Mechanism: "plain", + Username: "user", + Password: "password", + SecretsManagerArn: "arn:aws:secretsmanager:::secret:SecretName-xxxxxx", + }, + }, + }, + expError: true, + }, } for _, testCase := range testCases {