Handle producer fenced exceptions #6
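Tracks which tasks write to changelog topics within the in-flight transaction, so that a fenced producer can report exactly the affected tasks: TaskMigratedException now carries the migrated task ids, and the stream thread closes only those tasks dirty via the new TaskManager#handleLost instead of closing everything via handleLostAll.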

Open · wants to merge 3 commits into base: lh-trunk
@@ -759,6 +759,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
private void failBatch(ProducerBatch batch,
ProduceResponse.PartitionResponse response,
boolean adjustSequenceNumbers) {
log.info("Failing batch " + batch);
final RuntimeException topLevelException;
if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.TaskId;

import java.util.Set;

public class StreamsProducerException extends StreamsException {

private static final long serialVersionUID = 1L;

private final Set<TaskId> taskIds;

public StreamsProducerException(final String message, final Set<TaskId> taskIds, final Throwable e) {
super(message, e);
this.taskIds = taskIds;
}

// ids of the tasks whose records were part of the failed transaction
public Set<TaskId> taskIds() {
return taskIds;
}
}
@@ -17,6 +17,10 @@
package org.apache.kafka.streams.errors;


import org.apache.kafka.streams.processor.TaskId;

import java.util.Set;

/**
* Indicates that all tasks belonging to the thread have migrated to another thread. This exception can be thrown when
* the thread gets fenced (either by the consumer coordinator or by the transaction coordinator), which means it is
@@ -25,12 +29,24 @@
public class TaskMigratedException extends StreamsException {

private static final long serialVersionUID = 1L;
private final Set<TaskId> migratedTasks;

public TaskMigratedException(final String message) {
super(message + "; it means all tasks belonging to this thread should be migrated.");
migratedTasks = Set.of();
}

public TaskMigratedException(final String message, final Throwable throwable) {
super(message + "; it means all tasks belonging to this thread should be migrated.", throwable);
migratedTasks = Set.of();
}

public TaskMigratedException(final String message, final Throwable throwable, final Set<TaskId> migratedTasks) {
super(message + "; it means all tasks belonging to this thread should be migrated.", throwable);
this.migratedTasks = migratedTasks;
}

public Set<TaskId> migratedTasks() {
return migratedTasks;
}
}
@@ -300,7 +300,7 @@ public <K, V> void send(final String topic,
} catch (final RuntimeException fatal) {
sendException.set(new StreamsException("Producer.send `Callback` failed", fatal));
}
});
}, taskId);
}

private <K, V> void handleException(final ProductionExceptionHandler.SerializationExceptionOrigin origin,
@@ -428,9 +428,10 @@ private void recordSendError(final String topic,
productionException instanceof InvalidPidMappingException ||
productionException instanceof InvalidProducerEpochException ||
productionException instanceof OutOfOrderSequenceException) {
log.info("Task received a ProducerFencedException = " + taskId);
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
"indicating the task may be migrated out";
sendException.set(new TaskMigratedException(errorMessage, productionException));
sendException.set(new TaskMigratedException(errorMessage, productionException, streamsProducer.transactionInFlight().associatedPartitions));
} else {
final ProductionExceptionHandlerResponse response;
try {
@@ -904,7 +904,7 @@ private void handleTaskMigrated(final TaskMigratedException e) {
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);

taskManager.handleLostAll();
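// close only the tasks flagged by the exception; remaining active tasks are closed clean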
taskManager.handleLost(e.migratedTasks());
mainConsumer.unsubscribe();
subscribeConsumer();
}
@@ -38,14 +38,18 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsProducerException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;

import org.slf4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -68,7 +72,7 @@ public class StreamsProducer {
private final Time time;

private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private final Transaction transactionInFlight = new Transaction();
private boolean transactionInitialized = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
@@ -94,7 +98,7 @@ boolean eosEnabled() {
return processingMode == EXACTLY_ONCE_V2;
}

boolean transactionInFlight() {
Transaction transactionInFlight() {
return transactionInFlight;
}

@@ -183,10 +187,10 @@ public double totalBlockedTime() {
}

private void maybeBeginTransaction() {
if (eosEnabled() && !transactionInFlight) {
if (eosEnabled() && !transactionInFlight.inflight) {
try {
producer.beginTransaction();
transactionInFlight = true;
transactionInFlight.init();
} catch (final ProducerFencedException | InvalidProducerEpochException | InvalidPidMappingException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to begin a new transaction"),
@@ -206,18 +210,21 @@ AtomicReference<KafkaException> sendException() {
}

Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record,
final Callback callback) {
final Callback callback,
final TaskId taskId) {
maybeBeginTransaction();
try {
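// track the sending task when the record targets a changelog topic, so that a
// fenced producer can report exactly which tasks had state writes in flight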
if (record.partition() != null) {
transactionInFlight.maybeAddChangelogPartition(record, taskId);
}
return producer.send(record, callback);
} catch (final KafkaException uncaughtException) {
if (isRecoverable(uncaughtException)) {
// producer.send() may throw a KafkaException which wraps a FencedException;
// in this case we throw a StreamsProducerException that carries the wrapped
// inner cause and the task ids of the in-flight transaction, so the caller
// can re-wrap it as a TaskMigratedException
throw new TaskMigratedException(
throw new StreamsProducerException(
formatException("Producer got fenced trying to send a record"),
uncaughtException.getCause()
transactionInFlight.associatedPartitions, uncaughtException.getCause()
);
} else {
throw new StreamsException(
@@ -248,7 +255,7 @@ protected void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> of
try {
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
transactionInFlight = false;
transactionInFlight.end();
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to commit a transaction"),
@@ -272,7 +279,7 @@ void abortTransaction() {
if (!eosEnabled()) {
throw new IllegalStateException(formatException("Exactly-once is not enabled"));
}
if (transactionInFlight) {
if (transactionInFlight.inflight) {
try {
producer.abortTransaction();
} catch (final TimeoutException logAndSwallow) {
@@ -299,7 +306,7 @@ void abortTransaction() {
error
);
}
transactionInFlight = false;
transactionInFlight.end();
}
}

@@ -320,12 +327,42 @@ void flush() {

void close() {
producer.close();
transactionInFlight = false;
transactionInFlight.end();
transactionInitialized = false;
}

// for testing only
Producer<byte[], byte[]> kafkaProducer() {
return producer;
}

public static final class Transaction {

private boolean inflight = false;
// ids of the tasks that wrote to a changelog topic within the current transaction
public final Set<TaskId> associatedPartitions = new HashSet<>();

private Transaction() {
}

private void init() {
this.inflight = true;
}

private void end() {
this.inflight = false;
associatedPartitions.clear();
}

public boolean isInflight() {
return inflight;
}

private void maybeAddChangelogPartition(final ProducerRecord<byte[], byte[]> record, final TaskId taskId) {
// Streams names changelog topics with a "-changelog" suffix, so the suffix
// identifies records that represent state-store writes
final boolean isChangelogTopic = record.topic().endsWith("-changelog");
if (isChangelogTopic) {
associatedPartitions.add(taskId);
}
}
}
}
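
For context, the lifecycle of the new Transaction tracking is sketched below as a self-contained program. This is a simplified model, not the diff's code: plain String stands in for TaskId, a topic name replaces ProducerRecord, and the class and method names are illustrative only.

import java.util.HashSet;
import java.util.Set;

final class TransactionSketch {
    private boolean inflight = false;
    private final Set<String> associatedTaskIds = new HashSet<>();

    // mirrors Transaction.init(), called from maybeBeginTransaction()
    void begin() {
        inflight = true;
    }

    // mirrors Transaction.end(), called on commit, abort, and close
    void end() {
        inflight = false;
        associatedTaskIds.clear();
    }

    // mirrors maybeAddChangelogPartition(): remember the task if the record
    // targets a changelog topic (identified by the "-changelog" suffix)
    void recordWrite(final String topic, final String taskId) {
        if (topic.endsWith("-changelog")) {
            associatedTaskIds.add(taskId);
        }
    }

    public static void main(final String[] args) {
        final TransactionSketch tx = new TransactionSketch();
        tx.begin();
        tx.recordWrite("app-store-changelog", "0_1"); // state write: tracked
        tx.recordWrite("output-topic", "0_2");        // plain output: ignored
        // if the producer were fenced here, only task 0_1 would need a dirty close:
        System.out.println("tasks to migrate = " + tx.associatedTaskIds);
        tx.end();
    }
}

In the diff itself, recordSendError() passes the tracked set into TaskMigratedException, and StreamThread#handleTaskMigrated hands it to TaskManager#handleLost, which closes only those tasks dirty.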
@@ -176,7 +176,7 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
final Set<TaskId> corruptedTasks = new HashSet<>();

if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) {
if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight().isInflight()) {
final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
.flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

@@ -27,7 +27,6 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidOffsetException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
@@ -1082,7 +1081,7 @@ private void addTaskToStateUpdater(final Task task) {
} catch (final LockException lockException) {
// The state directory may still be locked by another thread, when the rebalance just happened.
// Retry in the next iteration.
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.", lockException);
log.info("Encountered lock exception. Reattempting locking the state in the next iteration.");
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
}
@@ -1282,7 +1281,7 @@ private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleLostAll() {
log.debug("Closing lost active tasks as zombies.");
log.info("Closing lost active tasks as zombies.");

closeRunningTasksDirty();
removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit();
@@ -1292,6 +1291,17 @@ void handleLostAll() {
}
}

void handleLost(final Set<TaskId> migratedTasks) {
log.info("Closing lost {} active tasks as zombies.", migratedTasks);

closeRunningTasksDirty(migratedTasks);
removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit();

if (processingMode == EXACTLY_ONCE_V2) {
activeTaskCreator.reInitializeProducer();
}
}

private void closeRunningTasksDirty() {
final Set<Task> allTask = tasks.allTasks();
final Set<TaskId> allTaskIds = tasks.allTaskIds();
@@ -1306,6 +1316,28 @@ private void closeRunningTasksDirty() {
maybeUnlockTasks(allTaskIds);
}

private void closeRunningTasksDirty(final Set<TaskId> taskIds) {
final Set<Task> allTasks = tasks.allTasks();
maybeLockTasks(taskIds);
final Set<Task> tasksToCloseClean = new HashSet<>();
for (final Task task : allTasks) {
// Even though we've apparently dropped out of the group, we can continue safely to maintain our
// standby tasks while we rejoin.
if (task.isActive()) {
if (taskIds.contains(task.id())) {
// the task's transaction was fenced, so its progress cannot be committed
closeTaskDirty(task, true);
} else {
// active tasks that were not part of the fenced transaction may still commit cleanly
tasksToCloseClean.add(task);
}
}
}
final Collection<Task> failedTasks = tryCloseCleanActiveTasks(tasksToCloseClean, true, new AtomicReference<>());
for (final Task task : failedTasks) {
closeTaskDirty(task, true);
}
maybeUnlockTasks(taskIds);
}

private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
@@ -1543,7 +1575,7 @@ void shutdown(final boolean clean) {
throw fatalException;
}

log.info("Shutdown complete");
log.info("Shutdown complete clean=" + clean);
}

private void shutdownStateUpdater() {
@@ -1614,6 +1646,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
final boolean clean,
final AtomicReference<RuntimeException> firstException) {
if (!clean) {
log.info("Closing all active tasks dirty");
return activeTaskIterable();
}
final Comparator<Task> byId = Comparator.comparing(Task::id);
@@ -777,7 +777,7 @@ private void shouldClearOffsetsOnClose(final boolean clean) {
1,
1
);
when(streamsProducer.send(any(), any())).thenAnswer(invocation -> {
when(streamsProducer.send(any(), any(), any())).thenAnswer(invocation -> {
((Callback) invocation.getArgument(1)).onCompletion(metadata, null);
return null;
});
@@ -1812,7 +1812,7 @@ public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
when(streamsProducer.send(any(), any())).thenAnswer(
when(streamsProducer.send(any(), any(), any())).thenAnswer(
invocation -> {
final Callback callback = invocation.getArgument(1);
callback.onCompletion(null, new ProducerFencedException("KABOOM!"));