@@ -70,6 +70,7 @@
import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.RowSizeParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
@@ -752,6 +753,16 @@ protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement
protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select,
boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, boolean inJoin,
boolean inUnion) throws SQLException {
for (AliasedNode node : select.getSelect()) {
if (node.getNode() instanceof RowSizeParseNode) {
Contributor: I am not clear on the usage. Does this mean we can't use a query where the row size needs to be fetched for each row (e.g., select row_size() from table, or select row_size() from table group by tenant_id)?

Contributor Author: Please see the exception message, and the row size test, for how to get individual row sizes.
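For instance, a minimal sketch of the aggregation-based usage over JDBC (the connection URL, MY_TABLE, and PK are placeholders, not names from this PR):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class RowSizeExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         Statement stmt = conn.createStatement()) {
      // Size of a single row: the aggregate runs over a one-row result.
      try (ResultSet rs = stmt
          .executeQuery("SELECT SUM(ROW_SIZE()) FROM MY_TABLE WHERE PK = 'k1'")) {
        if (rs.next()) {
          System.out.println("row size = " + rs.getLong(1));
        }
      }
      // Per-row sizes for many rows: GROUP BY the full PK so each group is one row.
      try (ResultSet rs = stmt
          .executeQuery("SELECT PK, SUM(ROW_SIZE()) FROM MY_TABLE GROUP BY PK")) {
        while (rs.next()) {
          System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
        }
      }
    }
  }
}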

throw new SQLException(
"ROW_SIZE() can only be an argument to an aggregation function in a select clause. \n"
+ "To get the size of a single row, an aggregation function can be used over the row, e.g., \n"
+ "SELECT SUM(ROW_SIZE()) ... WHERE PK = <my PK>. To return the row sizes for multiple rows, \n"
+ "a group by clause can be used to have single row groups, e.g., \n"
+ "SELECT SUM(ROW_SIZE()) ... WHERE PK = <my PK> GROUP BY PK ");
}
}
boolean isApplicable = true;
PTable projectedTable = null;
if (this.projectTuples) {
@@ -89,6 +89,8 @@ public class StatementContext {
private Set<StatementContext> subStatementContexts;
private boolean totalSegmentsFunction = false;
private Integer totalSegmentsValue;
private boolean hasRowSizeFunction = false;
private boolean hasRawRowSizeFunction = false;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -121,6 +123,8 @@ public StatementContext(StatementContext context) {
this.subStatementContexts = Sets.newHashSet();
this.totalSegmentsFunction = context.totalSegmentsFunction;
this.totalSegmentsValue = context.totalSegmentsValue;
this.hasRowSizeFunction = context.hasRowSizeFunction;
this.hasRawRowSizeFunction = context.hasRawRowSizeFunction;
}

/**
@@ -448,6 +452,22 @@ public void setTotalSegmentsFunction(boolean totalSegmentsFunction) {
this.totalSegmentsFunction = totalSegmentsFunction;
}

public boolean hasRowSizeFunction() {
return hasRowSizeFunction;
}

public boolean hasRawRowSizeFunction() {
return hasRawRowSizeFunction;
}

public void setHasRowSizeFunction(boolean hasRowSizeFunction) {
this.hasRowSizeFunction = hasRowSizeFunction;
}

public void setHasRawRowSizeFunction(boolean hasRawRowSizeFunction) {
this.hasRawRowSizeFunction = hasRawRowSizeFunction;
}

public Integer getTotalSegmentsValue() {
return totalSegmentsValue;
}
@@ -87,6 +87,7 @@
import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.RowKeyComparisonFilter;
import org.apache.phoenix.filter.RowLevelFilter;
import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
import org.apache.phoenix.parse.ColumnParseNode;
@@ -540,7 +541,10 @@ private static void setScanFilter(StatementContext context, FilterableStatement
scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_FILTER_STR,
Bytes.toBytes(whereClause.toString()));
}
} else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
} else if (
whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)
&& !context.hasRowSizeFunction() && !context.hasRawRowSizeFunction()
) {
Filter filter = null;
final Counter counter = new Counter();
whereClause.accept(new KeyValueExpressionVisitor() {
@@ -599,6 +603,12 @@ public Void visit(KeyValueColumnExpression expression) {
break;
}
scan.setFilter(filter);
} else if (whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)) {
if (context.hasRawRowSizeFunction()) {
scan.setFilter(new RowLevelFilter(whereClause, true));
} else {
scan.setFilter(new RowLevelFilter(whereClause, false));
}
}
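Note: the two branches differ only in the boolean passed to the filter, so an equivalent, more compact form would be (an editorial suggestion, not what the PR does):

scan.setFilter(new RowLevelFilter(whereClause, context.hasRawRowSizeFunction()));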

ScanRanges scanRanges = context.getScanRanges();
@@ -88,4 +88,16 @@ public interface Expression extends PDatum, Writable {
default boolean contains(Expression other) {
return this.equals(other);
}

/**
* Determine if the expression should be evaluated over the entire row
*/
default boolean isRowLevel() {
for (Expression child : getChildren()) {
if (child.isRowLevel()) {
return true;
}
}
return false;
}
}
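As an aside, the default-method recursion above can be illustrated with a self-contained sketch (simplified stand-in types, not the Phoenix classes): a leaf overrides isRowLevel(), and any expression built on top of it inherits the answer through the default child walk.

import java.util.Collections;
import java.util.List;

interface Expr {
  default List<Expr> getChildren() { return Collections.emptyList(); }
  default boolean isRowLevel() {
    // Same shape as Expression.isRowLevel(): true if any child is row-level.
    for (Expr child : getChildren()) {
      if (child.isRowLevel()) {
        return true;
      }
    }
    return false;
  }
}

class RowSizeLeaf implements Expr {
  @Override
  public boolean isRowLevel() { return true; } // the leaf itself opts in
}

class GreaterThan implements Expr {
  private final List<Expr> children;
  GreaterThan(Expr left, Expr right) { this.children = List.of(left, right); }
  @Override
  public List<Expr> getChildren() { return children; }
  // No isRowLevel() override: the default recursion finds the row-level child.
}

public class IsRowLevelDemo {
  public static void main(String[] args) {
    Expr literal = new Expr() { }; // plain leaf, not row-level
    System.out.println(new GreaterThan(new RowSizeLeaf(), literal).isRowLevel()); // true
    System.out.println(literal.isRowLevel()); // false
  }
}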
@@ -206,7 +206,9 @@ public enum ExpressionType {
SubBinaryFunction(SubBinaryFunction.class),
ScanStartKeyFunction(ScanStartKeyFunction.class),
ScanEndKeyFunction(ScanEndKeyFunction.class),
TotalSegmentsFunction(TotalSegmentsFunction.class);
TotalSegmentsFunction(TotalSegmentsFunction.class),
RowSizeFunction(RowSizeFunction.class),
RawRowSizeFunction(RawRowSizeFunction.class);

ExpressionType(Class<? extends Expression> clazz) {
this.clazz = clazz;
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.expression.function;

import java.util.List;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.parse.RawRowSizeParseNode;

/**
* Function to return the total size of all HBase cell versions and delete markers for a given
* row
*/
@BuiltInFunction(name = RawRowSizeFunction.NAME, nodeClass = RawRowSizeParseNode.class, args = {})
public class RawRowSizeFunction extends RowSizeFunction {

public static final String NAME = "RAW_ROW_SIZE";

public RawRowSizeFunction() {
}

public RawRowSizeFunction(List<Expression> children) {
super(children);
}
}
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.expression.function;

import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.parse.RowSizeParseNode;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PUnsignedLong;

/**
* Function to return the total size of the HBase cells that constitute a given row
*/
@BuiltInFunction(name = RowSizeFunction.NAME, nodeClass = RowSizeParseNode.class, args = {})
public class RowSizeFunction extends ScalarFunction {
Contributor (@ujjawal4046, Sep 30, 2025): Are scalar functions evaluated on the server side as well, or only on the client side? If it's client side, do we need to fetch the whole row back to the client for the size computation?

Contributor Author: It can be evaluated on the client side to check whether the where clause evaluates to true on an empty tuple, or when it is specified as a top-level expression node in a select clause. This PR does not allow the row_size function to be a top-level node in a select clause. In this PR, a row is never returned to the client; only its size is returned, as part of an aggregation function result.


public static final String NAME = "ROW_SIZE";

public RowSizeFunction() {
}

public RowSizeFunction(List<Expression> children) {
super(children);
}

@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
if (tuple == null) {
return false;
}
long size = 0;
for (int i = 0; i < tuple.size(); i++) {
size += tuple.getValue(i).getSerializedSize();
}
ptr.set(PUnsignedLong.INSTANCE.toBytes(size));
return true;
}

@Override
public PDataType getDataType() {
return PUnsignedLong.INSTANCE;
}

@Override
public String getName() {
return NAME;
}

@Override
public boolean isStateless() {
return false;
}

@Override
public Determinism getDeterminism() {
return Determinism.PER_ROW;
}

@Override
public boolean isRowLevel() {
return true;
}
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.filter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;

/**
* Filter used when an expression references the entire row
*/
public class RowLevelFilter extends BooleanExpressionFilter {
private boolean allVersions = false;
private boolean keepRow = false;

public RowLevelFilter() {
}

public RowLevelFilter(Expression expression, boolean allVersions) {
super(expression);
this.allVersions = allVersions;
}

@Override
public void reset() {
super.reset();
keepRow = false;
}

// No @Override for HBase 3 compatibility
public ReturnCode filterKeyValue(Cell v) {
return filterCell(v);
}

@Override
public ReturnCode filterCell(Cell v) {
return allVersions ? ReturnCode.INCLUDE : ReturnCode.INCLUDE_AND_NEXT_COL;
}

@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
Tuple tuple = new MultiKeyValueTuple();
tuple.setKeyValues(kvs);
keepRow = Boolean.TRUE.equals(evaluate(tuple));
}

@Override
public boolean filterRow() {
return !this.keepRow;
}

@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
allVersions = input.readBoolean();
}

@Override
public void write(DataOutput output) throws IOException {
super.write(output);
output.writeBoolean(allVersions);
}

public static RowLevelFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
try {
return (RowLevelFilter) Writables.getWritable(pbBytes, new RowLevelFilter());
} catch (IOException e) {
throw new DeserializationException(e);
}
}
}
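A hand-driven sketch of the callback order a region server uses with this filter (normally the filter is set on a Scan rather than invoked directly, and a TRUE literal stands in here for a real row-level where clause):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.filter.RowLevelFilter;

public class RowLevelFilterSketch {
  public static void main(String[] args) throws Exception {
    // allVersions = false: latest cell versions are enough for the expression.
    RowLevelFilter filter = new RowLevelFilter(LiteralExpression.newConstant(true), false);
    List<Cell> row = new ArrayList<>();
    row.add(new KeyValue(Bytes.toBytes("pk1"), Bytes.toBytes("0"),
        Bytes.toBytes("c1"), Bytes.toBytes("v1")));
    filter.reset();                 // called once per row
    filter.filterCell(row.get(0));  // never drops cells; the decision is per row
    filter.filterRowCells(row);     // evaluates the expression over the whole row
    System.out.println("keep row: " + !filter.filterRow()); // true for a TRUE clause
  }
}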
@@ -285,7 +285,7 @@ private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer
}
// Add FirstKeyOnlyFilter or EmptyColumnOnlyFilter if there are no references
// to key value columns. We use FirstKeyOnlyFilter when possible
if (keyOnlyFilter) {
if (keyOnlyFilter && !context.hasRowSizeFunction() && !context.hasRawRowSizeFunction()) {
byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
byte[] ecq = table.getEncodingScheme() == NON_ENCODED_QUALIFIERS
? QueryConstants.EMPTY_COLUMN_BYTES
@@ -361,12 +361,22 @@ private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
if (context.hasRowSizeFunction() || context.hasRawRowSizeFunction()) {
scan.getFamilyMap().clear();
if (context.hasRawRowSizeFunction()) {
scan.setRaw(true);
scan.readAllVersions();
}
}
}
}
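For reference, the raw-versions scan configured above corresponds to this plain HBase setup (a minimal sketch):

import org.apache.hadoop.hbase.client.Scan;

public class RawScanSketch {
  // Mirrors what initializeScan applies when RAW_ROW_SIZE() is present: a raw
  // scan also returns delete markers, and readAllVersions() returns every stored
  // version of each cell, so all of their sizes can be summed on the server.
  static Scan rawRowSizeScan() {
    Scan scan = new Scan();
    scan.setRaw(true);
    scan.readAllVersions();
    return scan;
  }
}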

private static void setQualifierRanges(boolean keyOnlyFilter, PTable table, Scan scan,
StatementContext context) throws SQLException {
if (EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan)) {
if (
!context.hasRowSizeFunction() && !context.hasRawRowSizeFunction()
&& EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan, context)
) {
Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] cq = whereCol.getSecond();
@@ -476,7 +486,7 @@ private static void optimizeProjection(StatementContext context, Scan scan, PTab
if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
// Allow seeking to column during filtering
preventSeekToColumn = false;
} else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan)) {
} else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan, context)) {
/*
* preventSeekToColumn cannot be true, even if hinted, when encoded qualifier list
* optimization is being used. When using the optimization, it is necessary that we explicitly