Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add delete topic command #100

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ resource type in the cluster. Currently, the following operations are supported:
| `get offsets [topic]` | Number of messages per partition along with start and end times |
| `get topics` | All topics in the cluster |

#### delete

```
topicctl delete [flags] [operation]
```

The `delete` subcommand deletes a particular resource type in the cluster.
Currently, the following operations are supported:
| Subcommand | Description |
| --------- | ----------- |
| `delete topic [topic]` | Deletes a single topic in the cluster |

#### repl

```
Expand All @@ -184,6 +196,8 @@ topicctl repl [flags]
The `repl` subcommand starts up a shell that allows running the `get` and `tail`
subcommands interactively.

By default, `repl` is in read-only mode. Disable this behaviour with: `--read-only-enabled=false`

#### reset-offsets

```
Expand Down
74 changes: 74 additions & 0 deletions cmd/topicctl/subcmd/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package subcmd

import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var deleteCmd = &cobra.Command{
Use: "delete [resource type]",
Short: "delete instances of a particular type",
Long: strings.Join(
[]string{
"Deletes instances of a particular type.",
"Supported types currently include: topic.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any other obvious resource types a delete function would be applicable to - did you have something in mind?

"",
"See the tool README for a detailed description of each one.",
},
"\n",
),
Args: cobra.MinimumNArgs(1),
RunE: deleteRun,
PreRunE: deletePreRun,
}

type deleteCmdConfig struct {
shared sharedOptions
}

var deleteConfig deleteCmdConfig

func init() {
addSharedFlags(deleteCmd, &deleteConfig.shared)
RootCmd.AddCommand(deleteCmd)
}

func deletePreRun(cmd *cobra.Command, args []string) error {
return deleteConfig.shared.validate()
}

func deleteRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)

resource := args[0]

switch resource {
case "topic":
var topicName string

if len(args) == 2 {
topicName = args[1]
} else if len(args) > 2 {
return fmt.Errorf("Can only provide one positional argument with args")
}

return cliRunner.DeleteTopic(ctx, topicName)
default:
return fmt.Errorf("Unrecognized resource type: %s", resource)
}
}
15 changes: 13 additions & 2 deletions cmd/topicctl/subcmd/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@ var replCmd = &cobra.Command{
RunE: replRun,
}

type replCmdOptions struct {
readOnly bool
}

type replCmdConfig struct {
shared sharedOptions
options replCmdOptions
shared sharedOptions
}

var replConfig replCmdConfig

func init() {
replCmd.Flags().BoolVar(
&replConfig.options.readOnly,
"read-only-enabled",
true,
"Use read only mode")

addSharedFlags(replCmd, &replConfig.shared)
RootCmd.AddCommand(replCmd)
}
Expand All @@ -34,7 +45,7 @@ func replRun(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := replConfig.shared.getAdminClient(ctx, sess, true)
adminClient, err := replConfig.shared.getAdminClient(ctx, sess, replConfig.options.readOnly)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,31 @@ func (c *BrokerAdminClient) AssignPartitions(
return err
}

// DeleteTopic deletes a topic in the cluster.
func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.config.ReadOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := &kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.client.DeleteTopics(ctx, req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)

if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AddPartitions extends a topic by adding one or more new partitions to it.
func (c *BrokerAdminClient) AddPartitions(
ctx context.Context,
Expand Down
64 changes: 64 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,70 @@ func TestBrokerClientAddPartitions(t *testing.T) {
assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas)
}

func TestBrokerDeleteTopic(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

topicName := util.RandomString("topic-delete-", 6)
err = client.CreateTopic(
ctx,
kafka.TopicConfig{
Topic: topicName,
NumPartitions: -1,
ReplicationFactor: -1,
ReplicaAssignments: []kafka.ReplicaAssignment{
{
Partition: 0,
Replicas: []int{1, 2},
},
{
Partition: 1,
Replicas: []int{2, 3},
},
{
Partition: 2,
Replicas: []int{3, 4},
},
},
ConfigEntries: []kafka.ConfigEntry{
{
ConfigName: "flush.ms",
ConfigValue: "2000",
},
{
ConfigName: "retention.ms",
ConfigValue: "10000000",
},
},
},
)
require.NoError(t, err)
util.RetryUntil(t, 5*time.Second, func() error {
_, err := client.GetTopic(ctx, topicName, true)
return err
})

err = client.DeleteTopic(ctx, topicName)
require.NoError(t, err)

time.Sleep(time.Second * 5)

_, err = client.GetTopic(ctx, topicName, false)
require.Error(t, err)
}

func TestBrokerClientAlterAssignments(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Client interface {
detailed bool,
) (TopicInfo, error)

// DeleteTopic deletes a single topic in the cluster.
DeleteTopic(ctx context.Context, topic string) error

// UpdateTopicConfig updates the configuration for the argument topic. It returns the config
// keys that were updated.
UpdateTopicConfig(
Expand Down
23 changes: 23 additions & 0 deletions pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,29 @@ func (c *ZKAdminClient) CreateTopic(
return err
}

func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.readOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)
if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AssignPartitions notifies the cluster to begin a partition reassignment.
// This should only be used for existing partitions; to create new partitions,
// use the AddPartitions method.
Expand Down
22 changes: 22 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,28 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error {
return nil
}

// DeleteTopic deletes a single topic.
func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error {
confirm, err := apply.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This asks for confirmation without verifying it's an existing topic which can be confusing. In the interest of taking every step to confirm safe operation, can you add a check here that a topic of the given name exists prior to asking user confirmation?

if err != nil {
return err
}

if !confirm {
return nil
}

c.startSpinner()
err = c.adminClient.DeleteTopic(ctx, topic)
c.stopSpinner()
if err != nil {
return err
}
c.printer("Success")

return nil
}

// ResetOffsets resets the offsets for a single consumer group / topic combination.
func (c *CLIRunner) ResetOffsets(
ctx context.Context,
Expand Down
38 changes: 38 additions & 0 deletions pkg/cli/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ var (
Text: "get",
Description: "Get information about one or more resources in the cluster",
},
{
Text: "delete",
Description: "Delete a resource in the cluster",
},
{
Text: "tail",
Description: "Tail all messages in a topic",
Expand All @@ -38,6 +42,13 @@ var (
},
}

deleteSuggestions = []prompt.Suggest{
{
Text: "topic",
Description: "Delete a single topic",
},
}

getSuggestions = []prompt.Suggest{
{
Text: "balance",
Expand Down Expand Up @@ -219,6 +230,25 @@ func (r *Repl) executor(in string) {
case "exit":
fmt.Println("Bye!")
os.Exit(0)
case "delete":
if len(command.args) == 1 {
log.Error("Unrecognized input. Run 'help' for details on available commands.")
return
}

switch command.args[1] {
case "topic":
if err := command.checkArgs(3, 3, nil); err != nil {
log.Errorf("Error: %+v", err)
return
}

topicName := command.args[2]
if err := r.cliRunner.DeleteTopic(ctx, topicName); err != nil {
log.Errorf("Error: %+v", err)
return
}
}
case "get":
if len(command.args) == 1 {
log.Error("Unrecognized input. Run 'help' for details on available commands.")
Expand Down Expand Up @@ -384,6 +414,10 @@ func (r *Repl) completer(doc prompt.Document) []prompt.Suggest {
suggestions = commandSuggestions
} else if len(words) == 2 && words[0] == "get" {
suggestions = getSuggestions
} else if len(words) == 2 && words[0] == "delete" {
suggestions = deleteSuggestions
} else if len(words) == 3 && words[0] == "delete" && (words[1] == "topic") {
suggestions = r.topicSuggestions
} else if len(words) == 3 && words[0] == "get" &&
(words[1] == "balance" ||
words[1] == "lags" ||
Expand Down Expand Up @@ -467,6 +501,10 @@ func helpTable() string {
" get topics",
"Get all topics",
},
{
" delete topic",
"Deletes a single topic",
},
{
" tail [topic] [optional filter regexp] [--raw]",
"Tail all messages in a topic",
Expand Down