
Commit f0030ed

[SPARK-53915] Add RealTimeScanExec and ability to execute long running batches
1 parent c5b58b5 commit f0030ed

14 files changed, +1030 -33 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 29 additions & 0 deletions
@@ -3850,6 +3850,12 @@
     ],
     "sqlState" : "42601"
   },
+  "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL" : {
+    "message" : [
+      "The real-time trigger interval is set to <interval> ms. This is less than the <minBatchDuration> ms minimum specified by spark.sql.streaming.realTimeMode.minBatchDuration."
+    ],
+    "sqlState" : "22023"
+  },
   "INVALID_SUBQUERY_EXPRESSION" : {
     "message" : [
       "Invalid subquery:"
@@ -5552,6 +5558,29 @@
     ],
     "sqlState" : "XXKST"
   },
+  "STREAMING_REAL_TIME_MODE" : {
+    "message" : [
+      "Streaming real-time mode has the following limitation:"
+    ],
+    "subClass" : {
+      "ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED" : {
+        "message" : [
+          "Async progress tracking is not supported in real-time mode. Set option asyncProgressTrackingEnabled to false and retry your query."
+        ]
+      },
+      "IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED" : {
+        "message" : [
+          "Real-time mode does not support union on two or more identical streaming data sources in a single query. This includes scenarios such as referencing the same source DataFrame more than once, or using two data sources with identical configurations for some sources. For Kafka, avoid reusing the same DataFrame and create different ones. Sources provided in the query: <sources>"
+        ]
+      },
+      "INPUT_STREAM_NOT_SUPPORTED" : {
+        "message" : [
+          "The input stream <className> is not supported in Real-time Mode."
+        ]
+      }
+    },
+    "sqlState" : "0A000"
+  },
   "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
     "message" : [
       "Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",

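Of the three new sub-errors, only ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED and INPUT_STREAM_NOT_SUPPORTED are raised elsewhere in this commit. As a rough illustration of how the union limitation could surface, here is a hypothetical Scala sketch following the same SparkIllegalArgumentException pattern the commit uses; the helper name and the `sources` values are made up, and the real call site and exception type may differ.

import org.apache.spark.SparkIllegalArgumentException

// Hypothetical illustration only: not part of this commit. `sources` stands in
// for the offending streaming source descriptions collected during analysis.
object IdenticalSourcesCheckSketch {
  def failOnIdenticalSources(sources: Seq[String]): Nothing = {
    throw new SparkIllegalArgumentException(
      errorClass = "STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED",
      messageParameters = Map("sources" -> sources.mkString(", ")))
  }
}
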
core/src/main/scala/org/apache/spark/util/Clock.scala

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ private[spark] trait Clock {
 /**
  * A clock backed by the actual time from the OS as reported by the `System` API.
  */
-private[spark] class SystemClock extends Clock {
+private[spark] class SystemClock extends Clock with Serializable {
 
   val minPollTime = 25L
 
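A minimal sketch of what `with Serializable` buys here, assuming the motivation is that clock instances can now travel inside serialized task state (for example, alongside the real-time reader wrappers added later in this commit). `SystemClock` is `private[spark]`, so the snippet only compiles from a package under `org.apache.spark`.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.spark.util.SystemClock

// Java serialization of a SystemClock now succeeds; without the Serializable
// mix-in this would throw java.io.NotSerializableException.
object SystemClockSerializationSketch {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new SystemClock())
    out.close()
  }
}
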
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 2 additions & 1 deletion
@@ -176,7 +176,8 @@ case class StreamingDataSourceV2Relation(
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     options: CaseInsensitiveStringMap,
-    metadataPath: String)
+    metadataPath: String,
+    realTimeModeDuration: Option[Long] = None)
   extends DataSourceV2RelationBase(table, output, catalog, identifier, options) {
 
   override def isStreaming: Boolean = true

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -3017,6 +3017,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION = buildConf(
+    "spark.sql.streaming.realTimeMode.minBatchDuration")
+    .doc("The minimum long-running batch duration in milliseconds for real-time mode.")
+    .version("4.1.0")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefault(5000)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
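
For reference, a small sketch of reading or overriding the new setting at the session level; the key and its 5000 ms default come from the definition above, while the 10000 ms value is just an example.

import org.apache.spark.sql.SparkSession

object MinBatchDurationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Runtime SQL conf: default is 5000 ms, raised here to 10 seconds as an example.
    spark.conf.set("spark.sql.streaming.realTimeMode.minBatchDuration", "10000")
    println(spark.conf.get("spark.sql.streaming.realTimeMode.minBatchDuration"))

    spark.stop()
  }
}
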
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.read.streaming.PartitionOffset
+
+/**
+ * Internal class for real-time mode to pass partition offsets from executors to the driver.
+ */
+private[sql] case class PartitionOffsetWithIndex(index: Long, partitionOffset: PartitionOffset)
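
A small sketch of how this carrier might be populated. `MyPartitionOffset` is a made-up source-specific offset, and because the class is `private[sql]` the sketch assumes it runs from a package under `org.apache.spark.sql`; the real producer is the reader wrapper added in RealTimeStreamScanExec.scala below.

import org.apache.spark.sql.connector.read.streaming.PartitionOffset
import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex

object PartitionOffsetWithIndexSketch {
  // Hypothetical source-specific offset, for illustration only.
  case class MyPartitionOffset(recordsRead: Long) extends PartitionOffset

  // Each task appends one entry tagged with its partition index, letting the driver
  // rebuild a per-partition view of where the long-running batch stopped.
  val entry = PartitionOffsetWithIndex(index = 3L, partitionOffset = MyPartitionOffset(42L))
}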

sql/core/src/main/scala/org/apache/spark/sql/classic/StreamingQueryManager.scala

Lines changed: 21 additions & 0 deletions
@@ -24,6 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CLASS_NAME, QUERY_ID, RUN_ID}
@@ -189,6 +190,21 @@ class StreamingQueryManager private[sql] (
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
 
+    if (trigger.isInstanceOf[RealTimeTrigger]) {
+      val minBatchDuration =
+        sparkSession.conf.get(SQLConf.STREAMING_REAL_TIME_MODE_MIN_BATCH_DURATION)
+      val realTimeTrigger = trigger.asInstanceOf[RealTimeTrigger]
+      if (realTimeTrigger.batchDurationMs < minBatchDuration) {
+        throw new SparkIllegalArgumentException(
+          errorClass = "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL",
+          messageParameters = Map(
+            "interval" -> realTimeTrigger.batchDurationMs.toString,
+            "minBatchDuration" -> minBatchDuration.toString
+          )
+        )
+      }
+    }
+
     val dataStreamWritePlan = WriteToStreamStatement(
       userSpecifiedName,
       userSpecifiedCheckpointLocation,
@@ -216,6 +232,11 @@
             analyzedStreamWritePlan))
       case _ =>
         val microBatchExecution = if (useAsyncProgressTracking(extraOptions)) {
+          if (trigger.isInstanceOf[RealTimeTrigger]) {
+            throw new SparkIllegalArgumentException(
+              errorClass = "STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED"
+            )
+          }
           new AsyncProgressTrackingMicroBatchExecution(
             sparkSession,
             trigger,
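
The new minimum-interval guard can be read as a small standalone rule; below is a sketch that restates it, assuming `batchDurationMs` is the user-supplied real-time trigger interval and `minBatchDurationMs` is the resolved value of spark.sql.streaming.realTimeMode.minBatchDuration.

import org.apache.spark.SparkIllegalArgumentException

object RealTimeIntervalCheckSketch {
  // Standalone restatement of the guard above, for illustration; the real check reads
  // RealTimeTrigger.batchDurationMs and the session's SQLConf value.
  def validateRealTimeInterval(batchDurationMs: Long, minBatchDurationMs: Long): Unit = {
    if (batchDurationMs < minBatchDurationMs) {
      throw new SparkIllegalArgumentException(
        errorClass = "INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL",
        messageParameters = Map(
          "interval" -> batchDurationMs.toString,
          "minBatchDuration" -> minBatchDurationMs.toString))
    }
  }

  // With the default configuration, anything under 5000 ms is rejected, e.g.
  // validateRealTimeInterval(batchDurationMs = 1000L, minBatchDurationMs = 5000L)
}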

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 24 additions & 6 deletions
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkIllegalArgumentException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.EXPR
 import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
 import org.apache.spark.sql.connector.read.LocalScan
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream, SupportsRealTimeMode}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan, SparkStrategy => Strategy}
@@ -170,10 +170,28 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
         if r.startOffset.isDefined && r.endOffset.isEmpty =>
 
-      val continuousStream = r.stream.asInstanceOf[ContinuousStream]
-      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
-      // initialize partitions
-      scanExec.inputPartitions
+      val scanExec = if (r.relation.realTimeModeDuration.isDefined) {
+        if (!r.stream.isInstanceOf[SupportsRealTimeMode]) {
+          throw new SparkIllegalArgumentException(
+            errorClass = "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
+            messageParameters = Map("className" -> r.stream.getClass.getName)
+          )
+        }
+        val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
+        new RealTimeStreamScanExec(
+          r.output,
+          r.scan,
+          microBatchStream,
+          r.startOffset.get,
+          r.relation.realTimeModeDuration.get
+        )
+      } else {
+        val continuousStream = r.stream.asInstanceOf[ContinuousStream]
+        val s = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
+        // initialize partitions
+        s.inputPartitions
+        s
+      }
 
       // Add a Project here to make sure we produce unsafe rows.
       DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
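
The planning branch above hinges on one capability check: a stream is only eligible for real-time planning if it mixes in SupportsRealTimeMode, otherwise planning fails fast with the new error. A hedged standalone restatement of that guard, for illustration only and not the strategy's actual code path:

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, SupportsRealTimeMode}

object RealTimeCapabilityCheckSketch {
  // Mirrors the guard in the strategy above; assumes the stream has already been
  // resolved as a MicroBatchStream, as the cast in the strategy implies.
  def requireRealTimeCapable(stream: MicroBatchStream): Unit = {
    if (!stream.isInstanceOf[SupportsRealTimeMode]) {
      throw new SparkIllegalArgumentException(
        errorClass = "STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED",
        messageParameters = Map("className" -> stream.getClass.getName))
    }
  }
}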

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RealTimeStreamScanExec.scala

Lines changed: 132 additions & 1 deletion
@@ -17,7 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.util.{Clock, SystemClock}
+import java.util.Objects
+
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, SupportsRealTimeMode, SupportsRealTimeRead}
+import org.apache.spark.sql.connector.read.streaming.SupportsRealTimeRead.RecordStatus
+import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
+import org.apache.spark.util.{Clock, CollectionAccumulator, SystemClock}
+import org.apache.spark.util.ArrayImplicits._
 
 /* The singleton object to control the time in testing */
 object LowLatencyClock {
@@ -37,3 +49,122 @@ object LowLatencyClock {
     clock = inputClock
   }
 }
+
+/**
+ * A wrapper reader that turns a partition reader extending SupportsRealTimeRead into a
+ * normal PartitionReader, honors the task termination time `lowLatencyEndTime`, and
+ * reports end offsets to `endOffsets` at the end.
+ */
+case class LowLatencyReaderWrap(
+    reader: SupportsRealTimeRead[InternalRow],
+    lowLatencyEndTime: Long,
+    endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+  extends PartitionReader[InternalRow] {
+
+  override def next(): Boolean = {
+    val curTime = LowLatencyClock.getTimeMillis()
+    val ret = if (curTime >= lowLatencyEndTime) {
+      RecordStatus.newStatusWithoutArrivalTime(false)
+    } else {
+      reader.nextWithTimeout(lowLatencyEndTime - curTime)
+    }
+
+    if (!ret.hasRecord) {
+      // The way of using TaskContext.get().partitionId() to map to a partition
+      // may be fragile as it relies on thread locals.
+      endOffsets.add(
+        new PartitionOffsetWithIndex(TaskContext.get().partitionId(), reader.getOffset)
+      )
+    }
+    ret.hasRecord
+  }
+
+  override def get(): InternalRow = {
+    reader.get()
+  }
+
+  override def close(): Unit = {}
+}
+
+/**
+ * Wrapper factory that creates a LowLatencyReaderWrap around a SupportsRealTimeRead reader.
+ */
+case class LowLatencyReaderFactoryWrap(
+    partitionReaderFactory: PartitionReaderFactory,
+    lowLatencyEndTime: Long,
+    endOffsets: CollectionAccumulator[PartitionOffsetWithIndex])
+  extends PartitionReaderFactory
+  with Logging {
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    val rowReader = partitionReaderFactory.createReader(partition)
+    assert(rowReader.isInstanceOf[SupportsRealTimeRead[InternalRow]])
+    logInfo(
+      log"Creating low latency PartitionReader, stopping at " +
+        log"${MDC(LogKeys.TO_TIME, lowLatencyEndTime)}"
+    )
+    LowLatencyReaderWrap(
+      rowReader.asInstanceOf[SupportsRealTimeRead[InternalRow]],
+      lowLatencyEndTime,
+      endOffsets
+    )
+  }
+}
+
+/**
+ * Physical plan node for Real-time Mode to scan/read data from a data source.
+ */
+case class RealTimeStreamScanExec(
+    output: Seq[Attribute],
+    @transient scan: Scan,
+    @transient stream: MicroBatchStream,
+    @transient start: Offset,
+    batchDurationMs: Long)
+  extends DataSourceV2ScanExecBase {
+
+  assert(stream.isInstanceOf[SupportsRealTimeMode])
+
+  override def keyGroupedPartitioning: Option[Seq[Expression]] = None
+
+  override def ordering: Option[Seq[SortOrder]] = None
+
+  val endOffsetsAccumulator: CollectionAccumulator[PartitionOffsetWithIndex] = {
+    SparkContext.getActive.map(_.collectionAccumulator[PartitionOffsetWithIndex]).get
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case other: RealTimeStreamScanExec =>
+      this.stream == other.stream &&
+        this.batchDurationMs == other.batchDurationMs
+    case _ => false
+  }
+
+  override def hashCode(): Int = Objects.hash(stream, batchDurationMs)
+
+  override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
+
+  override lazy val inputPartitions: Seq[InputPartition] = {
+    val lls = stream.asInstanceOf[SupportsRealTimeMode]
+    assert(lls != null)
+    lls.planInputPartitions(start).toImmutableArraySeq
+  }
+
+  override def simpleString(maxFields: Int): String =
+    s"${super.simpleString(maxFields)} [batchDurationMs=${batchDurationMs}ms]"
+
+  override lazy val inputRDD: RDD[InternalRow] = {
+
+    val inputRDD = new DataSourceRDD(
+      sparkContext,
+      partitions,
+      LowLatencyReaderFactoryWrap(
+        readerFactory,
+        LowLatencyClock.getTimeMillis() + batchDurationMs,
+        endOffsetsAccumulator
+      ),
+      supportsColumnar,
+      customMetrics
+    )
+    postDriverMetrics()
+    inputRDD
+  }
+}
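
One way to read the end-offset plumbing above: each task's LowLatencyReaderWrap stops once its time budget (LowLatencyClock.getTimeMillis() + batchDurationMs, computed when the input RDD is built) runs out and adds a PartitionOffsetWithIndex to the accumulator, so the driver can regroup the entries by partition index. The regrouping helper below is a hypothetical sketch, not part of this commit; how callers actually fold these into a batch end offset is outside this file.

import org.apache.spark.sql.connector.read.streaming.PartitionOffset
import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex

object EndOffsetRegroupSketch {
  // Hypothetical driver-side helper: keep the last reported offset per partition
  // index (task retries may contribute more than one entry per partition).
  def lastOffsetPerPartition(
      entries: Seq[PartitionOffsetWithIndex]): Map[Long, PartitionOffset] = {
    entries.groupBy(_.index).map { case (partitionIndex, perPartition) =>
      partitionIndex -> perPartition.last.partitionOffset
    }
  }
}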
