From 1ee5499a70254f2afa270ecb0b872d75a9e6945f Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 6 Feb 2024 18:48:35 -0800 Subject: [PATCH] get action controllerid, clusterid and tests --- cmd/topicctl/subcmd/get.go | 26 +++++++++++++++++-- pkg/admin/brokerclient_test.go | 31 ++++++++++++++++++++++ pkg/admin/format.go | 31 +++++++++++++++++++++- pkg/admin/zkclient_test.go | 47 ++++++++++++++++++++++++++++++++++ pkg/cli/cli.go | 14 ++++++++++ 5 files changed, 146 insertions(+), 3 deletions(-) diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index cb6d9638..a3d86252 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -61,6 +61,7 @@ func init() { balanceCmd(), brokersCmd(), controllerCmd(), + clusterIDCmd(), configCmd(), groupsCmd(), lagsCmd(), @@ -137,8 +138,8 @@ func brokersCmd() *cobra.Command { func controllerCmd() *cobra.Command { return &cobra.Command{ - Use: "controller", - Short: "Displays active controller broker ID.", + Use: "controllerid", + Short: "Displays active controller broker id.", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() @@ -156,6 +157,27 @@ func controllerCmd() *cobra.Command { } } +func clusterIDCmd() *cobra.Command { + return &cobra.Command{ + Use: "clusterid", + Short: "Displays cluster id.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err + } + defer adminClient.Close() + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.GetClusterID(ctx, getConfig.full) + }, + } +} + func configCmd() *cobra.Command { return &cobra.Command{ Use: "config [broker or topic]", diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 763110b6..8691195a 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -13,6 +13,37 @@ import ( "github.com/stretchr/testify/require" ) +func TestBrokerClientControllerID(t *testing.T) { + if !util.CanTestBrokerAdmin() { + t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") + } + + ctx := context.Background() + client, err := NewBrokerAdminClient( + ctx, + BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + BrokerAddr: util.TestKafkaAddr(), + }, + }, + ) + require.NoError(t, err) + + brokerIDs, err := client.GetBrokerIDs(ctx) + require.NoError(t, err) + assert.Equal( + t, + []int{1, 2, 3, 4, 5, 6}, + brokerIDs, + ) + + controllerID, err := client.GetControllerID(ctx) + require.NoError(t, err) + assert.Condition(t, func() bool { + return controllerID >= 1 && controllerID <= 6 + }, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID)) +} + func TestBrokerClientGetClusterID(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/format.go b/pkg/admin/format.go index cbe424b8..8d2263f9 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -108,7 +108,7 @@ func FormatBrokers(brokers []BrokerInfo, full bool) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } -// FormatControllerID creates a pretty table from a list of brokers. +// FormatControllerID creates a pretty table for controller broker. func FormatControllerID(brokerID int) string { buf := &bytes.Buffer{} table := tablewriter.NewWriter(buf) @@ -137,6 +137,35 @@ func FormatControllerID(brokerID int) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } +// FormatClusterID creates a pretty table for cluster ID. +func FormatClusterID(clusterID string) string { + buf := &bytes.Buffer{} + table := tablewriter.NewWriter(buf) + headers := []string{"Kafka Cluster ID"} + table.SetHeader(headers) + + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + table.Append([]string{ + clusterID, + }) + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + // FormatBrokerReplicas creates a pretty table that shows how many replicas are in each // position (i.e., leader, second, third) by broker across all topics. Useful for showing // total-topic balance. diff --git a/pkg/admin/zkclient_test.go b/pkg/admin/zkclient_test.go index 3301a737..44e80cd1 100644 --- a/pkg/admin/zkclient_test.go +++ b/pkg/admin/zkclient_test.go @@ -15,6 +15,53 @@ import ( "github.com/stretchr/testify/require" ) +func TestZkClientControllerID(t *testing.T) { + zkConn, _, err := szk.Connect( + []string{util.TestZKAddr()}, + 5*time.Second, + ) + require.NoError(t, err) + require.NotNil(t, zkConn) + defer zkConn.Close() + + clusterName := testClusterID("clusterID") + zk.CreateNodes( + t, + zkConn, + []zk.PathTuple{ + { + Path: fmt.Sprintf("/%s", clusterName), + Obj: nil, + }, + { + Path: fmt.Sprintf("/%s/controller", clusterName), + Obj: map[string]interface{}{ + "version": 1, + "brokerid": 3, + "timestamp": "1589603217000", + }, + }, + }, + ) + + ctx := context.Background() + adminClient, err := NewZKAdminClient( + ctx, + ZKAdminClientConfig{ + ZKAddrs: []string{util.TestZKAddr()}, + ZKPrefix: clusterName, + BootstrapAddrs: []string{util.TestKafkaAddr()}, + ReadOnly: true, + }, + ) + require.NoError(t, err) + defer adminClient.Close() + + controllerID, err := adminClient.GetControllerID(ctx) + assert.NoError(t, err) + assert.Equal(t, 3, controllerID) +} + func TestZkClientGetClusterID(t *testing.T) { zkConn, _, err := szk.Connect( []string{util.TestZKAddr()}, diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index f4a1def9..19b41067 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -93,6 +93,20 @@ func (c *CLIRunner) GetControllerID(ctx context.Context, full bool) error { return nil } +// Get cluster ID +func (c *CLIRunner) GetClusterID(ctx context.Context, full bool) error { + c.startSpinner() + + clusterID, err := c.adminClient.GetClusterID(ctx) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Cluster ID:\n%s", admin.FormatClusterID(clusterID)) + return nil +} + // ApplyTopic does an apply run according to the spec in the argument config. func (c *CLIRunner) ApplyTopic( ctx context.Context,