Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1822,19 +1822,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
* of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and
* the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch}
* for returning.
*
* <p/>
*
* This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
* done as an optimization so that the <em>next round of data can be pre-fetched</em>.
*/
private Fetch<K, V> collectFetch() {
final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);

// Notify the network thread to wake up and start the next round of fetching.
applicationEventHandler.wakeupNetworkThread();

return fetch;
return fetchCollector.collectFetch(fetchBuffer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

Check notice on line 44 in clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java

View workflow job for this annotation

GitHub Actions / build / Compile and Check (Merge Ref)

Checkstyle error

Unused import - java.util.concurrent.TimeUnit.
import java.util.function.Supplier;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
Expand Down Expand Up @@ -252,12 +253,6 @@
return running;
}

public void wakeup() {
// The network client can be null if the initializeResources method has not yet been called.
if (networkClientDelegate != null)
networkClientDelegate.wakeup();
}

/**
* Returns the delay for which the application thread can safely wait before it should be responsive
* to results from the request managers. For example, the subscription state can change when heartbeats
Expand Down Expand Up @@ -311,7 +306,9 @@
log.trace("Signaling the consumer network thread to close in {}ms", timeoutMs);
running = false;
closeTimeout = timeout;
wakeup();

if (networkClientDelegate != null)
networkClientDelegate.wakeup();

try {
join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,23 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
trySend(currentTimeMs);

long pollTimeoutMs = timeoutMs;
if (!unsentRequests.isEmpty()) {

if (!hasAnyPendingRequests()) {
// If there aren't any outgoing or inflight requests, execute a cursory poll so that the
// internal NetworkClient can do all of its work, but don't block unnecessarily.
pollTimeoutMs = 0;
} else if (!unsentRequests.isEmpty()) {
pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
}
this.client.poll(pollTimeoutMs, currentTimeMs);

List<ClientResponse> clientResponses = this.client.poll(pollTimeoutMs, currentTimeMs);
maybePropagateMetadataError();
checkDisconnects(currentTimeMs);
asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), currentTimeMs);

// To mimic the classic Consumer, if there were responses, wake up the Selector.
if (clientResponses != null && !clientResponses.isEmpty())
client.wakeup();
}

private void maybePropagateMetadataError() {
Expand Down Expand Up @@ -182,6 +192,8 @@ public boolean hasAnyPendingRequests() {
*/
private void trySend(final long currentTimeMs) {
Iterator<UnsentRequest> iterator = unsentRequests.iterator();
boolean requestSent = false;

while (iterator.hasNext()) {
UnsentRequest unsent = iterator.next();
unsent.timer.update(currentTimeMs);
Expand All @@ -199,7 +211,12 @@ private void trySend(final long currentTimeMs) {
}
iterator.remove();
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - unsent.enqueueTimeMs());
requestSent = true;
}

// Mimic the classic Consumer in that if any of the enqueued requests were sent, wake up the Selector.
if (requestSent)
client.wakeup();
}

boolean doSend(final UnsentRequest r, final long currentTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,26 +645,17 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
if (fetch.isEmpty()) {
// Check for any acknowledgements which could have come from control records (GAP) and include them.
applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap, fetch.takeAcknowledgedRecords()));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
} else if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
}
return fetch;
} else {
if (!acknowledgementsMap.isEmpty()) {
// Asynchronously commit any waiting acknowledgements
Timer timer = time.timer(defaultApiTimeoutMs);
applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(acknowledgementsMap, calculateDeadlineMs(timer)));

// Notify the network thread to wake up and start the next round of fetching
applicationEventHandler.wakeupNetworkThread();
}
if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
// We cannot leave unacknowledged records in EXPLICIT acknowledgement mode, so we throw an exception to the application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public ApplicationEventHandler(final LogContext logContext,
}

/**
* Add an {@link ApplicationEvent} to the handler and then internally invoke {@link #wakeupNetworkThread()}
* to alert the network I/O thread that it has something to process.
* Add an {@link ApplicationEvent} to the handler which will internally alert the network I/O thread that it
* has something to process.
*
* @param event An {@link ApplicationEvent} created by the application thread
*/
Expand All @@ -85,14 +85,6 @@ public void add(final ApplicationEvent event) {
// to avoid race conditions (the background thread is continuously removing from this queue)
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
applicationEventQueue.add(event);
wakeupNetworkThread();
}

/**
* Wakeup the {@link ConsumerNetworkThread network I/O thread} to pull the next event(s) from the queue.
*/
public void wakeupNetworkThread() {
networkThread.wakeup();
}

/**
Expand Down
Loading