Skip to content

Commit

Permalink
perf: schedule in eventloop for batchWrite of endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent 666b4d7 commit 7dd809a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
37 changes: 24 additions & 13 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -388,29 +389,39 @@ private void writeToChannelAndFlush(RedisCommand<?, ?, ?> command) {
}

private void writeToChannelAndFlush(Collection<? extends RedisCommand<?, ?, ?>> commands) {
final Channel chan = this.channel;
final EventLoop eventLoop = chan.eventLoop();

QUEUE_SIZE.addAndGet(this, commands.size());

if (reliability == Reliability.AT_MOST_ONCE) {

// cancel on exceptions and remove from queue, because there is no housekeeping
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(AtMostOnceWriteListener.newInstance(this, command));
}
executeInEventLoop(eventLoop, () -> {
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(chan, command).addListener(AtMostOnceWriteListener.newInstance(this, command));
}
channelFlush(chan);
});
}

if (reliability == Reliability.AT_LEAST_ONCE) {

// commands are ok to stay within the queue, reconnect will retrigger them
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(command).addListener(RetryListener.newInstance(this, command));
}
executeInEventLoop(eventLoop, () -> {
for (RedisCommand<?, ?, ?> command : commands) {
channelWrite(chan, command).addListener(RetryListener.newInstance(this, command));
}
channelFlush(chan);
});
}
}

channelFlush();
private void executeInEventLoop(EventLoop eventLoop, Runnable runnable) {
if (eventLoop.inEventLoop()) {
runnable.run();
} else {
eventLoop.execute(runnable);
}
}

private void channelFlush() {
private void channelFlush(Channel channel) {

if (debugEnabled) {
logger.debug("{} write() channelFlush", logPrefix());
Expand All @@ -419,7 +430,7 @@ private void channelFlush() {
channel.flush();
}

private ChannelFuture channelWrite(RedisCommand<?, ?, ?> command) {
private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() channelWrite command {}", logPrefix(), command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
Expand All @@ -27,6 +31,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand All @@ -46,6 +51,7 @@
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.mockito.stubbing.Answer;

/**
* @author Mark Paluch
Expand All @@ -64,6 +70,9 @@ class DefaultEndpointUnitTests {
@Mock
private Channel channel;

@Mock
private EventLoop eventLoop;

@Mock
private ConnectionFacade connectionFacade;

Expand Down Expand Up @@ -117,6 +126,13 @@ void before() {
return promise;
});

when(channel.eventLoop()).thenReturn(eventLoop);
doAnswer((Answer<Void>) invocation -> {
Runnable runnable = (Runnable) invocation.getArguments()[0];
runnable.run(); // Directly execute the runnable
return null;
}).when(eventLoop).execute(any(Runnable.class));

sut = new DefaultEndpoint(ClientOptions.create(), clientResources);
sut.setConnectionFacade(connectionFacade);
}
Expand Down Expand Up @@ -337,8 +353,6 @@ void retryListenerDoesNotRetryCompletedCommands() {

DefaultEndpoint.RetryListener listener = DefaultEndpoint.RetryListener.newInstance(sut, command);

when(channel.eventLoop()).thenReturn(mock(EventLoop.class));

command.complete();
promise.tryFailure(new Exception());

Expand Down

0 comments on commit 7dd809a

Please sign in to comment.