Skip to content

Commit

Permalink
Revert "Allow KafkaRoller talk to controller directly (#10016)" from …
Browse files Browse the repository at this point in the history
…0.45.x release (#10944)

Signed-off-by: Gantigmaa Selenge <[email protected]>
  • Loading branch information
tinaselenge authored Dec 12, 2024
1 parent ca44bff commit b506022
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
protected static final String REPLICATION_PORT_NAME = "tcp-replication";
protected static final int KAFKA_AGENT_PORT = 8443;
protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent";
/**
* Port number used for control plane
*/
public static final int CONTROLPLANE_PORT = 9090;
protected static final int CONTROLPLANE_PORT = 9090;
protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
Expand Down Expand Up @@ -160,43 +159,6 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() {
return mock;
}

@Test
public void testTalkingToControllersLatestVersion(VertxTestContext testContext) {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version());
AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap controllers"));

TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
noException(), null, noException(), noException(), noException(),
brokerId -> succeededFuture(true),
true, mock, mockKafkaAgentClientProvider(), true, null, -1);

// When admin client cannot be created for a controller node, we expect it to be force restarted.
doSuccessfulRollingRestart(testContext, kafkaRoller,
asList(0),
asList(0));
}

@Test
public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0");

AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers"));

TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
noException(), null, noException(), noException(), noException(),
brokerId -> succeededFuture(true),
true, mock, mockKafkaAgentClientProvider(), true, null, -1);

// If the controller has older version (< 3.9.0), we should only be creating admin client for brokers
// and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck
doFailingRollingRestart(testContext, kafkaRoller,
asList(0),
KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts",
emptyList());
}

private static KafkaAgentClientProvider mockKafkaAgentClientProvider() {
return mock(KafkaAgentClientProvider.class);
}
Expand Down Expand Up @@ -835,17 +797,12 @@ public void clearRestarted() {
}

private PodOperator mockPodOps(Function<Integer, Future<Void>> readiness) {
return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version());
}

private PodOperator mockPodOpsWithVersion(Function<Integer, Future<Void>> readiness, String version) {
PodOperator podOps = mock(PodOperator.class);
when(podOps.get(any(), any())).thenAnswer(
invocation -> new PodBuilder()
.withNewMetadata()
.withNamespace(invocation.getArgument(0))
.withName(invocation.getArgument(1))
.addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version)
.withNamespace(invocation.getArgument(0))
.withName(invocation.getArgument(1))
.endMetadata()
.build()
);
Expand Down Expand Up @@ -944,33 +901,9 @@ KafkaAgentClient initKafkaAgentClient() {
}

@Override
protected Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
if (delegateAdminClientCall) {
return super.brokerAdminClient(nodes);
}
RuntimeException exception = acOpenException.apply(nodes);
if (exception != null) {
throw new ForceableProblem("An error while try to create the admin client", exception);
}
Admin ac = mock(AdminClient.class, invocation -> {
if ("close".equals(invocation.getMethod().getName())) {
Admin mock = (Admin) invocation.getMock();
unclosedAdminClients.remove(mock);
if (acCloseException != null) {
throw acCloseException;
}
return null;
}
throw new RuntimeException("Not mocked " + invocation.getMethod());
});
unclosedAdminClients.put(ac, new Throwable("Pod " + nodes));
return ac;
}

@Override
protected Admin controllerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
protected Admin adminClient(Set<NodeRef> nodes, boolean b) throws ForceableProblem, FatalProblem {
if (delegateAdminClientCall) {
return super.controllerAdminClient(nodes);
return super.adminClient(nodes, b);
}
RuntimeException exception = acOpenException.apply(nodes);
if (exception != null) {
Expand Down Expand Up @@ -1012,7 +945,7 @@ Future<Boolean> canRoll(int podId) {
}

@Override
protected KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) {
protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
Admin admin = mock(Admin.class);
DescribeMetadataQuorumResult qrmResult = mock(DescribeMetadataQuorumResult.class);
when(admin.describeMetadataQuorum()).thenReturn(qrmResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,20 +616,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return adminClientSupplier.get();
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return adminClientSupplier.get();
}

@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return adminClientSupplier.get();
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return adminClientSupplier.get();
}
};

return new ResourceOperatorSupplier(vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public interface AdminClientProvider {

/**
* Create a Kafka Admin interface instance for brokers
* Create a Kafka Admin interface instance
*
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
Expand All @@ -26,17 +26,7 @@ public interface AdminClientProvider {
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);

/**
* Create a Kafka Admin interface instance for controllers
*
* @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @return Instance of Kafka Admin interface
*/
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);

/**
* Create a Kafka Admin interface instance for brokers
* Create a Kafka Admin interface instance
*
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
Expand All @@ -46,16 +36,4 @@ public interface AdminClientProvider {
* @return Instance of Kafka Admin interface
*/
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);

/**
* Create a Kafka Admin interface instance for controllers
*
* @param controllerBootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @param config Additional configuration for the Kafka Admin Client
*
* @return Instance of Kafka Admin interface
*/
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

/**
* Create a Kafka Admin interface instance handling the following different scenarios:
*
Expand All @@ -49,30 +44,26 @@ public Admin createControllerAdminClient(String controllerBootstrapHostnames, Pe
*/
@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
config.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames);
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
return Admin.create(adminClientConfiguration(bootstrapHostnames, kafkaCaTrustSet, authIdentity, config));
}

/**
* Utility method for preparing the Admin client configuration
*
* @param bootstrapHostnames Kafka bootstrap address
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @param config Custom Admin client configuration or empty properties instance
*
* @return Admin client configuration
*/
/* test */ Properties adminClientConfiguration(PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
/* test */ static Properties adminClientConfiguration(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
if (config == null) {
throw new InvalidConfigurationException("The config parameter should not be null");
}

config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);

// configuring TLS encryption if requested
if (kafkaCaTrustSet != null) {
config.putIfAbsent(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
Expand Down
Loading

0 comments on commit b506022

Please sign in to comment.