From ebc587a650f0e8c2b042b2813e7d0a752af4f168 Mon Sep 17 00:00:00 2001 From: rislah Date: Thu, 6 Oct 2022 09:00:30 +0300 Subject: [PATCH 1/2] add delete topic command * repl is set to read-only by default * update README --- README.md | 14 +++++++ cmd/topicctl/subcmd/delete.go | 74 ++++++++++++++++++++++++++++++++++ cmd/topicctl/subcmd/repl.go | 15 ++++++- pkg/admin/brokerclient.go | 25 ++++++++++++ pkg/admin/brokerclient_test.go | 64 +++++++++++++++++++++++++++++ pkg/admin/client.go | 3 ++ pkg/admin/zkclient.go | 23 +++++++++++ pkg/cli/cli.go | 22 ++++++++++ pkg/cli/repl.go | 38 +++++++++++++++++ 9 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 cmd/topicctl/subcmd/delete.go diff --git a/README.md b/README.md index 1d2c911d..61e06041 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,18 @@ resource type in the cluster. Currently, the following operations are supported: | `get offsets [topic]` | Number of messages per partition along with start and end times | | `get topics` | All topics in the cluster | +#### delete + +``` +topicctl delete [flags] [operation] +``` + +The `delete` subcommand deletes a particular resource type in the cluster. +Currently, the following operations are supported: +| Subcommand | Description | +| --------- | ----------- | +| `delete topic [topic]` | Deletes a single topic in the cluster | + #### repl ``` @@ -184,6 +196,8 @@ topicctl repl [flags] The `repl` subcommand starts up a shell that allows running the `get` and `tail` subcommands interactively. +By default, `repl` is in read-only mode. Disable this behaviour with: `--read-only-enabled=false` + #### reset-offsets ``` diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go new file mode 100644 index 00000000..1f8b6a30 --- /dev/null +++ b/cmd/topicctl/subcmd/delete.go @@ -0,0 +1,74 @@ +package subcmd + +import ( + "context" + "fmt" + "strings" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/segmentio/topicctl/pkg/cli" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var deleteCmd = &cobra.Command{ + Use: "delete [resource type]", + Short: "delete instances of a particular type", + Long: strings.Join( + []string{ + "Deletes instances of a particular type.", + "Supported types currently include: topic.", + "", + "See the tool README for a detailed description of each one.", + }, + "\n", + ), + Args: cobra.MinimumNArgs(1), + RunE: deleteRun, + PreRunE: deletePreRun, +} + +type deleteCmdConfig struct { + shared sharedOptions +} + +var deleteConfig deleteCmdConfig + +func init() { + addSharedFlags(deleteCmd, &deleteConfig.shared) + RootCmd.AddCommand(deleteCmd) +} + +func deletePreRun(cmd *cobra.Command, args []string) error { + return deleteConfig.shared.validate() +} + +func deleteRun(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + + resource := args[0] + + switch resource { + case "topic": + var topicName string + + if len(args) == 2 { + topicName = args[1] + } else if len(args) > 2 { + return fmt.Errorf("Can only provide one positional argument with args") + } + + return cliRunner.DeleteTopic(ctx, topicName) + default: + return fmt.Errorf("Unrecognized resource type: %s", resource) + } +} diff --git a/cmd/topicctl/subcmd/repl.go b/cmd/topicctl/subcmd/repl.go index c3f8aaf5..c0978083 100644 --- a/cmd/topicctl/subcmd/repl.go +++ b/cmd/topicctl/subcmd/repl.go @@ -15,13 +15,24 @@ var replCmd = &cobra.Command{ RunE: replRun, } +type replCmdOptions struct { + readOnly bool +} + type replCmdConfig struct { - shared sharedOptions + options replCmdOptions + shared sharedOptions } var replConfig replCmdConfig func init() { + replCmd.Flags().BoolVar( + &replConfig.options.readOnly, + "read-only-enabled", + true, + "Use read only mode") + addSharedFlags(replCmd, &replConfig.shared) RootCmd.AddCommand(replCmd) } @@ -34,7 +45,7 @@ func replRun(cmd *cobra.Command, args []string) error { ctx := context.Background() sess := session.Must(session.NewSession()) - adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true) + adminClient, err := replConfig.shared.getAdminClient(ctx, sess, replConfig.options.readOnly) if err != nil { return err } diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 0922d401..18342919 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -498,6 +498,31 @@ func (c *BrokerAdminClient) AssignPartitions( return err } +// DeleteTopic deletes a topic in the cluster. +func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.config.ReadOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := &kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.client.DeleteTopics(ctx, req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AddPartitions extends a topic by adding one or more new partitions to it. func (c *BrokerAdminClient) AddPartitions( ctx context.Context, diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 89834bb4..7ce75c38 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -387,6 +387,70 @@ func TestBrokerClientAddPartitions(t *testing.T) { assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas) } +func TestBrokerDeleteTopic(t *testing.T) { + if !util.CanTestBrokerAdmin() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + topicName := util.RandomString("topic-delete-", 6) + err = client.CreateTopic( + ctx, + kafka.TopicConfig{ + Topic: topicName, + NumPartitions: -1, + ReplicationFactor: -1, + ReplicaAssignments: []kafka.ReplicaAssignment{ + { + Partition: 0, + Replicas: []int{1, 2}, + }, + { + Partition: 1, + Replicas: []int{2, 3}, + }, + { + Partition: 2, + Replicas: []int{3, 4}, + }, + }, + ConfigEntries: []kafka.ConfigEntry{ + { + ConfigName: "flush.ms", + ConfigValue: "2000", + }, + { + ConfigName: "retention.ms", + ConfigValue: "10000000", + }, + }, + }, + ) + require.NoError(t, err) + util.RetryUntil(t, 5*time.Second, func() error { + _, err := client.GetTopic(ctx, topicName, true) + return err + }) + + err = client.DeleteTopic(ctx, topicName) + require.NoError(t, err) + + time.Sleep(time.Second * 5) + + _, err = client.GetTopic(ctx, topicName, false) + require.Error(t, err) +} + func TestBrokerClientAlterAssignments(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/client.go b/pkg/admin/client.go index 733a9ab0..ce8dea5d 100644 --- a/pkg/admin/client.go +++ b/pkg/admin/client.go @@ -38,6 +38,9 @@ type Client interface { detailed bool, ) (TopicInfo, error) + // DeleteTopic deletes a single topic in the cluster. + DeleteTopic(ctx context.Context, topic string) error + // UpdateTopicConfig updates the configuration for the argument topic. It returns the config // keys that were updated. UpdateTopicConfig( diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 2f95c1fc..96ce071d 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -578,6 +578,29 @@ func (c *ZKAdminClient) CreateTopic( return err } +func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error { + if c.readOnly { + return errors.New("Cannot delete topics in read-only mode") + } + + req := kafka.DeleteTopicsRequest{ + Topics: []string{topic}, + } + log.Debugf("DeleteTopics request: %+v", req) + + resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req) + log.Debugf("DeleteTopics response: %+v (%+v)", resp, err) + if err != nil { + return err + } + + if err, ok := resp.Errors[topic]; ok { + return err + } + + return nil +} + // AssignPartitions notifies the cluster to begin a partition reassignment. // This should only be used for existing partitions; to create new partitions, // use the AddPartitions method. diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index a00215c1..cac8b412 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -484,6 +484,28 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { return nil } +// DeleteTopic deletes a single topic. +func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error { + confirm, err := apply.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false) + if err != nil { + return err + } + + if !confirm { + return nil + } + + c.startSpinner() + err = c.adminClient.DeleteTopic(ctx, topic) + c.stopSpinner() + if err != nil { + return err + } + c.printer("Success") + + return nil +} + // ResetOffsets resets the offsets for a single consumer group / topic combination. func (c *CLIRunner) ResetOffsets( ctx context.Context, diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index d72d1efb..733abb6a 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -24,6 +24,10 @@ var ( Text: "get", Description: "Get information about one or more resources in the cluster", }, + { + Text: "delete", + Description: "Delete a resource in the cluster", + }, { Text: "tail", Description: "Tail all messages in a topic", @@ -38,6 +42,13 @@ var ( }, } + deleteSuggestions = []prompt.Suggest{ + { + Text: "topic", + Description: "Delete a single topic", + }, + } + getSuggestions = []prompt.Suggest{ { Text: "balance", @@ -219,6 +230,25 @@ func (r *Repl) executor(in string) { case "exit": fmt.Println("Bye!") os.Exit(0) + case "delete": + if len(command.args) == 1 { + log.Error("Unrecognized input. Run 'help' for details on available commands.") + return + } + + switch command.args[1] { + case "topic": + if err := command.checkArgs(3, 3, nil); err != nil { + log.Errorf("Error: %+v", err) + return + } + + topicName := command.args[2] + if err := r.cliRunner.DeleteTopic(ctx, topicName); err != nil { + log.Errorf("Error: %+v", err) + return + } + } case "get": if len(command.args) == 1 { log.Error("Unrecognized input. Run 'help' for details on available commands.") @@ -384,6 +414,10 @@ func (r *Repl) completer(doc prompt.Document) []prompt.Suggest { suggestions = commandSuggestions } else if len(words) == 2 && words[0] == "get" { suggestions = getSuggestions + } else if len(words) == 2 && words[0] == "delete" { + suggestions = deleteSuggestions + } else if len(words) == 3 && words[0] == "delete" && (words[1] == "topic") { + suggestions = r.topicSuggestions } else if len(words) == 3 && words[0] == "get" && (words[1] == "balance" || words[1] == "lags" || @@ -467,6 +501,10 @@ func helpTable() string { " get topics", "Get all topics", }, + { + " delete topic", + "Deletes a single topic", + }, { " tail [topic] [optional filter regexp] [--raw]", "Tail all messages in a topic", From dff6cbc19642876fc951f9fc81e786ea0a11e369 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Thu, 9 Nov 2023 10:18:17 -0500 Subject: [PATCH 2/2] check if topic exists first and use proper cobra subcommands --- cmd/topicctl/subcmd/delete.go | 56 ++++++++++++++--------------------- pkg/cli/cli.go | 11 +++++++ 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/cmd/topicctl/subcmd/delete.go b/cmd/topicctl/subcmd/delete.go index 1f8b6a30..f479c406 100644 --- a/cmd/topicctl/subcmd/delete.go +++ b/cmd/topicctl/subcmd/delete.go @@ -2,7 +2,6 @@ package subcmd import ( "context" - "fmt" "strings" "github.com/aws/aws-sdk-go/aws/session" @@ -17,15 +16,10 @@ var deleteCmd = &cobra.Command{ Long: strings.Join( []string{ "Deletes instances of a particular type.", - "Supported types currently include: topic.", - "", - "See the tool README for a detailed description of each one.", }, "\n", ), - Args: cobra.MinimumNArgs(1), - RunE: deleteRun, - PreRunE: deletePreRun, + PersistentPreRunE: deletePreRun, } type deleteCmdConfig struct { @@ -36,6 +30,9 @@ var deleteConfig deleteCmdConfig func init() { addSharedFlags(deleteCmd, &deleteConfig.shared) + deleteCmd.AddCommand( + deleteTopicCmd(), + ) RootCmd.AddCommand(deleteCmd) } @@ -43,32 +40,23 @@ func deletePreRun(cmd *cobra.Command, args []string) error { return deleteConfig.shared.validate() } -func deleteRun(cmd *cobra.Command, args []string) error { - ctx := context.Background() - sess := session.Must(session.NewSession()) - - adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false) - if err != nil { - return err - } - defer adminClient.Close() - - cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) - - resource := args[0] - - switch resource { - case "topic": - var topicName string - - if len(args) == 2 { - topicName = args[1] - } else if len(args) > 2 { - return fmt.Errorf("Can only provide one positional argument with args") - } - - return cliRunner.DeleteTopic(ctx, topicName) - default: - return fmt.Errorf("Unrecognized resource type: %s", resource) +func deleteTopicCmd() *cobra.Command { + return &cobra.Command{ + Use: "topic [topic name]", + Short: "Delete a topic", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.DeleteTopic(ctx, args[0]) + }, } } diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 657c663e..3bd4fb7e 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -522,6 +522,17 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { // DeleteTopic deletes a single topic. func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error { + c.printer("Checking if topic %s exists...", topic) + c.startSpinner() + // First check that topic exists + _, err := c.adminClient.GetTopic(ctx, topic, false) + if err != nil { + c.stopSpinner() + return fmt.Errorf("Error fetching topic info: %+v", err) + } + c.stopSpinner() + c.printer("Topic %s exists in the cluster!", topic) + confirm, err := apply.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false) if err != nil { return err