Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
Expand Down Expand Up @@ -190,7 +190,7 @@ public static final long getChecksumFailuresCount() {
}

public static final void updateReadLatency(long latencyMillis, boolean pread, boolean tooSlow) {
RpcServer.getCurrentCall().ifPresent(call -> call.updateFsReadTime(latencyMillis));
ThreadLocalServerSideScanMetrics.addFsReadTime(latencyMillis);
if (pread) {
MetricsIO.getInstance().updateFsPreadTime(latencyMillis);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,4 @@ void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowa

/** Returns A short string format of this call without possibly lengthy params */
String toShortString();

void updateFsReadTime(long latencyMillis);

long getFsReadTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
Expand Down Expand Up @@ -461,19 +462,18 @@ public Pair<Message, ExtendedCellScanner> call(RpcCall call, MonitoredRPCHandler
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
int totalTime = (int) (endTime - receiveTime);
long fsReadTime = ThreadLocalServerSideScanMetrics.getFsReadTimeCounter().get();
if (LOG.isTraceEnabled()) {
LOG.trace(
"{}, response: {}, receiveTime: {}, queueTime: {}, processingTime: {}, "
+ "totalTime: {}, fsReadTime: {}",
CurCall.get().toString(), TextFormat.shortDebugString(result),
CurCall.get().getReceiveTime(), qTime, processingTime, totalTime,
CurCall.get().getFsReadTime());
CurCall.get().getReceiveTime(), qTime, processingTime, totalTime, fsReadTime);
}
// Use the raw request call size for now.
long requestSize = call.getSize();
long responseSize = result.getSerializedSize();
long responseBlockSize = call.getBlockBytesScanned();
long fsReadTime = call.getFsReadTime();
if (call.isClientCellBlockSupported()) {
// Include the payload size in HBaseRpcController
responseSize += call.getResponseCellSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa

private long responseCellSize = 0;
private long responseBlockSize = 0;
private long fsReadTimeMillis = 0;
// cumulative size of serialized exceptions
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
Expand Down Expand Up @@ -604,14 +603,4 @@ public int getRemotePort() {
public synchronized BufferChain getResponse() {
return response;
}

@Override
public void updateFsReadTime(long latencyMillis) {
fsReadTimeMillis += latencyMillis;
}

@Override
public long getFsReadTime() {
return fsReadTimeMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.hadoop.hbase.monitoring;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* Thread-local storage for server-side scan metrics that captures performance data separately for
Expand Down Expand Up @@ -61,7 +63,8 @@
* @see RegionScanner
* @see org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public final class ThreadLocalServerSideScanMetrics {
private ThreadLocalServerSideScanMetrics() {
}
Expand All @@ -81,6 +84,9 @@ private ThreadLocalServerSideScanMetrics() {
private static final ThreadLocal<AtomicLong> BLOCK_READ_OPS_COUNT =
ThreadLocal.withInitial(() -> new AtomicLong(0));

private static final ThreadLocal<AtomicLong> FS_READ_TIME =
ThreadLocal.withInitial(() -> new AtomicLong(0));

public static void setScanMetricsEnabled(boolean enable) {
IS_SCAN_METRICS_ENABLED.set(enable);
}
Expand All @@ -101,6 +107,10 @@ public static long addBlockReadOpsCount(long count) {
return BLOCK_READ_OPS_COUNT.get().addAndGet(count);
}

public static long addFsReadTime(long time) {
return FS_READ_TIME.get().addAndGet(time);
}

public static boolean isScanMetricsEnabled() {
return IS_SCAN_METRICS_ENABLED.get();
}
Expand All @@ -121,6 +131,10 @@ public static AtomicLong getBlockReadOpsCountCounter() {
return BLOCK_READ_OPS_COUNT.get();
}

public static AtomicLong getFsReadTimeCounter() {
return FS_READ_TIME.get();
}

public static long getBytesReadFromFsAndReset() {
return getBytesReadFromFsCounter().getAndSet(0);
}
Expand All @@ -137,11 +151,16 @@ public static long getBlockReadOpsCountAndReset() {
return getBlockReadOpsCountCounter().getAndSet(0);
}

public static long getFsReadTimeAndReset() {
return getFsReadTimeCounter().getAndSet(0);
}

public static void reset() {
getBytesReadFromFsAndReset();
getBytesReadFromBlockCacheAndReset();
getBytesReadFromMemstoreAndReset();
getBlockReadOpsCountAndReset();
getFsReadTimeAndReset();
}

public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics) {
Expand All @@ -156,5 +175,7 @@ public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics)
getBytesReadFromMemstoreCounter().get());
metrics.addToCounter(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
getBlockReadOpsCountCounter().get());
metrics.addToCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
getFsReadTimeCounter().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
Expand Down Expand Up @@ -3519,10 +3520,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// from block size progress before writing into the response
scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
scannerContext.getBlockSizeProgress());
if (rpcCall != null) {
scanMetrics.setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
rpcCall.getFsReadTime());
}
}
}
} finally {
Expand Down Expand Up @@ -3589,6 +3586,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
throw new ServiceException(e);
}
boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(trackMetrics);
if (trackMetrics) {
ThreadLocalServerSideScanMetrics.reset();
}
requestCount.increment();
rpcScanRequestCount.increment();
RegionScannerContext rsx;
Expand Down Expand Up @@ -3659,7 +3661,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null;
if (rows > 0) {
boolean done = false;
Expand Down Expand Up @@ -3741,6 +3742,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
rpcQueueWaitTime);
}
ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
Map<String, Long> metrics = scanMetrics.getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
Expand Down Expand Up @@ -96,8 +95,6 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {

private RegionServerServices rsServices;

private ServerSideScanMetrics scannerInitMetrics = null;

@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
Expand Down Expand Up @@ -148,16 +145,7 @@ private static boolean hasNonce(HRegion region, long nonce) {
} finally {
region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
}
boolean isScanMetricsEnabled = scan.isScanMetricsEnabled();
ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
if (isScanMetricsEnabled) {
this.scannerInitMetrics = new ServerSideScanMetrics();
ThreadLocalServerSideScanMetrics.reset();
}
Comment on lines -151 to -156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused — do we really not need this at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed code in RegionScannerImpl was doing two things:

  • Capturing server-side scan metrics while RegionScanner is being initialized, and passing them to the next() call in RegionScannerImpl to update ServerSideScanMetrics from ScannerContext.
  • Resetting the state of thread local variable to start fresh for a scan.

For the first point: given that the calls to the RegionScannerImpl constructor and next() happen on the same thread — the same thread that handles the RSRpcServices#scan() call — we can avoid passing ServerSideScanMetrics from the constructor to the next() call if we populate ServerSideScanMetrics in RSRpcServices, as is already done for some other scan metrics.

For the second point, I am taking care of resetting the thread-local variables for each call in RSRpcServices#scan(), which is also much cleaner.

So, we are still doing the same thing in RSRpcServices that we used to do in RegionScannerImpl; it's just more concise and cleaner now. Please let me know if I am missing something. Thanks.

initializeScanners(scan, additionalScanners);
if (isScanMetricsEnabled) {
ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scannerInitMetrics);
}
}

public ScannerContext getContext() {
Expand Down Expand Up @@ -291,16 +279,6 @@ public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext sca
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues = false;
boolean isScanMetricsEnabled = scannerContext.isTrackingMetrics();
ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
if (isScanMetricsEnabled) {
ThreadLocalServerSideScanMetrics.reset();
ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
if (scannerInitMetrics != null) {
scannerInitMetrics.getMetricsMap().forEach(scanMetrics::addToCounter);
scannerInitMetrics = null;
}
}
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
Expand All @@ -310,10 +288,6 @@ public boolean nextRaw(List<? super ExtendedCell> outResults, ScannerContext sca
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
if (isScanMetricsEnabled) {
ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
}
region.addReadRequestsCount(1);
if (region.getMetrics() != null) {
region.getMetrics().updateReadRequestCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,16 +919,6 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public void updateFsReadTime(long latencyMillis) {

}

@Override
public long getFsReadTime() {
return 0;
}
};
return rpcCall;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,6 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public void updateFsReadTime(long latencyMillis) {

}

@Override
public long getFsReadTime() {
return 0;
}
};
return rpcCall;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,6 @@ public long getResponseExceptionSize() {
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}

@Override
public void updateFsReadTime(long latencyMillis) {

}

@Override
public long getFsReadTime() {
return 0;
}
};
}
}