diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 14b22d80..16f1c09c 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -36,6 +36,7 @@ type applyCmdConfig struct { retentionDropStepDurationStr string skipConfirm bool sleepLoopDuration time.Duration + failFast bool shared sharedOptions @@ -105,6 +106,12 @@ func init() { 10*time.Second, "Amount of time to wait between partition checks", ) + applyCmd.Flags().BoolVar( + &applyConfig.failFast, + "fail-fast", + true, + "Fail upon the first error encountered during apply process", + ) addSharedConfigOnlyFlags(applyCmd, &applyConfig.shared) RootCmd.AddCommand(applyCmd) @@ -125,6 +132,14 @@ func applyPreRun(cmd *cobra.Command, args []string) error { return nil } +func appendError(aggregatedErr error, err error) error { + if aggregatedErr == nil { + return err + } + + return fmt.Errorf("%v\n%v", aggregatedErr, err) +} + func applyRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -138,6 +153,8 @@ func applyRun(cmd *cobra.Command, args []string) error { // Keep a cache of the admin clients with the cluster config path as the key adminClients := map[string]admin.Client{} + // Keep track of any errors that occur during the apply process + var errs error defer func() { for _, adminClient := range adminClients { @@ -160,7 +177,10 @@ func applyRun(cmd *cobra.Command, args []string) error { for _, match := range matches { matchCount++ if err := applyTopic(ctx, match, adminClients); err != nil { - return err + if applyConfig.failFast { + return err + } + errs = appendError(errs, err) } } } @@ -169,7 +189,7 @@ func applyRun(cmd *cobra.Command, args []string) error { return fmt.Errorf("No topic configs match the provided args (%+v)", args) } - return nil + return errs } func applyTopic(