-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring to reduce boilerplate #890
base: ctr-staging
Are you sure you want to change the base?
Conversation
f4a2b69
to
eef91c2
Compare
return (List<ConsumerInterceptor<K, V>>) ClientUtils.interceptors(config, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class); | ||
} | ||
|
||
final static class PartitionComparator implements Comparator<TopicPartition>, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: these Comparator
classes were in an internal class named Utils
. I renamed Utils
to ConsumerUtils
and added the utility methods.
@@ -57,13 +57,11 @@ public class CoordinatorRequestManager implements RequestManager { | |||
private long totalDisconnectedMin = 0; | |||
private Node coordinator; | |||
|
|||
public CoordinatorRequestManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: formatting.
@@ -881,18 +832,6 @@ public KafkaConsumer(Map<String, Object> configs, | |||
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); | |||
} | |||
|
|||
private static Metrics buildMetrics(ConsumerConfig config, Time time, String clientId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: moved buildMetrics
to ConsumerUtils.metrics
@@ -62,13 +62,12 @@ public class CommitRequestManager implements RequestManager { | |||
private final boolean throwOnFetchStableOffsetUnsupported; | |||
final PendingRequests pendingRequests; | |||
|
|||
public CommitRequestManager( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: formatting.
} | ||
|
||
@Override | ||
public void run() { | ||
running = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: moved setting running
to true
from the constructor to the run
method.
@@ -178,46 +169,34 @@ public void run() { | |||
* 3. Poll the networkClient to send and retrieve the response. | |||
*/ | |||
void runOnce() { | |||
drain(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: consolidated a lot of small methods into the runOnce
method for clarity.
@@ -179,27 +147,6 @@ public boolean add(final ApplicationEvent event) { | |||
return applicationEventQueue.add(event); | |||
} | |||
|
|||
// bootstrap a metadata object with the bootstrap server IP address, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: moved this code to the constructor and it's now using the utility classes/methods for less verbosity.
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: this code came from KafkaConsumer
's constructor. I moved it here to use it in the new consumer too.
@@ -202,16 +202,13 @@ public static class UnsentRequest { | |||
private Optional<Node> node; // empty if random node can be choosen | |||
private Timer timer; | |||
|
|||
public UnsentRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: formatting.
@@ -546,19 +524,6 @@ public ConsumerRecords<K, V> poll(long timeout) { | |||
throw new KafkaException("method not implemented"); | |||
} | |||
|
|||
private static <K, V> ClusterResourceListeners configureClusterResourceListeners( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: moved this to the ClientUtils
class.
@@ -576,23 +541,6 @@ else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null) | |||
return newConfigs; | |||
} | |||
|
|||
private static Metrics buildMetrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: this is also now in the ClientUtils
class.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
Show resolved
Hide resolved
DefaultBackgroundThread backgroundThread = mockBackgroundThread(); | ||
backgroundThread.start(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer's note: we need to inject a small wait here since I moved the running
flag from the constructor to the run
method.
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Outdated
Show resolved
Hide resolved
There's a lint check error in some code that I didn't touch ( |
00bf9dd
to
e3dfa9a
Compare
e3dfa9a
to
d0689a1
Compare
|
Overview:
KafkaConsumer
andPrototypeAsyncConsumer
matches as closely as possible