Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

public interface SpecializedGetters {

Expand All @@ -50,6 +52,10 @@ public interface SpecializedGetters {

byte[] getBinary(int ordinal);

GeographyVal getGeography(int ordinal);

GeometryVal getGeometry(int ordinal);

CalendarInterval getInterval(int ordinal);

VariantVal getVariant(int ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public static Object read(
if (physicalDataType instanceof PhysicalStringType) {
return obj.getUTF8String(ordinal);
}
if (physicalDataType instanceof PhysicalGeographyType) {
return obj.getGeography(ordinal);
}
if (physicalDataType instanceof PhysicalGeometryType) {
return obj.getGeometry(ordinal);
}
if (physicalDataType instanceof PhysicalDecimalType dt) {
return obj.getDecimal(ordinal, dt.precision(), dt.scale());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

Expand Down Expand Up @@ -222,6 +224,18 @@ public byte[] getBinary(int ordinal) {
return bytes;
}

@Override
public GeographyVal getGeography(int ordinal) {
byte[] bytes = getBinary(ordinal);
return (bytes == null) ? null : GeographyVal.fromBytes(bytes);
}

@Override
public GeometryVal getGeometry(int ordinal) {
byte[] bytes = getBinary(ordinal);
return (bytes == null) ? null : GeometryVal.fromBytes(bytes);
}

@Override
public CalendarInterval getInterval(int ordinal) {
if (isNullAt(ordinal)) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

Expand Down Expand Up @@ -417,6 +419,18 @@ public byte[] getBinary(int ordinal) {
}
}

@Override
public GeographyVal getGeography(int ordinal) {
byte[] bytes = getBinary(ordinal);
return (bytes == null) ? null : GeographyVal.fromBytes(bytes);
}

@Override
public GeometryVal getGeometry(int ordinal) {
byte[] bytes = getBinary(ordinal);
return (bytes == null) ? null : GeometryVal.fromBytes(bytes);
}

@Override
public CalendarInterval getInterval(int ordinal) {
if (isNullAt(ordinal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

Expand Down Expand Up @@ -111,6 +113,14 @@ public final void write(int ordinal, UTF8String input) {
writeUnalignedBytes(ordinal, input.getBaseObject(), input.getBaseOffset(), input.numBytes());
}

public final void write(int ordinal, GeographyVal input) {
write(ordinal, input.getBytes());
}

public final void write(int ordinal, GeometryVal input) {
write(ordinal, input.getBytes());
}

public final void write(int ordinal, byte[] input) {
write(ordinal, input, 0, input.length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

/**
* An interface representing in-memory columnar data in Spark. This interface defines the main APIs
Expand Down Expand Up @@ -288,6 +290,16 @@ public final ColumnarRow getStruct(int rowId) {
*/
public abstract byte[] getBinary(int rowId);

public GeographyVal getGeography(int rowId) {
byte[] bytes = getBinary(rowId);
return (bytes == null) ? null : GeographyVal.fromBytes(bytes);
}

public GeometryVal getGeometry(int rowId) {
byte[] bytes = getBinary(rowId);
return (bytes == null) ? null : GeometryVal.fromBytes(bytes);
}

/**
* Returns the calendar interval type value for {@code rowId}. If the slot for
* {@code rowId} is null, it should return null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

/**
* Array abstraction in {@link ColumnVector}.
Expand Down Expand Up @@ -174,6 +176,16 @@ public byte[] getBinary(int ordinal) {
return data.getBinary(offset + ordinal);
}

@Override
public GeographyVal getGeography(int ordinal) {
return data.getGeography(offset + ordinal);
}

@Override
public GeometryVal getGeometry(int ordinal) {
return data.getGeometry(offset + ordinal);
}

@Override
public CalendarInterval getInterval(int ordinal) {
return data.getInterval(offset + ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

/**
* This class wraps an array of {@link ColumnVector} and provides a row view.
Expand Down Expand Up @@ -72,6 +74,10 @@ public InternalRow copy() {
row.update(i, getUTF8String(i).copy());
} else if (pdt instanceof PhysicalBinaryType) {
row.update(i, getBinary(i));
} else if (pdt instanceof PhysicalGeographyType) {
row.update(i, getGeography(i));
} else if (pdt instanceof PhysicalGeometryType) {
row.update(i, getGeometry(i));
} else if (pdt instanceof PhysicalDecimalType t) {
row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
} else if (pdt instanceof PhysicalStructType t) {
Expand Down Expand Up @@ -132,6 +138,16 @@ public byte[] getBinary(int ordinal) {
return columns[ordinal].getBinary(rowId);
}

@Override
public GeographyVal getGeography(int ordinal) {
return columns[ordinal].getGeography(rowId);
}

@Override
public GeometryVal getGeometry(int ordinal) {
return columns[ordinal].getGeometry(rowId);
}

@Override
public CalendarInterval getInterval(int ordinal) {
return columns[ordinal].getInterval(rowId);
Expand Down Expand Up @@ -177,6 +193,10 @@ public Object get(int ordinal, DataType dataType) {
return getUTF8String(ordinal);
} else if (dataType instanceof BinaryType) {
return getBinary(ordinal);
} else if (dataType instanceof GeographyType) {
return getGeography(ordinal);
} else if (dataType instanceof GeometryType) {
return getGeometry(ordinal);
} else if (dataType instanceof DecimalType t) {
return getDecimal(ordinal, t.precision(), t.scale());
} else if (dataType instanceof DateType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
import org.apache.spark.unsafe.types.GeographyVal;
import org.apache.spark.unsafe.types.GeometryVal;

/**
* Row abstraction in {@link ColumnVector}.
Expand Down Expand Up @@ -77,6 +79,10 @@ public InternalRow copy() {
row.update(i, getUTF8String(i).copy());
} else if (pdt instanceof PhysicalBinaryType) {
row.update(i, getBinary(i));
} else if (pdt instanceof PhysicalGeographyType) {
row.update(i, getGeography(i));
} else if (pdt instanceof PhysicalGeometryType) {
row.update(i, getGeometry(i));
} else if (pdt instanceof PhysicalDecimalType t) {
row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
} else if (pdt instanceof PhysicalStructType t) {
Expand Down Expand Up @@ -137,6 +143,16 @@ public byte[] getBinary(int ordinal) {
return data.getChild(ordinal).getBinary(rowId);
}

@Override
public GeographyVal getGeography(int ordinal) {
return data.getChild(ordinal).getGeography(rowId);
}

@Override
public GeometryVal getGeometry(int ordinal) {
return data.getChild(ordinal).getGeometry(rowId);
}

@Override
public CalendarInterval getInterval(int ordinal) {
return data.getChild(ordinal).getInterval(rowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.{DataType, Decimal, StructType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.unsafe.types._

/**
* An [[InternalRow]] that projects particular columns from another [[InternalRow]] without copying
Expand Down Expand Up @@ -93,6 +93,14 @@ case class ProjectingInternalRow(schema: StructType,
row.getBinary(colOrdinals(ordinal))
}

override def getGeography(ordinal: Int): GeographyVal = {
row.getGeography(colOrdinals(ordinal))
}

override def getGeometry(ordinal: Int): GeometryVal = {
row.getGeometry(colOrdinals(ordinal))
}

override def getInterval(ordinal: Int): CalendarInterval = {
row.getInterval(colOrdinals(ordinal))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, C
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.types.{PhysicalBinaryType, PhysicalIntegerType, PhysicalLongType}
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalType, DoubleType, FloatType, GeographyType, GeometryType, IntegerType, LongType, MapType, ObjectType, ShortType, StringType, StructType, TimestampNTZType, TimestampType, TimeType, UserDefinedType, VariantType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, GeographyVal, GeometryVal, UTF8String, VariantVal}

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -107,6 +107,8 @@ object EncoderUtils {
case _: StructType => classOf[InternalRow]
case _: ArrayType => classOf[ArrayData]
case _: MapType => classOf[MapData]
case _: GeographyType => classOf[GeographyVal]
case _: GeometryType => classOf[GeometryVal]
case ObjectType(cls) => cls
case _ => typeJavaMapping.getOrElse(dt, classOf[java.lang.Object])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ object InterpretedUnsafeProjection {

case _: PhysicalStringType => (v, i) => writer.write(i, v.getUTF8String(i))

case _: PhysicalGeographyType => (v, i) => writer.write(i, v.getGeography(i))

case _: PhysicalGeometryType => (v, i) => writer.write(i, v.getGeometry(i))

case PhysicalVariantType => (v, i) => writer.write(i, v.getVariant(i))

case PhysicalStructType(fields) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.unsafe.types._

/**
* A mutable wrapper that makes two rows appear as a single concatenated row. Designed to
Expand Down Expand Up @@ -114,6 +114,12 @@ class JoinedRow extends InternalRow {
override def getBinary(i: Int): Array[Byte] =
if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields)

override def getGeography(i: Int): GeographyVal =
if (i < row1.numFields) row1.getGeography(i) else row2.getGeography(i - row1.numFields)

override def getGeometry(i: Int): GeometryVal =
if (i < row1.numFields) row1.getGeometry(i) else row2.getGeometry(i - row1.numFields)

override def getArray(i: Int): ArrayData =
if (i < row1.numFields) row1.getArray(i) else row2.getArray(i - row1.numFields)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,8 @@ object CodeGenerator extends Logging {
classOf[Platform].getName,
classOf[InternalRow].getName,
classOf[UnsafeRow].getName,
classOf[GeographyVal].getName,
classOf[GeometryVal].getName,
classOf[UTF8String].getName,
classOf[Decimal].getName,
classOf[CalendarInterval].getName,
Expand Down Expand Up @@ -1682,6 +1684,8 @@ object CodeGenerator extends Logging {
case _ => PhysicalDataType(dataType) match {
case _: PhysicalArrayType => s"$input.getArray($ordinal)"
case PhysicalBinaryType => s"$input.getBinary($ordinal)"
case _: PhysicalGeographyType => s"$input.getGeography($ordinal)"
case _: PhysicalGeometryType => s"$input.getGeometry($ordinal)"
case PhysicalCalendarIntervalType => s"$input.getInterval($ordinal)"
case t: PhysicalDecimalType => s"$input.getDecimal($ordinal, ${t.precision}, ${t.scale})"
case _: PhysicalMapType => s"$input.getMap($ordinal)"
Expand Down Expand Up @@ -1960,6 +1964,8 @@ object CodeGenerator extends Logging {
* Returns the Java type for a DataType.
*/
def javaType(dt: DataType): String = dt match {
case _: GeographyType => "GeographyVal"
case _: GeometryType => "GeometryVal"
case udt: UserDefinedType[_] => javaType(udt.sqlType)
case ObjectType(cls) if cls.isArray => s"${javaType(ObjectType(cls.getComponentType))}[]"
case ObjectType(cls) => cls.getName
Expand Down Expand Up @@ -1995,6 +2001,8 @@ object CodeGenerator extends Logging {
case DoubleType => java.lang.Double.TYPE
case _: DecimalType => classOf[Decimal]
case BinaryType => classOf[Array[Byte]]
case _: GeographyType => classOf[GeographyVal]
case _: GeometryType => classOf[GeometryVal]
case _: StringType => classOf[UTF8String]
case CalendarIntervalType => classOf[CalendarInterval]
case _: StructType => classOf[InternalRow]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.unsafe.types._
import org.apache.spark.util.ArrayImplicits._

/**
Expand All @@ -45,6 +45,8 @@ trait BaseGenericInternalRow extends InternalRow {
override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = getAs(ordinal)
override def getUTF8String(ordinal: Int): UTF8String = getAs(ordinal)
override def getBinary(ordinal: Int): Array[Byte] = getAs(ordinal)
override def getGeography(ordinal: Int): GeographyVal = getAs(ordinal)
override def getGeometry(ordinal: Int): GeometryVal = getAs(ordinal)
override def getArray(ordinal: Int): ArrayData = getAs(ordinal)
override def getInterval(ordinal: Int): CalendarInterval = getAs(ordinal)
override def getVariant(ordinal: Int): VariantVal = getAs(ordinal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.unsafe.types._

class GenericArrayData(val array: Array[Any]) extends ArrayData {

Expand Down Expand Up @@ -72,6 +72,8 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = getAs(ordinal)
override def getUTF8String(ordinal: Int): UTF8String = getAs(ordinal)
override def getBinary(ordinal: Int): Array[Byte] = getAs(ordinal)
override def getGeography(ordinal: Int): GeographyVal = getAs(ordinal)
override def getGeometry(ordinal: Int): GeometryVal = getAs(ordinal)
override def getInterval(ordinal: Int): CalendarInterval = getAs(ordinal)
override def getVariant(ordinal: Int): VariantVal = getAs(ordinal)
override def getStruct(ordinal: Int, numFields: Int): InternalRow = getAs(ordinal)
Expand Down
Loading