Skip to content

Commit

Permalink
feat: create acls (#165)
Browse files Browse the repository at this point in the history
* bump kafka-go to include acl apis

* add acl interfaces and aclinfo type stub

* pull latest kafka-go and use kafka-go aclresource type

* wip

* fix test

* fix typos

* get acls working

* getacls working

* upgrade cobra to latest

* finish separating get into separate subcommands

* remove unneeded variables

* wip

* pr feedback

* Revert "upgrade cobra to latest"

This reverts commit 7b8ee42.

* use getCliRunnerAndCtx in get acls

* more consistent variable names

* custom cobra type

* bring in new kafka-go

* support resource pattern type

* add support for acloperationtype and remove options for unknown

* improve descriptions

* support permissiontype and host filters

* add resource name filter and fix permission type formatting

* support principal filtering

* improve docs

* add examples

* remove comment

* remove TODOs that are complete

* remove TODOs that are complete

* update README

* fix test

* wip

* fix error handling

* error handling for zk

* more consistent error msg

* clean up createacl

* add TestBrokerClientCreateACLReadOnly

* improve zk tests

* run acl tests in ci

* enable acls for kafka 2.4.1 in ci

* fix zk tests

* skip TestBrokerClientCreateACLReadOnly on old versions of kafka

* try to debug

* handle nested errors from createacls

* operations -> operation

* operations -> operation

* remove setting log level in test

* clean up allowed types in help command

* fix merge conflict

* fix test

* add json annotations

* bump kafka-go to version on main

* wip

* basic tests

* start on getusers cmd

* add json annotations

* get users working

* wip

* add todos and fix type annotaitons

* improve test

* use CanTestBrokerAdminSecurity to feature flag test

* update README

* remove duplicate test from merge conflicts

* fix more merge conflicts

* create user working

* add uncommitted files

* start adding validation

* meta validation for users

* wip

* support dry run and skip confirm

* wip

* wip

* add more files

* resourcemta

* consistency checking for acls

* remove emacs backups

* remove user stuff

* remove diff from cluster.yaml file

* remove diff from topic file

* remove debug log

* smaller diff

* remove completed todos

* remove unused error helper

* add missing meta file

* skip ACL tests when ACLs cannot be used due to kafka version limitations

* fix loadacls test

* add more todos

* add validation and set defaults

* don't use ioutil

* move confirm to util package

* move confirm to util package

* add create to README

* use validation and setdefaults

* add example acl

* fix formatting in readme

* use released version of kafka-go

* fix spelling

* make invalid field more obvious

* fix dryrun and skip confirm

* fix grammar
  • Loading branch information
petedannemann authored Dec 13, 2023
1 parent 4e44ea4 commit e9241f4
Show file tree
Hide file tree
Showing 32 changed files with 1,546 additions and 108 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ vendor/
build/

.vscode

# Emacs backups
*~
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ The `check` command validates that each topic config has the correct fields set
consistent with the associated cluster config. Unless `--validate-only` is set, it then
checks the topic config against the state of the topic in the corresponding cluster.

#### create
```
topicctl create [flags] [command]
```

The `create` command creates resources in the cluster from a configuration file.
Currently, only ACLs are supported. The create command is separate from the apply
command as it is intended for usage with immutable resources managed by topicctl.

#### get

```
Expand Down Expand Up @@ -419,6 +428,47 @@ This subcommand will not rebalance a topic if:
1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config
1. a topic does not exist in the kafka cluster

### ACLs

Sets of ACLs can be configured in a YAML file. The following is an
annotated example:

```yaml
meta:
name: acls-test # Name of the group of ACLs
cluster: my-cluster # Name of the cluster
environment: stage # Environment of the cluster
region: us-west-2 # Region of the cluster
description: | # Free-text description of the topic (optional)
Test topic in my-cluster.
labels: # Custom key-value pairs purposed for ACL bookkeeping (optional)
key1: value1
key2: value2
spec:
acls:
- resource:
type: topic # Type of resource (topic, group, cluster, etc.)
name: test-topic # Name of the resource to apply an ACL to
patternType: literal # Type of pattern (literal, prefixed, etc.)
principal: User:my-user # Principal to apply the ACL to
host: * # Host to apply the ACL to
permission: allow # Permission to apply (allow, deny)
operations: # List of operations to use for the ACLs
- read
- describe
```

The `cluster`, `environment`, and `region` fields are used for matching
against a cluster config and double-checking that the cluster we're applying
in is correct; they don't appear in any API calls.

See the [Kafka documentation](https://kafka.apache.org/documentation/#security_authz_primitives)
for more details on the parameters that can be set in the `acls` field.

Multiple groups of ACLs can be included in the same file, separated by `---` lines, provided
that they reference the same cluster.

## Tool safety

The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make
Expand All @@ -441,6 +491,9 @@ The `apply` subcommand can make changes, but under the following conditions:

The `reset-offsets` command can also make changes in the cluster and should be used carefully.

The `create` command can be used to create new resources in the cluster. It cannot be used with
mutable resources.

### Idempotency

Apply runs are designed to be idemponent- the effects should be the same no matter how many
Expand Down
201 changes: 201 additions & 0 deletions cmd/topicctl/subcmd/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package subcmd

import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/config"
"github.com/segmentio/topicctl/pkg/create"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var createCmd = &cobra.Command{
Use: "create [resource type]",
Short: "creates one or more resources",
PersistentPreRunE: createPreRun,
}

type createCmdConfig struct {
dryRun bool
pathPrefix string
skipConfirm bool

shared sharedOptions
}

var createConfig createCmdConfig

func init() {
createCmd.PersistentFlags().BoolVar(
&createConfig.dryRun,
"dry-run",
false,
"Do a dry-run",
)
createCmd.PersistentFlags().StringVar(
&createConfig.pathPrefix,
"path-prefix",
os.Getenv("TOPICCTL_ACL_PATH_PREFIX"),
"Prefix for ACL config paths",
)
createCmd.PersistentFlags().BoolVar(
&createConfig.skipConfirm,
"skip-confirm",
false,
"Skip confirmation prompts during creation process",
)

addSharedFlags(createCmd, &createConfig.shared)
createCmd.AddCommand(
createACLsCmd(),
)
RootCmd.AddCommand(createCmd)
}

func createPreRun(cmd *cobra.Command, args []string) error {
if err := RootCmd.PersistentPreRunE(cmd, args); err != nil {
return err
}
return createConfig.shared.validate()
}

func createACLsCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "acls [acl configs]",
Short: "creates ACLs from configuration files",
Args: cobra.MinimumNArgs(1),
RunE: createACLRun,
PreRunE: createPreRun,
}

return cmd
}

func createACLRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()

// Keep a cache of the admin clients with the cluster config path as the key
adminClients := map[string]admin.Client{}

defer func() {
for _, adminClient := range adminClients {
adminClient.Close()
}
}()

matchCount := 0

for _, arg := range args {
if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) {
arg = filepath.Join(createConfig.pathPrefix, arg)
}

matches, err := filepath.Glob(arg)
if err != nil {
return err
}

for _, match := range matches {
matchCount++
if err := createACL(ctx, match, adminClients); err != nil {
return err
}
}
}

if matchCount == 0 {
return fmt.Errorf("No ACL configs match the provided args (%+v)", args)
}

return nil
}

func createACL(
ctx context.Context,
aclConfigPath string,
adminClients map[string]admin.Client,
) error {
clusterConfigPath, err := clusterConfigForACLCreate(aclConfigPath)
if err != nil {
return err
}

aclConfigs, err := config.LoadACLsFile(aclConfigPath)
if err != nil {
return err
}

clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv)
if err != nil {
return err
}

adminClient, ok := adminClients[clusterConfigPath]
if !ok {
adminClient, err = clusterConfig.NewAdminClient(
ctx,
nil,
createConfig.dryRun,
createConfig.shared.saslUsername,
createConfig.shared.saslPassword,
)
if err != nil {
return err
}
adminClients[clusterConfigPath] = adminClient
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)

for _, aclConfig := range aclConfigs {
aclConfig.SetDefaults()
log.Infof(
"Processing ACL %s in config %s with cluster config %s",
aclConfig.Meta.Name,
aclConfigPath,
clusterConfigPath,
)

creatorConfig := create.ACLCreatorConfig{
DryRun: createConfig.dryRun,
SkipConfirm: createConfig.skipConfirm,
ACLConfig: aclConfig,
ClusterConfig: clusterConfig,
}

if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil {
return err
}
}

return nil
}

func clusterConfigForACLCreate(aclConfigPath string) (string, error) {
if createConfig.shared.clusterConfig != "" {
return createConfig.shared.clusterConfig, nil
}

return filepath.Abs(
filepath.Join(
filepath.Dir(aclConfigPath),
"..",
"cluster.yaml",
),
)
}
5 changes: 3 additions & 2 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package subcmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"os"
"os/signal"
"path/filepath"
"strconv"
"syscall"
"time"

"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
Expand Down Expand Up @@ -159,7 +160,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {

for _, topicConfig := range topicConfigs {
// topic config should be consistent with the cluster config
if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil {
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"fmt"
"strconv"

"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/groups"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -167,7 +167,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error {
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/topicctl/subcmd/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/topicctl/pkg/apply"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func runTestReader(ctx context.Context) error {
testerConfig.readConsumer,
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func runTestWriter(ctx context.Context) error {
testerConfig.writeRate,
)

ok, _ := apply.Confirm("OK to continue?", false)
ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}
Expand Down
31 changes: 31 additions & 0 deletions examples/auth/acls/acl-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
meta:
name: acl-default
cluster: local-cluster-auth
environment: local-env
region: local-region
description: |
This is a default ACL for the local cluster.
It grants read and describe access to the topic `my-topic` and read access to the group `my-group`
to the user `default`.
spec:
acls:
- resource:
type: topic
name: my-topic
patternType: literal
principal: 'User:default'
host: '*'
permission: allow
operations:
- Read
- Describe
- resource:
type: group
name: my-group
patternType: prefixed
principal: 'User:default'
host: '*'
permission: allow
operations:
- Read
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/olekukonko/tablewriter v0.0.5
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/segmentio/kafka-go v0.4.44
github.com/segmentio/kafka-go v0.4.45
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
Expand Down
Loading

0 comments on commit e9241f4

Please sign in to comment.