Skip to content

Commit

Permalink
Retrying the import
Browse files Browse the repository at this point in the history
  • Loading branch information
kirktrue committed Apr 24, 2023
1 parent b47b063 commit d0689a1
Show file tree
Hide file tree
Showing 31 changed files with 653 additions and 578 deletions.
121 changes: 121 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
Expand All @@ -32,9 +37,11 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;

Expand All @@ -44,6 +51,12 @@ public final class ClientUtils {
// Utility class holding only static helpers; the private constructor
// prevents instantiation.
private ClientUtils() {
}

/**
 * Parses and validates the bootstrap server addresses taken from the given
 * client configuration, honoring the configured client DNS lookup mode.
 *
 * @param config client configuration supplying the bootstrap servers list
 *               and the client DNS lookup setting
 * @return the parsed and validated bootstrap socket addresses
 */
public static List<InetSocketAddress> parseAndValidateAddresses(AbstractConfig config) {
    // Read both settings from the config and delegate to the string-based overload.
    return parseAndValidateAddresses(
            config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
            config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
}

/**
 * Parses and validates the given bootstrap server URLs, converting the DNS
 * lookup mode from its raw string config value before delegating.
 *
 * @param urls                  bootstrap server strings in host:port form
 * @param clientDnsLookupConfig raw string value of the client DNS lookup config
 * @return the parsed and validated bootstrap socket addresses
 */
public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookupConfig) {
    ClientDnsLookup dnsLookup = ClientDnsLookup.forConfig(clientDnsLookupConfig);
    return parseAndValidateAddresses(urls, dnsLookup);
}
Expand Down Expand Up @@ -134,4 +147,112 @@ static List<InetAddress> filterPreferredAddresses(InetAddress[] allAddresses) {
}
return preferredAddresses;
}

/**
 * Creates a {@link NetworkClient} (and the {@link Selector} and
 * {@link ChannelBuilder} it needs) from the given client configuration.
 * If any step of construction fails, the partially created resources are
 * closed quietly before the failure is rethrown.
 *
 * @param config                           client configuration to read connection settings from
 * @param metrics                          metrics registry for the selector
 * @param metricsGroupPrefix               prefix for selector metric group names
 * @param logContext                       log context for created components
 * @param apiVersions                      API version tracking shared with the client
 * @param time                             time source
 * @param maxInFlightRequestsPerConnection cap on concurrent in-flight requests per connection
 * @param metadata                         cluster metadata used by the network client
 * @param sensor                           throttle-time sensor passed through to the client
 * @return the fully constructed network client
 * @throws KafkaException if construction fails for any reason
 */
public static NetworkClient createNetworkClient(AbstractConfig config,
                                                Metrics metrics,
                                                String metricsGroupPrefix,
                                                LogContext logContext,
                                                ApiVersions apiVersions,
                                                Time time,
                                                int maxInFlightRequestsPerConnection,
                                                Metadata metadata,
                                                Sensor sensor) {
    ChannelBuilder builder = null;
    Selector networkSelector = null;

    try {
        builder = ClientUtils.createChannelBuilder(config, time, logContext);
        networkSelector = new Selector(
                config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                metrics,
                time,
                metricsGroupPrefix,
                builder,
                logContext);
        return new NetworkClient(
                networkSelector,
                metadata,
                config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
                maxInFlightRequestsPerConnection,
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                sensor,
                logContext);
    } catch (Throwable t) {
        // Release anything built so far; close order mirrors construction order reversed.
        closeQuietly(networkSelector, "Selector");
        closeQuietly(builder, "ChannelBuilder");
        throw new KafkaException("Failed to create new NetworkClient", t);
    }
}

/**
 * Creates a {@link NetworkClient} driven by an explicit {@link MetadataUpdater}
 * and {@link HostResolver}, with an explicit request timeout instead of the
 * value from the configuration. Partially constructed resources are closed
 * quietly if any step fails.
 *
 * @param config                           client configuration to read connection settings from
 * @param metrics                          metrics registry for the selector
 * @param metricsGroupPrefix               prefix for selector metric group names
 * @param logContext                       log context for created components
 * @param apiVersions                      API version tracking shared with the client
 * @param time                             time source
 * @param maxInFlightRequestsPerConnection cap on concurrent in-flight requests per connection
 * @param metadataUpdater                  metadata updater supplied by the caller
 * @param hostResolver                     resolver used for broker hostnames
 * @param requestTimeoutMs                 request timeout, overriding the config value
 * @return the fully constructed network client
 * @throws KafkaException if construction fails for any reason
 */
public static NetworkClient createNetworkClient(AbstractConfig config,
                                                Metrics metrics,
                                                String metricsGroupPrefix,
                                                LogContext logContext,
                                                ApiVersions apiVersions,
                                                Time time,
                                                int maxInFlightRequestsPerConnection,
                                                MetadataUpdater metadataUpdater,
                                                HostResolver hostResolver,
                                                int requestTimeoutMs) {
    ChannelBuilder builder = null;
    Selector networkSelector = null;

    try {
        builder = ClientUtils.createChannelBuilder(config, time, logContext);
        networkSelector = new Selector(
                config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                metrics,
                time,
                metricsGroupPrefix,
                builder,
                logContext);
        return new NetworkClient(
                metadataUpdater,
                null,
                networkSelector,
                config.getString(CommonClientConfigs.CLIENT_ID_CONFIG),
                maxInFlightRequestsPerConnection,
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
                config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                config.getLong(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                time,
                true,
                apiVersions,
                null,
                logContext,
                hostResolver);
    } catch (Throwable t) {
        // Release anything built so far; close order mirrors construction order reversed.
        closeQuietly(networkSelector, "Selector");
        closeQuietly(builder, "ChannelBuilder");
        throw new KafkaException("Failed to create new NetworkClient", t);
    }
}

/**
 * Instantiates and configures the interceptor classes named by the given
 * config key, injecting the client ID into each interceptor's configuration.
 *
 * @param config                       client configuration to read from
 * @param interceptorClassesConfigName name of the config entry listing interceptor class names
 * @param clazz                        interface the interceptors must implement
 * @param <T>                          interceptor type
 * @return the configured interceptor instances
 */
public static <T> List<T> interceptors(AbstractConfig config,
                                       String interceptorClassesConfigName,
                                       Class<T> clazz) {
    String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
    // Returning List<T> (rather than the raw List the method previously declared)
    // matches getConfiguredInstances' generic return and avoids unchecked
    // warnings/casts at call sites; the change is source-compatible for callers.
    return config.getConfiguredInstances(
            interceptorClassesConfigName,
            clazz,
            Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
}

/**
 * Builds a {@link ClusterResourceListeners} registry from the given candidate
 * lists, offering every element of every list to the registry via
 * {@code maybeAddAll} (which decides what actually gets registered).
 *
 * @param candidateLists lists whose elements may be cluster resource listeners
 * @return the populated registry
 */
public static ClusterResourceListeners configureClusterResourceListeners(List<?>... candidateLists) {
    ClusterResourceListeners listeners = new ClusterResourceListeners();
    for (List<?> candidates : candidateLists) {
        listeners.maybeAddAll(candidates);
    }
    return listeners;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand Down Expand Up @@ -494,8 +492,6 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
String clientId = generateClientId(config);
ChannelBuilder channelBuilder = null;
Selector selector = null;
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = createLogContext(clientId);

Expand All @@ -505,9 +501,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
Expand All @@ -518,36 +512,21 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
metrics = new Metrics(metricConfig, reporters, time, metricsContext);
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, channelBuilder, logContext);
networkClient = new NetworkClient(
metadataManager.updater(),
null,
selector,
clientId,
1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
(int) TimeUnit.HOURS.toMillis(1),
config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
true,
apiVersions,
null,
logContext,
(hostResolver == null) ? new DefaultHostResolver() : hostResolver);
networkClient = ClientUtils.createNetworkClient(config,
metrics,
"admin-client",
logContext,
apiVersions,
time,
1,
metadataManager.updater(),
hostResolver == null ? new DefaultHostResolver() : hostResolver,
(int) TimeUnit.HOURS.toMillis(1));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
closeQuietly(networkClient, "NetworkClient");
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
}
}
Expand Down
Loading

0 comments on commit d0689a1

Please sign in to comment.