Skip to content

Commit e3cb356

Browse files
committed
issue #757 Initialization failure handling control
1 parent e95f676 commit e3cb356

File tree

7 files changed

+51
-6
lines changed

7 files changed

+51
-6
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2023 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.harvester.conf;
18+
19+
/**
 * Selects how the agent responds to errors raised during its initialization.
 * Configured through the {@code spline.onInitFailure} property.
 */
public enum InitFailureHandlingMode {
20+
21+
/** Log the error and continue without lineage tracking (the agent is disabled, the Spark job keeps running). */
22+
LOG,
23+
24+
/** Propagate the error to the Spark process instead of swallowing it. */
25+
BREAK,
26+
}

core/src/main/resources/spline.default.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ spline:
2626
# (DON'T MODIFY UNLESS YOU UNDERSTAND THE IMPLICATIONS)
2727
internal.execPlan.uuid.version: 5
2828

29+
# How the agent should respond to initialization errors:
30+
# - LOG (log the error and disable the agent; the Spark job continues unaffected, without lineage tracking.)
31+
# - BREAK (propagate the error to the Spark process.)
32+
onInitFailure: LOG
33+
2934
# Should the agent capture failed executions:
3035
# - NONE (only capture successful executions)
3136
# - NON_FATAL (capture successful executions, and failed executions, but only when the error is non-fatal)

core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSession
2121
import za.co.absa.spline.HierarchicalObjectFactory
2222
import za.co.absa.spline.agent.AgentConfig.ConfProperty
2323
import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
24-
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
24+
import za.co.absa.spline.harvester.conf.{InitFailureHandlingMode, SQLFailureCaptureMode, SplineMode}
2525
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
2626
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
2727
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
@@ -32,6 +32,7 @@ import scala.reflect.ClassTag
3232

3333
private[spline] trait AgentBOM {
3434
def splineMode: SplineMode
35+
def initFailureHandlingMode: InitFailureHandlingMode
3536
def sqlFailureCaptureMode: SQLFailureCaptureMode
3637
def postProcessingFilter: Option[PostProcessingFilter]
3738
def lineageDispatcher: LineageDispatcher
@@ -58,6 +59,10 @@ object AgentBOM {
5859
mergedConfig.getRequiredEnum[SQLFailureCaptureMode](ConfProperty.SQLFailureCaptureMode)
5960
}
6061

62+
/**
 * Reads the initialization failure handling mode from `mergedConfig`
 * under the `ConfProperty.InitFailureHandlingMode` key.
 */
override def initFailureHandlingMode: InitFailureHandlingMode = {
63+
mergedConfig.getRequiredEnum[InitFailureHandlingMode](ConfProperty.InitFailureHandlingMode)
64+
}
65+
6166
override def execPlanUUIDVersion: UUIDVersion = {
6267
mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
6368
}

core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object AgentConfig {
3434

3535
def from(configuration: Configuration): AgentConfig = from(
3636
configuration
37-
.getKeys.asScala.toSeq.asInstanceOf[Seq[String]]
37+
.getKeys.asScala.toSeq
3838
.map(k => k -> configuration.getProperty(k)))
3939

4040
def from(options: Iterable[(String, Any)]): AgentConfig =
@@ -95,10 +95,17 @@ object AgentConfig {
9595
*/
9696
val Mode = "spline.mode"
9797

98+
/**
99+
* How Spline should handle initialization errors.
100+
*
101+
* @see [[za.co.absa.spline.harvester.conf.InitFailureHandlingMode]]
102+
*/
103+
val InitFailureHandlingMode = "spline.onInitFailure"
104+
98105
/**
99106
* How Spline should handle failed SQL executions.
100107
*
101-
* @see [[SQLFailureCaptureMode]]
108+
* @see [[za.co.absa.spline.harvester.conf.SQLFailureCaptureMode]]
102109
*/
103110
val SQLFailureCaptureMode = "spline.sql.failure.capture"
104111

core/src/main/scala/za/co/absa/spline/harvester/SparkLineageInitializer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
134134
logInfo("initialization aborted")
135135
None
136136
}
137-
else withErrorHandling {
137+
else withErrorHandling(bom.initFailureHandlingMode) {
138138
if (isCodelessInit)
139139
Some(createListener(bom))
140140
else
@@ -171,11 +171,11 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
171171
}
172172
}
173173

174-
private def withErrorHandling(body: => Option[QueryExecutionListener]) = {
174+
private def withErrorHandling(initFailureMode: InitFailureHandlingMode)(body: => Option[QueryExecutionListener]) = {
175175
try {
176176
body
177177
} catch {
178-
case NonFatal(e) =>
178+
case NonFatal(e) if initFailureMode == InitFailureHandlingMode.LOG =>
179179
logError(s"Spline initialization failed! Spark Lineage tracking is DISABLED.", e)
180180
None
181181
}

examples/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ RUN chmod a+x /entrypoint.sh
4444
# Bind environment variables
4545
ENV SPLINE_PRODUCER_URL=
4646
ENV SPLINE_MODE=ENABLED
47+
ENV ON_INIT_FAILURE=BREAK
4748
ENV DISABLE_SSL_VALIDATION=false
4849

4950
ENV HTTP_PROXY_HOST=

examples/entrypoint.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ exec ./run.sh \
2121
-Dspline.lineageDispatcher=http \
2222
-Dspline.lineageDispatcher.http.producer.url="$SPLINE_PRODUCER_URL" \
2323
-Dspline.lineageDispatcher.http.disableSslValidation="$DISABLE_SSL_VALIDATION" \
24+
-Dspline.onInitFailure="$ON_INIT_FAILURE" \
2425
-Dspline.mode="$SPLINE_MODE" \
2526
-Dhttp.proxyHost="$HTTP_PROXY_HOST" \
2627
-Dhttp.proxyPort="$HTTP_PROXY_PORT" \

0 commit comments

Comments
 (0)