Skip to content

Commit

Permalink
apply: Allow ignoring a specific error returned from updatePartitions()
Browse files Browse the repository at this point in the history
When applying topic's configuration, we'd like to be able
avoid failing in case the desired topic's partition count
is smaller than the actual topic's partitions count (on
the broker).
We know that Kafka doesn't allow partitions decrease,
so instead of failing the operation we'd like to
continue without doing anything.
This is very convenient when trying to apply a batch
of topics in which case, we don't want to fail the rest
of the batch for one topic that its partition count
configuration is lower than the actual partitions count.

This change is backward compatible, as the default behavior
is kept.

Signed-off-by: shimon-armis <[email protected]>
  • Loading branch information
shimonturjeman committed Mar 27, 2024
1 parent b83ceba commit 6794218
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
8 changes: 8 additions & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type applyCmdConfig struct {
autoContinueRebalance bool
retentionDropStepDurationStr string
skipConfirm bool
ignoreFewerPartitionsError bool
sleepLoopDuration time.Duration

shared sharedOptions
Expand Down Expand Up @@ -99,6 +100,12 @@ func init() {
false,
"Skip confirmation prompts during apply process",
)
applyCmd.Flags().BoolVar(
&applyConfig.ignoreFewerPartitionsError,
"ignore-fewer-partitions-error",
false,
"Don't return error when topic's config specifies fewer partitions than it currently has",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
Expand Down Expand Up @@ -231,6 +238,7 @@ func applyTopic(
AutoContinueRebalance: applyConfig.autoContinueRebalance,
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
log "github.com/sirupsen/logrus"
)

var ErrFewerPartitions = errors.New("fewer partitions in topic config")

// TopicApplierConfig contains the configuration for a TopicApplier struct.
type TopicApplierConfig struct {
BrokerThrottleMBsOverride int
Expand All @@ -34,6 +36,7 @@ type TopicApplierConfig struct {
AutoContinueRebalance bool
RetentionDropStepDuration time.Duration
SkipConfirm bool
IgnoreFewerPartitionsError bool
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}
Expand Down Expand Up @@ -213,6 +216,11 @@ func (t *TopicApplier) applyExistingTopic(
}

if err := t.updatePartitions(ctx, topicInfo); err != nil {
if errors.Is(err, ErrFewerPartitions) && t.config.IgnoreFewerPartitionsError {
log.Warnf("UpdatePartitions failure ignored. topic: %v, error: %v", t.topicName, err)
return nil
}

return err
}

Expand Down Expand Up @@ -477,7 +485,8 @@ func (t *TopicApplier) updatePartitions(

if currPartitions > t.topicConfig.Spec.Partitions {
return fmt.Errorf(
"Fewer partitions in topic config (%d) than observed (%d); this cannot be resolved by topicctl",
"%w (%d) than observed (%d); this cannot be resolved by topicctl",
ErrFewerPartitions,
t.topicConfig.Spec.Partitions,
currPartitions,
)
Expand Down

0 comments on commit 6794218

Please sign in to comment.