From b5060222ffc07b67b7efe003bd326e48b2896fc8 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com> Date: Thu, 12 Dec 2024 22:58:39 +0000 Subject: [PATCH] Revert "Allow KafkaRoller talk to controller directly (#10016)" from 0.45.x release (#10944) Signed-off-by: Gantigmaa Selenge --- .../operator/cluster/model/KafkaCluster.java | 5 +- .../operator/resource/KafkaRoller.java | 129 ++++++++---------- .../operator/cluster/ResourceUtils.java | 10 -- .../operator/resource/KafkaRollerTest.java | 77 +---------- .../KubernetesRestartEventsMockTest.java | 10 -- .../operator/common/AdminClientProvider.java | 26 +--- .../common/DefaultAdminClientProvider.java | 19 +-- .../DefaultAdminClientProviderTest.java | 64 ++------- 8 files changed, 83 insertions(+), 257 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 377692d8633..bd33a45b9f6 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -124,10 +124,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp protected static final String REPLICATION_PORT_NAME = "tcp-replication"; protected static final int KAFKA_AGENT_PORT = 8443; protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent"; - /** - * Port number used for control plane - */ - public static final int CONTROLPLANE_PORT = 9090; + protected static final int CONTROLPLANE_PORT = 9090; protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java index 8356e07384e..4134db4c8e1 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java @@ -20,7 +20,6 @@ import io.strimzi.operator.cluster.operator.resource.events.KubernetesRestartEventPublisher; import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator; import io.strimzi.operator.common.AdminClientProvider; -import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.BackOff; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.ReconciliationLogger; @@ -36,8 +35,10 @@ import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.SslAuthenticationException; @@ -110,6 +111,7 @@ public class KafkaRoller { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRoller.class); private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME = "controller.quorum.fetch.timeout.ms"; private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT = "2000"; + private final PodOperator podOperations; private final long pollingIntervalMs; protected final long operationTimeoutMs; @@ -196,7 +198,7 @@ public KafkaRoller(Reconciliation reconciliation, Vertx vertx, PodOperator podOp private boolean maybeInitBrokerAdminClient() { if (this.brokerAdminClient == null) { try { - this.brokerAdminClient = brokerAdminClient(nodes); + this.brokerAdminClient = adminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()), false); } catch (ForceableProblem | FatalProblem e) { LOGGER.warnCr(reconciliation, "Failed to create brokerAdminClient.", e); return false; @@ -209,19 +211,16 @@ private boolean maybeInitBrokerAdminClient() { * Initializes controllerAdminClient if it has not been initialized yet * @return true if the creation of AC succeeded, false otherwise */ - private boolean maybeInitControllerAdminClient(String currentVersion) { + private boolean maybeInitControllerAdminClient() { if (this.controllerAdminClient == null) { - // Prior to 3.9.0, Kafka did not support directly connecting to controllers nodes - // via Kafka Admin API when running in KRaft mode. - // Therefore, use brokers to initialise adminClient for quorum health check - // when the version is older than 3.9.0. try { - if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) { - this.controllerAdminClient = controllerAdminClient(nodes); - } else { - this.controllerAdminClient = brokerAdminClient(Set.of()); - - } + // TODO: Currently, when running in KRaft mode Kafka does not support using Kafka Admin API with controller + // nodes. This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9692. + // Therefore use broker nodes of the cluster to initialise adminClient for quorum health check. + // Once Kafka Admin API is supported for controllers, nodes.stream().filter(NodeRef:controller) + // can be used here. Until then pass an empty set of nodes so the client is initialized with + // the brokers service. + this.controllerAdminClient = adminClient(Set.of(), false); } catch (ForceableProblem | FatalProblem e) { LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e); return false; @@ -455,11 +454,9 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext) // change and the desired roles still apply. boolean isBroker = Labels.booleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL, nodeRef.broker()); boolean isController = Labels.booleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL, nodeRef.controller()); - // This is relevant when creating admin client for controllers - String currentVersion = Annotations.stringAnnotation(pod, KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, "0.0.0", null); try { - checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext, currentVersion); + checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext); if (restartContext.forceRestart) { LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef); restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext); @@ -589,7 +586,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) { * Determine whether the pod should be restarted, or the broker reconfigured. */ @SuppressWarnings("checkstyle:CyclomaticComplexity") - private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext, String currentVersion) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem { + private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem { RestartReasons reasonToRestartPod = restartContext.restartReasons; if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) { // If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference @@ -610,13 +607,35 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null; boolean needsReconfig = false; - // if it is a pure controller, initialise the admin client specifically for controllers - if (isController && !isBroker) { - if (!maybeInitControllerAdminClient(currentVersion)) { - handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod, currentVersion); - return; + if (isController) { + if (maybeInitControllerAdminClient()) { + String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT; + String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId()); + + if (desiredConfig != null) { + OrderedProperties orderedProperties = new OrderedProperties(); + controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT); + } + + restartContext.quorumCheck = quorumCheck(controllerAdminClient, Long.parseLong(controllerQuorumFetchTimeout)); + } else { + //TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is complete + // we should change this logic to immediately restart this pod because we cannot connect to it. + if (isBroker) { + // If it is a combined node (controller and broker) and the admin client cannot be initialised, + // restart this pod. There is no reason to continue as we won't be able to + // connect an admin client to this pod for other checks later. + LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the brokers do not seem to responding to connection attempts. " + + "Restarting pod because it is a combined node so it is one of the brokers that is not responding.", nodeRef); + reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE); + markRestartContextWithForceRestart(restartContext); + return; + } else { + // If it is a controller only node throw an UnforceableProblem, so we try again until the backOff + // is finished, then it will move on to the next controller and eventually the brokers. + throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts"); + } } - restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef); } if (isBroker) { @@ -627,11 +646,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont return; } - // If it is a mixed node, initialise quorum check with the broker admin client - if (isController) { - restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef); - } - // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can // connect to the broker and that it's capable of responding. Config brokerConfig; @@ -677,21 +691,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont restartContext.brokerLoggingDiff = brokerLoggingDiff; } - private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod, String currentVersion) throws UnforceableProblem { - if (KafkaVersion.compareDottedVersions(currentVersion, "3.9.0") >= 0) { - // If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised. - // There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later. - LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the controllers do not seem to responding to connection attempts.", nodeRef); - reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE); - markRestartContextWithForceRestart(restartContext); - } else { - // If the version does not support talking to controllers, the admin client should be connecting to the broker nodes. - // Since connection to the brokers failed, throw an UnforceableProblem so that broker nodes can be checked later - // which may potentially resolve the connection issue. - throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts"); - } - } - /** * Returns a config of the given broker. * @param nodeRef The reference of the broker. @@ -906,48 +905,34 @@ protected Future restart(Pod pod, RestartContext restartContext) { * Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an * empty set, use the brokers service to bootstrap the client. */ - /* test */ Admin brokerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { - // If no nodes are passed, initialize the admin client using the bootstrap service - // This is still needed for versions older than 3.9.0, so that when only controller nodes being rolled, - // it can use brokers to get quorum information via AdminClient. + /* test */ Admin adminClient(Set nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem { + // If no nodes are passed initialize the admin client using the brokers service + // TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is completed review whether + // this function can be reverted to expect nodes to be non empty String bootstrapHostnames; if (nodes.isEmpty()) { bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT); } else { - bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(",")); + bootstrapHostnames = nodes.stream().map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(",")); } try { LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames); return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity()); + } catch (KafkaException e) { + if (ceShouldBeFatal && (e instanceof ConfigException + || e.getCause() instanceof ConfigException)) { + throw new FatalProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); + } else { + throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); + } } catch (RuntimeException e) { throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); } } - /** - * Returns an AdminClient instance bootstrapped from the given controller nodes. - */ - /* test */ Admin controllerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { - String bootstrapHostnames = nodes.stream().filter(NodeRef::controller).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.CONTROLPLANE_PORT).collect(Collectors.joining(",")); - - try { - LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames); - return adminClientProvider.createControllerAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity()); - } catch (RuntimeException e) { - throw new ForceableProblem("An error while try to create an admin client with bootstrap controllers " + bootstrapHostnames, e); - } - } - - /* test */ KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) { - String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT; - String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId()); - - if (desiredConfig != null) { - OrderedProperties orderedProperties = new OrderedProperties(); - controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT); - } - return new KafkaQuorumCheck(reconciliation, ac, vertx, Long.parseLong(controllerQuorumFetchTimeout)); + /* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) { + return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs); } /* test */ KafkaAvailability availability(Admin ac) { @@ -990,7 +975,7 @@ int controller(NodeRef nodeRef, long timeout, TimeUnit unit, RestartContext rest // This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9373. // Use admin client connected directly to this broker here, then any exception or timeout trying to connect to // the current node will be caught and handled from this method, rather than appearing elsewhere. - try (Admin ac = brokerAdminClient(Set.of(nodeRef))) { + try (Admin ac = adminClient(Set.of(nodeRef), false)) { Node controllerNode = null; try { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java index d0ec62d0f7e..6de9e54ab88 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java @@ -518,20 +518,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); } - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { - return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); - } - @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { return mockAdminClient; } - - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { - return mockAdminClient; - } }; } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java index 4dee8386c23..b613225d69d 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java @@ -9,7 +9,6 @@ import io.fabric8.kubernetes.client.KubernetesClientException; import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.operator.cluster.KafkaVersionTestUtils; -import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.NodeRef; import io.strimzi.operator.cluster.model.RestartReason; import io.strimzi.operator.cluster.model.RestartReasons; @@ -160,43 +159,6 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() { return mock; } - @Test - public void testTalkingToControllersLatestVersion(VertxTestContext testContext) { - PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version()); - AdminClientProvider mock = mock(AdminClientProvider.class); - when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap controllers")); - - TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps, - noException(), null, noException(), noException(), noException(), - brokerId -> succeededFuture(true), - true, mock, mockKafkaAgentClientProvider(), true, null, -1); - - // When admin client cannot be created for a controller node, we expect it to be force restarted. - doSuccessfulRollingRestart(testContext, kafkaRoller, - asList(0), - asList(0)); - } - - @Test - public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException { - PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0"); - - AdminClientProvider mock = mock(AdminClientProvider.class); - when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers")); - - TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps, - noException(), null, noException(), noException(), noException(), - brokerId -> succeededFuture(true), - true, mock, mockKafkaAgentClientProvider(), true, null, -1); - - // If the controller has older version (< 3.9.0), we should only be creating admin client for brokers - // and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck - doFailingRollingRestart(testContext, kafkaRoller, - asList(0), - KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts", - emptyList()); - } - private static KafkaAgentClientProvider mockKafkaAgentClientProvider() { return mock(KafkaAgentClientProvider.class); } @@ -835,17 +797,12 @@ public void clearRestarted() { } private PodOperator mockPodOps(Function> readiness) { - return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version()); - } - - private PodOperator mockPodOpsWithVersion(Function> readiness, String version) { PodOperator podOps = mock(PodOperator.class); when(podOps.get(any(), any())).thenAnswer( invocation -> new PodBuilder() .withNewMetadata() - .withNamespace(invocation.getArgument(0)) - .withName(invocation.getArgument(1)) - .addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version) + .withNamespace(invocation.getArgument(0)) + .withName(invocation.getArgument(1)) .endMetadata() .build() ); @@ -944,33 +901,9 @@ KafkaAgentClient initKafkaAgentClient() { } @Override - protected Admin brokerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { - if (delegateAdminClientCall) { - return super.brokerAdminClient(nodes); - } - RuntimeException exception = acOpenException.apply(nodes); - if (exception != null) { - throw new ForceableProblem("An error while try to create the admin client", exception); - } - Admin ac = mock(AdminClient.class, invocation -> { - if ("close".equals(invocation.getMethod().getName())) { - Admin mock = (Admin) invocation.getMock(); - unclosedAdminClients.remove(mock); - if (acCloseException != null) { - throw acCloseException; - } - return null; - } - throw new RuntimeException("Not mocked " + invocation.getMethod()); - }); - unclosedAdminClients.put(ac, new Throwable("Pod " + nodes)); - return ac; - } - - @Override - protected Admin controllerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { + protected Admin adminClient(Set nodes, boolean b) throws ForceableProblem, FatalProblem { if (delegateAdminClientCall) { - return super.controllerAdminClient(nodes); + return super.adminClient(nodes, b); } RuntimeException exception = acOpenException.apply(nodes); if (exception != null) { @@ -1012,7 +945,7 @@ Future canRoll(int podId) { } @Override - protected KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) { + protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) { Admin admin = mock(Admin.class); DescribeMetadataQuorumResult qrmResult = mock(DescribeMetadataQuorumResult.class); when(admin.describeMetadataQuorum()).thenReturn(qrmResult); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java index 5797cf889f8..269dcb62d14 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java @@ -616,20 +616,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return adminClientSupplier.get(); } - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { - return adminClientSupplier.get(); - } - @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { return adminClientSupplier.get(); } - - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { - return adminClientSupplier.get(); - } }; return new ResourceOperatorSupplier(vertx, diff --git a/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java b/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java index ebc879776bc..6765304f094 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java @@ -16,7 +16,7 @@ public interface AdminClientProvider { /** - * Create a Kafka Admin interface instance for brokers + * Create a Kafka Admin interface instance * * @param bootstrapHostnames Kafka hostname to connect to for administration operations * @param kafkaCaTrustSet Trust set for connecting to Kafka @@ -26,17 +26,7 @@ public interface AdminClientProvider { Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity); /** - * Create a Kafka Admin interface instance for controllers - * - * @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations - * @param kafkaCaTrustSet Trust set for connecting to Kafka - * @param authIdentity Identity for TLS client authentication for connecting to Kafka - * @return Instance of Kafka Admin interface - */ - Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity); - - /** - * Create a Kafka Admin interface instance for brokers + * Create a Kafka Admin interface instance * * @param bootstrapHostnames Kafka hostname to connect to for administration operations * @param kafkaCaTrustSet Trust set for connecting to Kafka @@ -46,16 +36,4 @@ public interface AdminClientProvider { * @return Instance of Kafka Admin interface */ Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config); - - /** - * Create a Kafka Admin interface instance for controllers - * - * @param controllerBootstrapHostnames Kafka hostname to connect to for administration operations - * @param kafkaCaTrustSet Trust set for connecting to Kafka - * @param authIdentity Identity for TLS client authentication for connecting to Kafka - * @param config Additional configuration for the Kafka Admin Client - * - * @return Instance of Kafka Admin interface - */ - Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config); } diff --git a/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java b/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java index 99de4c4f7c4..bf9fd43a8b9 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java @@ -21,11 +21,6 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); } - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { - return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); - } - /** * Create a Kafka Admin interface instance handling the following different scenarios: * @@ -49,30 +44,26 @@ public Admin createControllerAdminClient(String controllerBootstrapHostnames, Pe */ @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { - config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames); - return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config)); - } - - @Override - public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { - config.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames); - return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config)); + return Admin.create(adminClientConfiguration(bootstrapHostnames, kafkaCaTrustSet, authIdentity, config)); } /** * Utility method for preparing the Admin client configuration * + * @param bootstrapHostnames Kafka bootstrap address * @param kafkaCaTrustSet Trust set for connecting to Kafka * @param authIdentity Identity for TLS client authentication for connecting to Kafka * @param config Custom Admin client configuration or empty properties instance * * @return Admin client configuration */ - /* test */ Properties adminClientConfiguration(PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { + /* test */ static Properties adminClientConfiguration(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { if (config == null) { throw new InvalidConfigurationException("The config parameter should not be null"); } + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames); + // configuring TLS encryption if requested if (kafkaCaTrustSet != null) { config.putIfAbsent(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); diff --git a/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java b/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java index a73f312c2a3..8ce181d9d85 100644 --- a/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java +++ b/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java @@ -9,7 +9,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.config.SslConfigs; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; import java.util.Properties; @@ -17,10 +16,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class DefaultAdminClientProviderTest { @@ -30,6 +26,7 @@ public class DefaultAdminClientProviderTest { private static final String USER_KEY = "user-key"; private void assertDefaultConfigs(Properties config) { + assertThat(config.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), is("my-kafka:9092")); assertThat(config.get(AdminClientConfig.METADATA_MAX_AGE_CONFIG), is("30000")); assertThat(config.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), is("10000")); assertThat(config.get(AdminClientConfig.RETRIES_CONFIG), is("3")); @@ -38,10 +35,9 @@ private void assertDefaultConfigs(Properties config) { @Test public void testPlainConnection() { - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - Properties config = defaultAdminClientProvider.adminClientConfiguration(null, null, new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, null, new Properties()); - assertThat(config.size(), is(4)); + assertThat(config.size(), is(5)); assertDefaultConfigs(config); } @@ -51,10 +47,10 @@ public void testCustomConfig() { customConfig.setProperty(AdminClientConfig.RETRIES_CONFIG, "5"); // Override a value we have default for customConfig.setProperty(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "13000"); // Override a value we do not use - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - Properties config = defaultAdminClientProvider.adminClientConfiguration(null, null, customConfig); + Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, null, customConfig); - assertThat(config.size(), is(5)); + assertThat(config.size(), is(6)); + assertThat(config.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), is("my-kafka:9092")); assertThat(config.get(AdminClientConfig.METADATA_MAX_AGE_CONFIG), is("30000")); assertThat(config.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), is("10000")); assertThat(config.get(AdminClientConfig.RETRIES_CONFIG), is("5")); @@ -64,10 +60,9 @@ public void testCustomConfig() { @Test public void testTlsConnection() { - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - Properties config = defaultAdminClientProvider.adminClientConfiguration(mockPemTrustSet(), null, new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", mockPemTrustSet(), null, new Properties()); - assertThat(config.size(), is(7)); + assertThat(config.size(), is(8)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), is("PEM")); @@ -77,10 +72,9 @@ public void testTlsConnection() { @Test public void testMTlsConnection() { - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - Properties config = defaultAdminClientProvider.adminClientConfiguration(mockPemTrustSet(), mockPemAuthIdentity(), new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", mockPemTrustSet(), mockPemAuthIdentity(), new Properties()); - assertThat(config.size(), is(10)); + assertThat(config.size(), is(11)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), is("PEM")); @@ -93,10 +87,9 @@ public void testMTlsConnection() { @Test public void testMTlsWithPublicCAConnection() { - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - Properties config = defaultAdminClientProvider.adminClientConfiguration(null, mockPemAuthIdentity(), new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, mockPemAuthIdentity(), new Properties()); - assertThat(config.size(), is(8)); + assertThat(config.size(), is(9)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG).toString(), is("PEM")); @@ -106,41 +99,10 @@ public void testMTlsWithPublicCAConnection() { @Test public void testNullConfig() { - DefaultAdminClientProvider defaultAdminClientProvider = new DefaultAdminClientProvider(); - InvalidConfigurationException ex = assertThrows(InvalidConfigurationException.class, () -> defaultAdminClientProvider.adminClientConfiguration(null, mockPemAuthIdentity(), null)); + InvalidConfigurationException ex = assertThrows(InvalidConfigurationException.class, () -> DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, mockPemAuthIdentity(), null)); assertThat(ex.getMessage(), is("The config parameter should not be null")); } - @Test - public void tesCreateControllerAdminClientConfig() { - DefaultAdminClientProvider defaultAdminClientProvider = spy(DefaultAdminClientProvider.class); - // We expect a failure from creating an actual admin client since the bootstrap is not real - assertThrows(RuntimeException.class, () -> defaultAdminClientProvider.createControllerAdminClient("my-kafka-controller:9090", null, null)); - - ArgumentCaptor configsCapture = ArgumentCaptor.forClass(Properties.class); - verify(defaultAdminClientProvider).adminClientConfiguration(eq(null), eq(null), configsCapture.capture()); - Properties configs = configsCapture.getValue(); - - assertThat(configs.size(), is(5)); - assertThat(configs.getProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG), is("my-kafka-controller:9090")); - assertDefaultConfigs(configs); - } - - @Test - public void testCreateBrokerAdminClient() { - DefaultAdminClientProvider defaultAdminClientProvider = spy(DefaultAdminClientProvider.class); - // We expect a failure from creating an actual admin client since the bootstrap is not real - assertThrows(RuntimeException.class, () -> defaultAdminClientProvider.createAdminClient("my-kafka-broker:9092", null, null)); - - ArgumentCaptor configsCapture = ArgumentCaptor.forClass(Properties.class); - verify(defaultAdminClientProvider).adminClientConfiguration(eq(null), eq(null), configsCapture.capture()); - Properties configs = configsCapture.getValue(); - - assertThat(configs.size(), is(5)); - assertThat(configs.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), is("my-kafka-broker:9092")); - assertDefaultConfigs(configs); - } - public static PemTrustSet mockPemTrustSet() { PemTrustSet mockTrustSet = mock(PemTrustSet.class); when(mockTrustSet.trustedCertificatesString()).thenReturn(String.format("%s%n%s", CA1, CA2));