Skip to content

Commit

Permalink
get action controllerid, clusterid and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ssingudasu committed Feb 7, 2024
1 parent ca396d7 commit 1ee5499
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 3 deletions.
26 changes: 24 additions & 2 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func init() {
balanceCmd(),
brokersCmd(),
controllerCmd(),
clusterIDCmd(),
configCmd(),
groupsCmd(),
lagsCmd(),
Expand Down Expand Up @@ -137,8 +138,8 @@ func brokersCmd() *cobra.Command {

func controllerCmd() *cobra.Command {
return &cobra.Command{
Use: "controller",
Short: "Displays active controller broker ID.",
Use: "controllerid",
Short: "Displays active controller broker id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
Expand All @@ -156,6 +157,27 @@ func controllerCmd() *cobra.Command {
}
}

func clusterIDCmd() *cobra.Command {
return &cobra.Command{
Use: "clusterid",
Short: "Displays cluster id.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetClusterID(ctx, getConfig.full)
},
}
}

func configCmd() *cobra.Command {
return &cobra.Command{
Use: "config [broker or topic]",
Expand Down
31 changes: 31 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@ import (
"github.com/stretchr/testify/require"
)

func TestBrokerClientControllerID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

brokerIDs, err := client.GetBrokerIDs(ctx)
require.NoError(t, err)
assert.Equal(
t,
[]int{1, 2, 3, 4, 5, 6},
brokerIDs,
)

controllerID, err := client.GetControllerID(ctx)
require.NoError(t, err)
assert.Condition(t, func() bool {
return controllerID >= 1 && controllerID <= 6
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
}

func TestBrokerClientGetClusterID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
31 changes: 30 additions & 1 deletion pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func FormatBrokers(brokers []BrokerInfo, full bool) string {
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatControllerID creates a pretty table from a list of brokers.
// FormatControllerID creates a pretty table for controller broker.
func FormatControllerID(brokerID int) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
Expand Down Expand Up @@ -137,6 +137,35 @@ func FormatControllerID(brokerID int) string {
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatClusterID creates a pretty table for cluster ID.
func FormatClusterID(clusterID string) string {
buf := &bytes.Buffer{}
table := tablewriter.NewWriter(buf)
headers := []string{"Kafka Cluster ID"}
table.SetHeader(headers)

table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

table.Append([]string{
clusterID,
})

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatBrokerReplicas creates a pretty table that shows how many replicas are in each
// position (i.e., leader, second, third) by broker across all topics. Useful for showing
// total-topic balance.
Expand Down
47 changes: 47 additions & 0 deletions pkg/admin/zkclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,53 @@ import (
"github.com/stretchr/testify/require"
)

func TestZkClientControllerID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
5*time.Second,
)
require.NoError(t, err)
require.NotNil(t, zkConn)
defer zkConn.Close()

clusterName := testClusterID("clusterID")
zk.CreateNodes(
t,
zkConn,
[]zk.PathTuple{
{
Path: fmt.Sprintf("/%s", clusterName),
Obj: nil,
},
{
Path: fmt.Sprintf("/%s/controller", clusterName),
Obj: map[string]interface{}{
"version": 1,
"brokerid": 3,
"timestamp": "1589603217000",
},
},
},
)

ctx := context.Background()
adminClient, err := NewZKAdminClient(
ctx,
ZKAdminClientConfig{
ZKAddrs: []string{util.TestZKAddr()},
ZKPrefix: clusterName,
BootstrapAddrs: []string{util.TestKafkaAddr()},
ReadOnly: true,
},
)
require.NoError(t, err)
defer adminClient.Close()

controllerID, err := adminClient.GetControllerID(ctx)
assert.NoError(t, err)
assert.Equal(t, 3, controllerID)
}

func TestZkClientGetClusterID(t *testing.T) {
zkConn, _, err := szk.Connect(
[]string{util.TestZKAddr()},
Expand Down
14 changes: 14 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ func (c *CLIRunner) GetControllerID(ctx context.Context, full bool) error {
return nil
}

// Get cluster ID
func (c *CLIRunner) GetClusterID(ctx context.Context, full bool) error {
c.startSpinner()

clusterID, err := c.adminClient.GetClusterID(ctx)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Cluster ID:\n%s", admin.FormatClusterID(clusterID))
return nil
}

// ApplyTopic does an apply run according to the spec in the argument config.
func (c *CLIRunner) ApplyTopic(
ctx context.Context,
Expand Down

0 comments on commit 1ee5499

Please sign in to comment.