Changes from all commits
43 commits
8799de5
[SPARK-52617][SQL] Cast TIME to/from TIMESTAMP_NTZ
subham611 Jul 5, 2025
c37bbf0
Adds time to TimestampNTZType conversion
subham611 Jul 5, 2025
352aacb
Fix linting
subham611 Jul 5, 2025
7faed87
Remove unused import
subham611 Jul 5, 2025
11c3a26
Adds can cast
subham611 Jul 5, 2025
4be2758
Fix import
subham611 Jul 5, 2025
e020fae
Resolved ambiguous import
subham611 Jul 5, 2025
5b9a10e
Fix UT failure
subham611 Jul 5, 2025
d6f4424
Enable casting in ansi mode
subham611 Jul 5, 2025
f60c6cb
Calculate current day outside buildCast
subham611 Jul 5, 2025
828d214
Adds RewriteTimeCastToTimestampNTZ rule
subham611 Jul 6, 2025
90482d6
Fix additional change
subham611 Jul 6, 2025
a9d1bdd
Fix unused import
subham611 Jul 6, 2025
fdc9b92
Fix UT
subham611 Jul 6, 2025
b919e4e
Fix UT failure
subham611 Jul 6, 2025
e2ef8af
Fix UT
subham611 Jul 7, 2025
0bada28
Fix UT
subham611 Jul 7, 2025
2a5e9ad
Move to resolver
subham611 Jul 7, 2025
7d77c1f
Revert unwanted changes
subham611 Jul 7, 2025
cb6ad55
Resolve comment
subham611 Jul 7, 2025
cd503a5
Delete resolver
subham611 Jul 7, 2025
8bc58da
Fix import
subham611 Jul 7, 2025
8a30b2a
Add back RewriteTimeCastToTimestampNTZ rule
subham611 Jul 8, 2025
fd1aef3
Fix UT
subham611 Jul 8, 2025
5c6c97f
Resolve comments
subham611 Jul 8, 2025
6169ea4
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/anal…
SubhamSinghal Jul 8, 2025
29ca860
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util…
SubhamSinghal Jul 8, 2025
011b254
Merge master
subham611 Jul 8, 2025
ca02825
Resolve comments
subham611 Jul 8, 2025
72bb9b5
remove new line
subham611 Jul 8, 2025
68f0c98
Fix lint
subham611 Jul 8, 2025
6b5ed62
Lint fix
subham611 Jul 8, 2025
e53bfe3
Modify rule
subham611 Jul 8, 2025
b488aae
Rewrite rule
subham611 Jul 8, 2025
867967e
Fix import order
subham611 Jul 8, 2025
ebf16ed
Adds codegen
subham611 Jul 9, 2025
58c0bea
Fix UT
subham611 Jul 9, 2025
b82e21c
Resolve comment
subham611 Jul 10, 2025
4239bef
Lint fix
subham611 Jul 10, 2025
fd5e76e
Adds type coercion rule
subham611 Jul 10, 2025
89dfef0
Resolve merge conflict
subham611 Jul 16, 2025
68dc4a9
Update nanos in a day function
subham611 Jul 16, 2025
76abdaf
Merge master
subham611 Jul 18, 2025
@@ -93,6 +93,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
StackCoercion ::
Division ::
IntegralDivision ::
RewriteTimeCastToTimestampNTZ ::
Contributor:
Why is this a separate rule? I think we should be able to make it part of one of the existing rules, maybe DateTimeOperations. @MaxGekk?

Contributor:
Actually, is this Cast coming from internal type coercion or from user-specified casts? If the former, we should perform this change in the place where we add the cast. If the latter, we should have an analysis rule for the rewrite rather than a type coercion rule, since there is no coercion going on here.

Author:
@MaxGekk Can you help here?

ImplicitTypeCasts ::
DateTimeOperations ::
WindowFrameCoercion ::
@@ -66,6 +66,7 @@ object TypeCoercion extends TypeCoercionBase {
StackCoercion ::
Division ::
IntegralDivision ::
RewriteTimeCastToTimestampNTZ ::
ImplicitTypeCasts ::
DateTimeOperations ::
WindowFrameCoercion ::
@@ -25,8 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.{
CaseWhen,
Cast,
Concat,
CurrentDate,
Elt,
Expression,
MakeTimestampNTZ,
MapZipWith,
Stack,
WindowSpecDefinition
@@ -51,7 +53,7 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure
import org.apache.spark.sql.errors.DataTypeErrors.cannotMergeIncompatibleDataTypesError
import org.apache.spark.sql.types.{DataType, TimestampNTZType, TimeType}

abstract class TypeCoercionBase extends TypeCoercionHelper {

@@ -480,4 +482,35 @@ abstract class TypeCoercionBase extends TypeCoercionHelper {
case withChildrenResolved => StringLiteralTypeCoercion(withChildrenResolved)
}
}

/**
* Rewrites a cast from [[TimeType]] to [[TimestampNTZType]] into a [[MakeTimestampNTZ]]
* expression.
*
* The conversion from TIME to TIMESTAMP_NTZ requires a date component, which TIME itself does
* not provide. This rule injects [[CurrentDate]] as the implicit date part, effectively
* treating the TIME value as a time of day on the current date. Because the
* [[ComputeCurrentTime]] rule replaces [[CurrentDate]] with a fixed value during analysis,
* all such casts within a query observe a consistent date.
*
* For example, the following SQL:
* {{{
* SELECT CAST(make_time(12, 30, 0) AS TIMESTAMP_NTZ)
* }}}
* will be rewritten to:
* {{{
* SELECT make_timestamp_ntz(current_date, make_time(12, 30, 0))
* }}}
*/
object RewriteTimeCastToTimestampNTZ extends TypeCoercionRule {
override def transform: PartialFunction[Expression, Expression] = {
case c @ Cast(child, TimestampNTZType, _, _) if child.resolved =>
child.dataType match {
case _: TimeType =>
// Convert TIME -> TIMESTAMP_NTZ using MakeTimestampNTZ(CurrentDate(), time)
MakeTimestampNTZ(CurrentDate(), child)
case _ => c
}
}
}
}
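
For intuition, here is a minimal sketch of the rewrite's semantics in plain java.time (an illustration only, not Spark code; it assumes the date is pinned once per query, which ComputeCurrentTime guarantees):

import java.time.{LocalDate, LocalDateTime, LocalTime}

// TIME carries only a time of day, e.g. the result of make_time(12, 30, 0).
val timeOfDay: LocalTime = LocalTime.of(12, 30, 0)
// The rewrite attaches the query's current date, yielding a timestamp without time zone.
val asNtz: LocalDateTime = LocalDate.now().atTime(timeOfDay)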
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis.resolver

import org.apache.spark.sql.catalyst.expressions.{
  Cast,
  CurrentDate,
  DefaultStringProducingExpression,
  Expression,
  MakeTimestampNTZ,
  TimeZoneAwareExpression
}
import org.apache.spark.sql.types.{StringType, TimestampNTZType, TimeType}

/**
* Resolves [[TimeZoneAwareExpressions]] by applying the session's local timezone.
@@ -58,12 +59,14 @@ class TimezoneAwareExpressionResolver(expressionResolver: ExpressionResolver)
expressionWithResolvedChildren,
traversals.current.sessionLocalTimeZone
)

coerceExpressionTypes(
  expression = expressionWithResolvedChildrenAndTimeZone,
  expressionTreeTraversal = traversals.current) match {
case cast: Cast if traversals.current.defaultCollation.isDefined =>
tryCollapseCast(cast, traversals.current.defaultCollation.get)
case Cast(child, TimestampNTZType, _, _) if child.dataType.isInstanceOf[TimeType] =>
MakeTimestampNTZ(CurrentDate(), child)
Comment on lines +68 to +69
Contributor:
There is no need for this if we have a type coercion rule; type coercion rules run implicitly in the single-pass analyzer.

Author:
@mihailotim-db Are you suggesting we revert all the changes in the TimeZoneAwareExpressionResolver class, so that the type coercion rule takes care of both the single-pass and fixed-point analyzers?

Contributor:
My take is that we should have a separate analyzer rule (not a type coercion rule) that does this transformation, given that this cast is user-specified. Because it might affect the schema, I would put the rule after the main resolution batch to avoid any alias computation issues. In that case, we would need this code in the single-pass analyzer as well.
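
For reference, a hypothetical sketch of the separate analyzer rule proposed here (the rule name, and running it after the main resolution batch, are assumptions rather than code from this PR):

import org.apache.spark.sql.catalyst.expressions.{Cast, CurrentDate, MakeTimestampNTZ}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{TimestampNTZType, TimeType}

// Hypothetical rule: rewrites user-specified TIME -> TIMESTAMP_NTZ casts after resolution.
object RewriteUserTimeCast extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions {
    case Cast(child, TimestampNTZType, _, _)
        if child.resolved && child.dataType.isInstanceOf[TimeType] =>
      MakeTimestampNTZ(CurrentDate(), child)
  }
}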

case other =>
other
}
@@ -124,6 +124,7 @@ object Cast extends QueryErrorsBase {
case (_: StringType, _: TimeType) => true
case (TimestampType, DateType) => true
case (TimestampNTZType, DateType) => true
case (TimestampNTZType, _: TimeType) => true

case (_: NumericType, _: NumericType) => true
case (_: StringType, _: NumericType) => true
@@ -137,6 +138,7 @@ object Cast extends QueryErrorsBase {
case (_, VariantType) => variant.VariantGet.checkDataType(from, allowStructsAndMaps = false)

case (_: TimeType, _: TimeType) => true
case (_: TimeType, TimestampNTZType) => true
case (_: TimeType, _: IntegralType) => true

// non-null variants can generate nulls even in ANSI mode
@@ -232,6 +234,7 @@ object Cast extends QueryErrorsBase {
case (_: StringType, _: TimeType) => true
case (TimestampType, DateType) => true
case (TimestampNTZType, DateType) => true
case (TimestampNTZType, _: TimeType) => true

case (_: TimeType, _: DecimalType) => true

@@ -259,6 +262,7 @@ object Cast extends QueryErrorsBase {
case (_, VariantType) => variant.VariantGet.checkDataType(from, allowStructsAndMaps = false)

case (_: TimeType, _: TimeType) => true
case (_: TimeType, TimestampNTZType) => true
case (_: TimeType, _: IntegralType) => true

case (ArrayType(fromType, fn), ArrayType(toType, tn)) =>
@@ -763,6 +767,13 @@ case class Cast(
}
case _: TimeType =>
buildCast[Long](_, nanos => DateTimeUtils.truncateTimeToPrecision(nanos, to.precision))
case _: TimestampNTZType =>
buildCast[Long](
_,
micros => {
val nanosInDay = DateTimeUtils.getNanosInADay(micros)
DateTimeUtils.truncateTimeToPrecision(nanosInDay, to.precision)
})
}

// IntervalConverter
@@ -1427,6 +1438,13 @@ case class Cast(
code"""
$evPrim = $dateTimeUtilsCls.truncateTimeToPrecision($nanos, ${to.precision});
"""
case _: TimestampNTZType =>
val nanos = ctx.freshName("nanos")
(micros, evPrim, _) =>
code"""
final long $nanos = $dateTimeUtilsCls.getNanosInADay($micros);
$evPrim = $dateTimeUtilsCls.truncateTimeToPrecision($nanos, ${to.precision});
"""
case _ =>
(_, _, evNull) => code"$evNull = true;"
}
@@ -874,4 +874,17 @@ object DateTimeUtils extends SparkDateTimeUtils {
time, timePrecision, interval, intervalEndField)
}
}

/**
* Returns the number of nanoseconds past midnight for a given timestamp in microseconds.
*
* This method converts the microseconds-since-epoch value to a local date-time, extracts
* its time-of-day component, and returns it as nanoseconds since midnight.
*
* @param micros The timestamp in microseconds since the epoch.
* @return The number of nanoseconds past midnight on that day.
*/
def getNanosInADay(micros: Long): Long = {
localTimeToNanos(microsToLocalDateTime(micros).toLocalTime)
}
}
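
A small worked example of what getNanosInADay computes, in plain java.time (assuming the usual TIMESTAMP_NTZ encoding of microseconds since the epoch):

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

val ldt = LocalDateTime.of(2023, 1, 1, 15, 30, 0)
// TIMESTAMP_NTZ values are encoded as microseconds since 1970-01-01T00:00:00.
val micros = ChronoUnit.MICROS.between(LocalDateTime.of(1970, 1, 1, 0, 0), ldt)
// getNanosInADay(micros) returns the nanoseconds past midnight:
// 15:30:00 is 55_800 seconds into the day, i.e. 55_800_000_000_000 nanoseconds.
assert(ldt.toLocalTime.toNanoOfDay == 55_800_000_000_000L)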
@@ -1777,6 +1777,13 @@ class TypeCoercionSuite extends TypeCoercionSuiteBase {
assert(wp1.isInstanceOf[Project])
assert(wp1.expressions.forall(!_.exists(_ == t1.output.head)))
}

test("SPARK-52617: RewriteTimeCastToTimestampNTZ: TIME to TIMESTAMP_NTZ coercion") {
val expr = Cast(Literal.create(123456789L, TimeType(6)), TimestampNTZType)
val coerced = RewriteTimeCastToTimestampNTZ.transform.apply(expr)
val expected = MakeTimestampNTZ(CurrentDate(), Literal.create(123456789L, TimeType(6)))
assert(coerced.semanticEquals(expected))
}
}


@@ -17,19 +17,24 @@

package org.apache.spark.sql.catalyst.analysis.resolver

import java.time.LocalTime

import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.FunctionResolution
import org.apache.spark.sql.catalyst.expressions.{
AttributeReference,
Cast,
CurrentDate,
Expression,
Literal,
MakeTimestampNTZ,
TimeZoneAwareExpression
}
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types.{IntegerType, StringType, TimestampNTZType, TimeType}

class TimezoneAwareExpressionResolverSuite extends SparkFunSuite {

@@ -82,6 +87,30 @@ class TimezoneAwareExpressionResolverSuite extends SparkFunSuite {
assert(resolvedExpression.getTagValue(Cast.USER_SPECIFIED_CAST).nonEmpty)
}

test("SPARK-52617: Rewrite Cast(TimeType -> TimestampNTZType) to MakeTimestampNTZ") {
val nanos = LocalTime.of(12, 34, 56).toNanoOfDay
val timeExpr = Literal(nanos, TimeType(6)) // nanoseconds since midnight

val input = Cast(timeExpr, TimestampNTZType)

val newExpressionResolver = new HardCodedExpressionResolver(
catalogManager = mock[CatalogManager],
resolvedExpression = timeExpr)

val newTimezoneAwareExpressionResolver =
new TimezoneAwareExpressionResolver(newExpressionResolver)

val resolvedExpr =
newExpressionResolver.getExpressionTreeTraversals.withNewTraversal(OneRowRelation()) {
newTimezoneAwareExpressionResolver.resolve(input)
}
assert(resolvedExpr.isInstanceOf[MakeTimestampNTZ])

val makeTs = resolvedExpr.asInstanceOf[MakeTimestampNTZ]
assert(makeTs.left.isInstanceOf[CurrentDate])
assert(makeTs.right.semanticEquals(timeExpr))
}

test("Timezone is applied recursively") {
val expressionWithTimezone =
TimezoneAwareExpressionResolver.resolveTimezone(nestedCasts, "UTC")
@@ -28,10 +28,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.IntervalUtils.microsToDuration
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -1611,6 +1611,33 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-52617: cast TimestampNTZType to time") {
specialTs.foreach { s =>
val ldt = LocalDateTime.parse(s) // parsed as local timestamp
val micros = DateTimeUtils.localDateTimeToMicros(ldt)

val nanosOfDay = ldt.toLocalTime().toNanoOfDay
val expected = DateTimeUtils.truncateTimeToPrecision(nanosOfDay, TimeType.DEFAULT_PRECISION)

checkEvaluation(
  Cast(Literal(micros, TimestampNTZType), TimeType(TimeType.DEFAULT_PRECISION)),
  expected)
}
}

test("SPARK-52617: cast time to TimestampNTZType") {
val testCases = Seq(
("2023-01-01T15:30:00.123456", 6),
("2023-01-01T15:30:00", 0))

testCases.foreach { case (s, precision) =>
val ldt = LocalDateTime.parse(s)
val micros = DateTimeUtils.localDateTimeToMicros(ldt)
val nanosOfDay = ldt.toLocalTime().toNanoOfDay
val expected = DateTimeUtils.truncateTimeToPrecision(nanosOfDay, precision)

checkEvaluation(Cast(Literal(micros, TimestampNTZType), TimeType(precision)), expected)
}
}

test("SPARK-52620: cast time to decimal with sufficient precision and scale") {
// Test various TIME values converted to DecimalType(14, 9), which always has sufficient
// precision and scale to represent the number of (nano)seconds since midnight. Note that
@@ -1254,6 +1254,29 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
localTime(23, 59, 59, 900000))
}

test("get nanos in a day") {
def toMicros(hours: Int, minutes: Int, seconds: Int): Long = {
val microsInSecond = 1_000_000L
(hours * 3600L + minutes * 60 + seconds) * microsInSecond
}

// Midnight: 00:00:00
val midnightMicros = toMicros(0, 0, 0)
assert(getNanosInADay(midnightMicros) === 0L)

// Noon: 12:00:00
val noonMicros = toMicros(12, 0, 0)
assert(getNanosInADay(noonMicros) === 12L * 3600 * 1_000_000_000L)

// 23:59:59
val endOfDayMicros = toMicros(23, 59, 59)
assert(getNanosInADay(endOfDayMicros) === ((23L * 3600 + 59 * 60 + 59) * 1_000_000_000L))

// 01:30:15
val earlyMorningMicros = toMicros(1, 30, 15)
assert(getNanosInADay(earlyMorningMicros) === ((1L * 3600 + 30 * 60 + 15) * 1_000_000_000L))
}

test("add day-time interval to time") {
assert(timeAddInterval(0, 0, 0, SECOND, 6) == localTime())
assert(timeAddInterval(0, 6, MICROS_PER_DAY - 1, SECOND, 6) ==