Skip to content
5 changes: 5 additions & 0 deletions phoenix-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@
<artifactId>phoenix-hbase-compat-2.6.0</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-hbase-compat-2.6.4</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
3 changes: 3 additions & 0 deletions phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@
|| ("${hbase.compat.version}".equals("2.6.0")
&amp;&amp; hbaseMinor == 6
&amp;&amp; hbasePatch &gt;=0)
|| ("${hbase.compat.version}".equals("2.6.4")
&amp;&amp; hbaseMinor == 6
&amp;&amp; hbasePatch &gt;=4)
)</condition>
</evaluateBeanshell>
</rules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
Expand Down Expand Up @@ -163,6 +164,20 @@ private void updateMetrics() {
changeMetric(scanMetricsHolder.getCountOfBytesScanned(),
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
changeMetric(scanMetricsHolder.getFsReadTime(),
CompatScanMetrics.getFsReadTime(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromFS(),
CompatScanMetrics.getBytesReadFromFs(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromMemstore(),
CompatScanMetrics.getBytesReadFromMemstore(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBytesReadFromBlockcache(),
CompatScanMetrics.getBytesReadFromBlockCache(scanMetricsMap));
changeMetric(scanMetricsHolder.getCountOfBlockReadOps(),
CompatScanMetrics.getBlockReadOpsCount(scanMetricsMap));
changeMetric(scanMetricsHolder.getRpcScanProcessingTime(),
CompatScanMetrics.getRpcScanProcessingTime(scanMetricsMap));
changeMetric(scanMetricsHolder.getRpcScanQueueWaitTime(),
CompatScanMetrics.getRpcScanQueueWaitTime(scanMetricsMap));

changeMetric(GLOBAL_SCAN_BYTES, scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_RPC_CALLS, scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,18 @@ public enum MetricType {
COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries", LogLevel.DEBUG, PLong.INSTANCE),
COUNT_ROWS_SCANNED("ws", "Number of rows scanned", LogLevel.DEBUG, PLong.INSTANCE),
COUNT_ROWS_FILTERED("wf", "Number of rows filtered", LogLevel.DEBUG, PLong.INSTANCE),
FS_READ_TIME("frt", "Time spent in filesystem read", LogLevel.DEBUG, PLong.INSTANCE),
BYTES_READ_FROM_FS("brff", "Number of bytes read from filesystem", LogLevel.DEBUG,
PLong.INSTANCE),
BYTES_READ_FROM_MEMSTORE("brfm", "Number of bytes read from memstore", LogLevel.DEBUG,
PLong.INSTANCE),
BYTES_READ_FROM_BLOCKCACHE("brfc", "Number of bytes read from blockcache", LogLevel.DEBUG,
PLong.INSTANCE),
BLOCK_READ_OPS_COUNT("broc", "Number of block read operations", LogLevel.DEBUG, PLong.INSTANCE),
RPC_SCAN_PROCESSING_TIME("rsp", "Time spent in RPC scan processing", LogLevel.DEBUG,
PLong.INSTANCE),
RPC_SCAN_QUEUE_WAIT_TIME("rsqw", "Time spent in RPC scan queue wait", LogLevel.DEBUG,
PLong.INSTANCE),
COUNTER_METADATA_INCONSISTENCY("mi", "Number of times the metadata inconsistencies ",
LogLevel.DEBUG, PLong.INSTANCE),
NUM_SYSTEM_TABLE_RPC_SUCCESS("nstrs", "Number of successful system table RPC calls",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.phoenix.monitoring;

import static org.apache.phoenix.monitoring.MetricType.BLOCK_READ_OPS_COUNT;
import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_BLOCKCACHE;
import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_FS;
import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_MEMSTORE;
import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
Expand All @@ -28,7 +32,10 @@
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.MetricType.FS_READ_TIME;
import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_PROCESSING_TIME;
import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;

import java.io.IOException;
Expand All @@ -52,6 +59,13 @@ public class ScanMetricsHolder {
private final CombinableMetric countOfRowsFiltered;
private final CombinableMetric countOfBytesScanned;
private final CombinableMetric countOfRowsPaged;
private final CombinableMetric fsReadTime;
private final CombinableMetric countOfBytesReadFromFS;
private final CombinableMetric countOfBytesReadFromMemstore;
private final CombinableMetric countOfBytesReadFromBlockcache;
private final CombinableMetric countOfBlockReadOps;
private final CombinableMetric rpcScanProcessingTime;
private final CombinableMetric rpcScanQueueWaitTime;
private Map<String, Long> scanMetricMap;
private Object scan;

Expand Down Expand Up @@ -83,6 +97,13 @@ private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName, Scan sc
countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName);
countOfBytesScanned = readMetrics.allotMetric(SCAN_BYTES, tableName);
countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
fsReadTime = readMetrics.allotMetric(FS_READ_TIME, tableName);
countOfBytesReadFromFS = readMetrics.allotMetric(BYTES_READ_FROM_FS, tableName);
countOfBytesReadFromMemstore = readMetrics.allotMetric(BYTES_READ_FROM_MEMSTORE, tableName);
countOfBytesReadFromBlockcache = readMetrics.allotMetric(BYTES_READ_FROM_BLOCKCACHE, tableName);
countOfBlockReadOps = readMetrics.allotMetric(BLOCK_READ_OPS_COUNT, tableName);
rpcScanProcessingTime = readMetrics.allotMetric(RPC_SCAN_PROCESSING_TIME, tableName);
rpcScanQueueWaitTime = readMetrics.allotMetric(RPC_SCAN_QUEUE_WAIT_TIME, tableName);
}

public CombinableMetric getCountOfRemoteRPCcalls() {
Expand Down Expand Up @@ -141,6 +162,34 @@ public CombinableMetric getCountOfRowsPaged() {
return countOfRowsPaged;
}

/** Returns the metric accumulating time spent in filesystem reads for this scan. */
public CombinableMetric getFsReadTime() {
return fsReadTime;
}

/** Returns the metric accumulating bytes read from the filesystem for this scan. */
public CombinableMetric getCountOfBytesReadFromFS() {
return countOfBytesReadFromFS;
}

/** Returns the metric accumulating bytes read from the memstore for this scan. */
public CombinableMetric getCountOfBytesReadFromMemstore() {
return countOfBytesReadFromMemstore;
}

/** Returns the metric accumulating bytes read from the block cache for this scan. */
public CombinableMetric getCountOfBytesReadFromBlockcache() {
return countOfBytesReadFromBlockcache;
}

/** Returns the metric accumulating the number of block read operations for this scan. */
public CombinableMetric getCountOfBlockReadOps() {
return countOfBlockReadOps;
}

/** Returns the metric accumulating server-side RPC scan processing time for this scan. */
public CombinableMetric getRpcScanProcessingTime() {
return rpcScanProcessingTime;
}

/** Returns the metric accumulating server-side RPC scan queue wait time for this scan. */
public CombinableMetric getRpcScanQueueWaitTime() {
return rpcScanQueueWaitTime;
}

/**
 * Stores the raw per-scan metric map (metric name to value) as reported by HBase.
 * Used by {@link #toString()} when serializing the metrics; may be null if never set.
 */
public void setScanMetricMap(Map<String, Long> scanMetricMap) {
this.scanMetricMap = scanMetricMap;
}
Expand All @@ -154,5 +203,4 @@ public String toString() {
return "{\"Exception while converting scan metrics to Json\":\"" + e.getMessage() + "\"}";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;

import java.util.Map;
import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.compat.hbase.CompatThreadLocalServerSideScanMetrics;

/**
* Stores scan metrics from data table operations performed during:
* <ul>
* <li>Uncovered global index scans</li>
* <li>Read repair operations</li>
* </ul>
* These metrics help identify latency variations that occur when both data table and index table
* are scanned together, and are used to populate {@link ThreadLocalServerSideScanMetrics} for index
* table RPC calls.
*/
public class DataTableScanMetrics {

  // Immutable snapshot of server-side scan counters for the data table,
  // captured once when the object is built.
  private final long fsReadTimeInMs;
  private final long bytesReadFromFS;
  private final long bytesReadFromMemstore;
  private final long bytesReadFromBlockcache;
  private final long blockReadOps;

  protected DataTableScanMetrics(long fsReadTimeInMs, long bytesReadFromFS,
      long bytesReadFromMemstore, long bytesReadFromBlockcache, long blockReadOps) {
    // Assignments are independent of one another; order is not significant.
    this.blockReadOps = blockReadOps;
    this.bytesReadFromBlockcache = bytesReadFromBlockcache;
    this.bytesReadFromMemstore = bytesReadFromMemstore;
    this.bytesReadFromFS = bytesReadFromFS;
    this.fsReadTimeInMs = fsReadTimeInMs;
  }

  /** @return time spent reading from the filesystem, in milliseconds */
  public long getFsReadTimeInMs() {
    return fsReadTimeInMs;
  }

  /** @return number of bytes read from the filesystem */
  public long getBytesReadFromFS() {
    return bytesReadFromFS;
  }

  /** @return number of bytes read from the memstore */
  public long getBytesReadFromMemstore() {
    return bytesReadFromMemstore;
  }

  /** @return number of bytes read from the block cache */
  public long getBytesReadFromBlockcache() {
    return bytesReadFromBlockcache;
  }

  /** @return number of block read operations performed */
  public long getBlockReadOps() {
    return blockReadOps;
  }

  /**
   * Copies the data-table counters out of a raw HBase scan-metrics map (metric name to value)
   * into the supplied {@code builder}. Values are read through {@code CompatScanMetrics} so the
   * lookup works across the supported HBase compat versions.
   */
  public static void buildDataTableScanMetrics(Map<String, Long> scanMetrics, Builder builder) {
    builder.setBlockReadOps(CompatScanMetrics.getBlockReadOpsCount(scanMetrics));
    builder.setBytesReadFromBlockcache(CompatScanMetrics.getBytesReadFromBlockCache(scanMetrics));
    builder.setBytesReadFromMemstore(CompatScanMetrics.getBytesReadFromMemstore(scanMetrics));
    builder.setBytesReadFromFS(CompatScanMetrics.getBytesReadFromFs(scanMetrics));
    builder.setFsReadTimeInMs(CompatScanMetrics.getFsReadTime(scanMetrics));
  }

  /**
   * Adds this snapshot's counters into the thread-local server-side scan metrics, so that the
   * data-table portion of the work is attributed to the current (index table) RPC.
   */
  public void populateThreadLocalServerSideScanMetrics() {
    CompatThreadLocalServerSideScanMetrics.addFsReadTime(fsReadTimeInMs);
    CompatThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesReadFromFS);
    CompatThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(bytesReadFromMemstore);
    CompatThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(bytesReadFromBlockcache);
    CompatThreadLocalServerSideScanMetrics.addBlockReadOpsCount(blockReadOps);
  }

  /** Fluent builder; all counters default to zero until explicitly set. */
  public static class Builder {
    protected long fsReadTimeInMs = 0;
    protected long bytesReadFromFS = 0;
    protected long bytesReadFromMemstore = 0;
    protected long bytesReadFromBlockcache = 0;
    protected long blockReadOps = 0;

    public Builder setFsReadTimeInMs(long fsReadTimeInMs) {
      this.fsReadTimeInMs = fsReadTimeInMs;
      return this;
    }

    public Builder setBytesReadFromFS(long bytesReadFromFS) {
      this.bytesReadFromFS = bytesReadFromFS;
      return this;
    }

    public Builder setBytesReadFromMemstore(long bytesReadFromMemstore) {
      this.bytesReadFromMemstore = bytesReadFromMemstore;
      return this;
    }

    public Builder setBytesReadFromBlockcache(long bytesReadFromBlockcache) {
      this.bytesReadFromBlockcache = bytesReadFromBlockcache;
      return this;
    }

    public Builder setBlockReadOps(long blockReadOps) {
      this.blockReadOps = blockReadOps;
      return this;
    }

    /** @return an immutable {@link DataTableScanMetrics} holding the configured counters */
    public DataTableScanMetrics build() {
      return new DataTableScanMetrics(fsReadTimeInMs, bytesReadFromFS, bytesReadFromMemstore,
          bytesReadFromBlockcache, blockReadOps);
    }
  }
}
Loading