Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -660,6 +661,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand All @@ -677,6 +679,7 @@ interface ApplicationEventHandlerFactory {
ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
Expand All @@ -41,8 +46,13 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import javax.security.auth.spi.LoginModule;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static org.apache.kafka.common.utils.Utils.closeQuietly;

Expand All @@ -68,6 +78,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
private RequestManagers requestManagers;
private volatile boolean running;
private final IdempotentCloser closer = new IdempotentCloser();
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private final AtomicReference<KafkaException> initializationError = new AtomicReference<>();
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
private long lastPollTimeMs = 0L;
Expand All @@ -92,13 +104,57 @@ public ConsumerNetworkThread(LogContext logContext,
this.asyncConsumerMetrics = asyncConsumerMetrics;
}

/**
* Start the network thread and let it complete its initialization before proceeding. The
* {@link ClassicKafkaConsumer} constructor blocks during creation of its {@link NetworkClient}, providing
* precedent for waiting here.
*
* In certain cases (e.g. an invalid {@link LoginModule} in {@link SaslConfigs#SASL_JAAS_CONFIG}), an error
* could be thrown during {@link #initializeResources()}. This would result in the {@link #run()} method
* exiting, no longer able to process events, which means that the consumer effectively hangs.
*
* @param timeoutMs Length of time, in milliseconds, to wait for the thread to start and complete initialization
*/
public void start(int timeoutMs) {
// start() is invoked internally instead of by the caller to avoid SpotBugs errors about starting a thread
// in a constructor.
start();

try {
if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
maybeSetInitializationError(
new TimeoutException("Consumer network thread resource initialization timed out after " + timeoutMs + " ms")
);
}
} catch (InterruptedException e) {
maybeSetInitializationError(
new InterruptException("Consumer network thread resource initialization was interrupted", e)
);
}

KafkaException e = initializationError.get();

if (e != null)
throw e;
}

@Override
public void run() {
try {
log.debug("Consumer network thread started");

// Wait until we're securely in the background network thread to initialize these objects...
initializeResources();
try {
initializeResources();
} catch (Throwable t) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
maybeSetInitializationError(e);

// This will still call cleanup() via the `finally` section below.
return;
} finally {
initializationLatch.countDown();
}

while (running) {
try {
Expand All @@ -108,13 +164,20 @@ public void run() {
log.error("Unexpected error caught in consumer network thread", e);
}
}
} catch (final Throwable e) {
log.error("Failed to initialize resources for consumer network thread", e);
} catch (Throwable t) {
log.error("Unexpected failure in consumer network thread", t);
} finally {
cleanup();
}
}

private void maybeSetInitializationError(KafkaException error) {
if (initializationError.compareAndSet(null, error))
return;

log.error("Consumer network thread resource initialization error ({}) will be suppressed as an error was already set", error.getMessage(), error);
}

void initializeResources() {
applicationEventProcessor = applicationEventProcessorSupplier.get();
networkClientDelegate = networkClientDelegateSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -413,6 +414,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
this.applicationEventHandler = new ApplicationEventHandler(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -478,6 +480,7 @@ interface ApplicationEventHandlerFactory {
ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ApplicationEventHandler implements Closeable {

public ApplicationEventHandler(final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand All @@ -69,7 +70,8 @@ public ApplicationEventHandler(final LogContext logContext,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics);
this.networkThread.start();

this.networkThread.start(initializationTimeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -131,6 +133,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
Expand All @@ -152,6 +155,7 @@

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.security.auth.login.LoginException;

import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
Expand Down Expand Up @@ -3796,6 +3800,58 @@ void testMonitorablePlugins(GroupProtocol groupProtocol) {
}
}

/**
* This test ensures that both {@link Consumer} implementations fail on creation when the underlying
* {@link NetworkClient} fails creation.
*
* The logic to check for this case is admittedly a bit awkward because the constructor can fail for all
* manner of reasons. So a failure case is created by specifying an invalid
* {@link javax.security.auth.spi.LoginModule} class name, which in turn causes the {@link NetworkClient}
* to fail.
*
* This test was created to validate the change for KAFKA-19394 for the {@link AsyncKafkaConsumer}. The fix
* should handle the case where failure during initialization of resources (in this test, the underlying
* {@link NetworkClient}) will not cause the creation of the {@link AsyncKafkaConsumer} to hang.
*/
@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testConstructorFailOnNetworkClientConstructorFailure(GroupProtocol groupProtocol) {
Map<String, Object> configs = Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name,
SaslConfigs.SASL_MECHANISM, "PLAIN",
SaslConfigs.SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
);

KafkaException e = assertThrows(KafkaException.class, () -> {
try (KafkaConsumer<String, String> ignored = new KafkaConsumer<>(configs)) {
fail("Should not be able to create the consumer");
}
});

assertEquals("Failed to construct kafka consumer", e.getMessage());

// The root cause is multiple exceptions deep. This code is more concise and should hopefully be trivial
// to update should the underlying implementation change.
Throwable cause = e.getCause();
assertNotNull(cause);
assertInstanceOf(KafkaException.class, cause);
assertEquals("Failed to create new NetworkClient", cause.getMessage());

cause = cause.getCause();
assertNotNull(cause);
assertInstanceOf(KafkaException.class, cause);
assertEquals(LoginException.class.getName() + ": No LoginModule found for org.example.InvalidLoginModule", cause.getMessage());

cause = cause.getCause();
assertNotNull(cause);
assertInstanceOf(LoginException.class, cause);
assertEquals("No LoginModule found for org.example.InvalidLoginModule", cause.getMessage());
}

private MetricName expectedMetricName(String clientId, String config, Class<?> clazz) {
Map<String, String> expectedTags = new LinkedHashMap<>();
expectedTags.put("client-id", clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,31 @@
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public class ApplicationEventHandlerTest {
private final Time time = new MockTime();
private final int initializationTimeoutMs = 50;
private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
Expand All @@ -53,6 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) {
ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
new LogContext(),
time,
initializationTimeoutMs,
applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
Expand All @@ -65,4 +74,52 @@ public void testRecordApplicationEventQueueSize(String groupName) {
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}

@Test
public void testFailOnInitializeResources() {
RuntimeException rootFailure = new RuntimeException("root failure");
KafkaException error = assertInitializeResourcesError(
KafkaException.class,
() -> {
throw rootFailure;
}
);
assertEquals(rootFailure, error.getCause());
}

@Test
public void testDelayInInitializeResources() {
assertInitializeResourcesError(
TimeoutException.class,
() -> {
long delayMs = initializationTimeoutMs * 2;
org.apache.kafka.common.utils.Utils.sleep(delayMs);
return networkClientDelegate;
}
);
}

@Test
public void testInterruptInInitializeResources() {
Thread.currentThread().interrupt();
assertInitializeResourcesError(InterruptException.class, () -> networkClientDelegate);
}

private <T extends Throwable> T assertInitializeResourcesError(Class<T> exceptionClass,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier) {
try (Metrics metrics = new Metrics();
AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics, "test-group"))) {
return assertThrows(exceptionClass, () -> new ApplicationEventHandler(
new LogContext(),
time,
initializationTimeoutMs,
applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
networkClientDelegateSupplier,
() -> requestManagers,
asyncConsumerMetrics
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private AsyncKafkaConsumer<String, String> newConsumerWithStreamRebalanceData(
new StringDeserializer(),
new StringDeserializer(),
time,
(logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
(logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
logContext -> backgroundEventReaper,
(logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector,
(consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata,
Expand All @@ -238,7 +238,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
new StringDeserializer(),
new StringDeserializer(),
time,
(logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
(logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
logContext -> backgroundEventReaper,
(logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector,
(consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata,
Expand Down
Loading
Loading