Skip to content

Commit

Permalink
Support --delete option for reset-offsets command
Browse files Browse the repository at this point in the history
  • Loading branch information
extemporalgenome committed Dec 14, 2023
1 parent 8ad90a7 commit 28261b1
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 74 deletions.
44 changes: 40 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ docker-compose up -d
3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):

```
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml
topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
```

4. Send some test messages to the `topic-default` topic:
Expand Down Expand Up @@ -225,13 +225,49 @@ subcommands interactively.
topicctl reset-offsets [topic] [group] [flags]
```

The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:
The `reset-offsets` subcommand allows resetting the offsets
for a consumer group in a topic.
There are a few typical approaches for setting the offsets:

1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
1. Use `--partitions` and combine it with one of the offset operators:
`--delete`, `--offset`, `--to-earliest` or `--to-latest`.
2. Use `--partition-offset-map` to pass specific offsets per partition.
For example, `1=5,2=10` means that the consumer group offset
for partition 1 must be set to 5, and partition 2 to offset 10.
This is mainly used for replays of specific traffic,
such as when a deploy has mishandled or corrupted state,
and the prior release must be rerun
starting at a specific offset per partition.
This is the most flexible approach for offset setting.

2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.
Note that `--partition-offset-map` flag is standalone
and cannot be coupled with other flags.

##### Partition selection flags

At most one of the following may be selected:

* `--partitions` specifies a comma-separated list of partitions IDs.

If none of these are specified,
the command defaults to selecting ALL of the partitions.

##### Offset selection flags

At most one of the following may be selected:

* `--delete` removes stored group offsets.
This will generally have the same effect as `--to-earliest` or `--to-latest`,
depending on the consumer group configuration.
However, `--delete` is more reliable and convenient,
since `--to-earliest` in particular involves a race with message retention
that may require numerous attempts.
* `--offset` indicates the specific value that all selected
consumer group partitions will be set to.
* `--to-earliest` resets group offsets to oldest still-retained per partition.
* `--to-latest` resets group offsets to newest per partitions.

If none of these are specified, `--to-earliest` will be the default.

#### tail

Expand Down
205 changes: 136 additions & 69 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"fmt"
"strconv"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
"github.com/segmentio/topicctl/pkg/groups"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var resetOffsetsCmd = &cobra.Command{
Use: "reset-offsets [topic name] [group name]",
Use: "reset-offsets <topic-name> <group-name>",
Short: "reset consumer group offsets",
Args: cobra.MinimumNArgs(2),
Args: cobra.ExactArgs(2),
PreRunE: resetOffsetsPreRun,
RunE: resetOffsetsRun,
}
Expand All @@ -27,6 +29,7 @@ type resetOffsetsCmdConfig struct {
partitionOffsetMap map[string]int64
toEarliest bool
toLatest bool
delete bool

shared sharedOptions
}
Expand Down Expand Up @@ -62,121 +65,185 @@ func init() {
"to-latest",
false,
"Resets offsets of consumer group members to latest offsets of partitions")
resetOffsetsCmd.Flags().BoolVar(
&resetOffsetsConfig.delete,
"delete",
false,
"Deletes offsets for the given consumer group")

addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
RootCmd.AddCommand(resetOffsetsCmd)
}

func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."
offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset."

if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") ||
len(resetOffsetsConfig.partitions) > 0 ||
resetOffsetsConfig.toEarliest ||
resetOffsetsConfig.toLatest) {
return errors.New(offsetMapSpecification)
resetOffsetSpec := "You must choose only one of the following " +
"reset-offset specifications: --delete, --to-earliest, --to-latest, " +
"--offset, or --partition-offset-map."
offsetMapSpec := "--partition-offset-map option cannot be used with --partitions."

cfg := resetOffsetsConfig

numOffsetSpecs := numTrue(
cfg.toEarliest,
cfg.toLatest,
cfg.delete,
cmd.Flags().Changed("offset"),
len(cfg.partitionOffsetMap) > 0,
)

} else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest {
return errors.New(resetOffsetSpecification)
if numOffsetSpecs > 1 {
return errors.New(resetOffsetSpec)
}

} else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) {
return errors.New(resetOffsetSpecification)
if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 {
return errors.New(offsetMapSpec)
}
return resetOffsetsConfig.shared.validate()

return cfg.shared.validate()
}

func resetOffsetsRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true)
cfg := resetOffsetsConfig

adminClient, err := cfg.shared.getAdminClient(ctx, nil, true)
if err != nil {
return err
}

defer adminClient.Close()

connector := adminClient.GetConnector()

topic := args[0]
group := args[1]

topicInfo, err := adminClient.GetTopic(ctx, topic, false)
if err != nil {
return err
}
partitionIDsMap := map[int]struct{}{}

partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions))
for _, partitionInfo := range topicInfo.Partitions {
partitionIDsMap[partitionInfo.ID] = struct{}{}
}
var resetOffsetsStrategy string
if resetOffsetsConfig.toLatest {
resetOffsetsStrategy = groups.LatestResetOffsetsStrategy
} else if resetOffsetsConfig.toEarliest {
resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy

var strategy string

switch {
case resetOffsetsConfig.toLatest:
strategy = groups.LatestResetOffsetsStrategy
case resetOffsetsConfig.toEarliest:
strategy = groups.EarliestResetOffsetsStrategy
}
partitionOffsets := map[int]int64{}

if len(resetOffsetsConfig.partitionOffsetMap) > 0 {
for partition, offset := range resetOffsetsConfig.partitionOffsetMap {
var partitionID int
if partitionID, err = strconv.Atoi(partition); err != nil {
return fmt.Errorf("Partition value %s must be a number", partition)
}
if _, ok := partitionIDsMap[partitionID]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic)
}
// If explicit per-partition offsets were specified, set them now.
partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap)
if err != nil {
return err
}

partitionOffsets[partitionID] = offset
// Set explicit partitions (without offsets) if specified,
// otherwise operate on fetched partition info;
// these will only take effect of per-partition offsets were not specified.
partitions := cfg.partitions
if len(partitions) == 0 && len(partitionOffsets) == 0 {
convert := func(info admin.PartitionInfo) int { return info.ID }
partitions = convertSlice(topicInfo.Partitions, convert)
}

for _, partition := range partitions {
_, ok := partitionIDsMap[partition]
if !ok {
format := "Partition %d not found in topic %s"
return fmt.Errorf(format, partition, topic)
}

} else if len(resetOffsetsConfig.partitions) > 0 {
for _, partition := range resetOffsetsConfig.partitions {
if _, ok := partitionIDsMap[partition]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partition, topic)
}
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition)
if err != nil {
return err
}
} else {
partitionOffsets[partition] = resetOffsetsConfig.offset
}

if strategy == "" {
partitionOffsets[partition] = cfg.offset
return nil
}
} else {
for _, partitionInfo := range topicInfo.Partitions {
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID)
if err != nil {
return err
}
} else {
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset
}

offset, err := groups.GetEarliestOrLatestOffset(ctx, connector, topic, strategy, partition)
if err != nil {
return err
}

partitionOffsets[partition] = offset
}

log.Infof(
"This will reset the offsets for the following partitions in topic %s for group %s:\n%s",
"This will reset the offsets for the following partitions "+
"in topic %s for group %s:\n%s",
topic,
group,
groups.FormatPartitionOffsets(partitionOffsets),
)
log.Info(
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
)

log.Info("Please ensure that all other consumers are stopped, " +
"otherwise the reset might be overridden.")

ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.ResetOffsets(
ctx,
topic,
group,
partitionOffsets,
)

if resetOffsetsConfig.delete {
input := groups.DeleteOffsetsInput{
GroupID: group,
Topic: topic,
Partitions: partitions,
}

return cliRunner.DeleteOffsets(ctx, &input)
}

return cliRunner.ResetOffsets(ctx, topic, group, partitionOffsets)
}

func numTrue(bools ...bool) int {
var n int
for _, b := range bools {
if b {
n++
}
}

return n
}

func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 {
out := make([]T2, len(input))

for i, v := range input {
out[i] = fn(v)
}

return out
}

func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) {
out := make(map[int]int64, len(input))

for partition, offset := range input {
partitionID, err := strconv.Atoi(partition)
if err != nil {
format := "Partition value %s must be an integer"
return nil, fmt.Errorf(format, partition)
}

_, ok := partitionIDsMap[partitionID]
if !ok {
format := "Partition %d not found"
return nil, fmt.Errorf(format, partitionID)
}

out[partitionID] = offset
}

return out, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/segmentio/topicctl

go 1.18
go 1.21

require (
github.com/aws/aws-sdk-go v1.44.208
Expand Down
22 changes: 22 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,11 @@ func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
return nil
}

// DeleteOffsets removes offsets for a single consumer group / topic combination.
func (c *CLIRunner) DeleteOffsets(ctx context.Context, input *groups.DeleteOffsetsInput) error {
return invoke(ctx, c, input, groups.DeleteOffsets)
}

// ResetOffsets resets the offsets for a single consumer group / topic combination.
func (c *CLIRunner) ResetOffsets(
ctx context.Context,
Expand Down Expand Up @@ -649,6 +654,7 @@ func (c *CLIRunner) Tail(
10e3,
10e6,
)

stats, err := tailer.LogMessages(ctx, maxMessages, filterRegexp, raw, headers)
filtered := filterRegexp != ""

Expand Down Expand Up @@ -689,6 +695,22 @@ func (c *CLIRunner) stopSpinner() {
}
}

type invokeFunc[T any] func(context.Context, *admin.Connector, T) error

func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) error {
c.startSpinner()

err := fn(ctx, c.adminClient.GetConnector(), v)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Success")

return nil
}

func stringsToInts(strs []string) ([]int, error) {
ints := []int{}

Expand Down
Loading

0 comments on commit 28261b1

Please sign in to comment.