25 commits
8d49e2f
Added initial implementation for writing lineage messages using HDFS …
rkrumins Oct 12, 2025
a35a99c
Added creation of the base customLineagePath directory ensuring linea…
rkrumins Oct 12, 2025
7a824bf
Updated documentation for Hdfs Dispatcher in spline.default.yaml file
rkrumins Oct 12, 2025
96373b4
Added initial integration tests for HDFS Lineage Dispatcher with cent…
rkrumins Oct 12, 2025
2b01b81
Aligning tests to run the same evaluation as when using default mode
rkrumins Oct 12, 2025
6f0ecdf
Minor cleanup in HDFSLineageDispatcher
rkrumins Oct 12, 2025
b064041
Fixed issue with unmatched types for resolveLineagePath in HDFSLineag…
rkrumins Oct 12, 2025
578c876
Fixing issues as per SonarQube for HDFSLineageDispatcher
rkrumins Oct 12, 2025
bebf4c9
Fix for the issue in fsScheme!
rkrumins Oct 12, 2025
04eebe3
Constant for file extension in HDFSLineageDispatcherSpec
rkrumins Oct 12, 2025
5f63f61
Removed outputSource filename when writing file to new location and u…
rkrumins Oct 12, 2025
8bb78a1
Fixing issues as per static code analysis
rkrumins Oct 13, 2025
7a93705
Fixing issues in logic for HDFSLineageDispatcher
rkrumins Oct 13, 2025
32db7ac
Fixing issue with exception handling for mkdirs in HDFSLineageDispatcher
rkrumins Oct 13, 2025
30f2f40
Updated integration test for custom lineage path in HDFSLineageDispat…
rkrumins Oct 13, 2025
87936b8
Ensuring the edge case with Spark AppName containing non-standard char…
rkrumins Oct 13, 2025
4e699dd
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
8a4b431
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
6bc068d
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
d155a98
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
4c7d723
Fixed integration test and changed the filename to avoid clashes from…
rkrumins Oct 13, 2025
cd535c9
Added more robust check to ensure no _LINEAGE file is created
rkrumins Oct 13, 2025
0e5c69a
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
bebc869
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
7bd5b0a
Updated spline.default.yaml as per up-to-date details for HDFSLineage…
rkrumins Oct 13, 2025
23 changes: 22 additions & 1 deletion core/src/main/resources/spline.default.yaml
@@ -146,7 +146,28 @@ spline:
fileBufferSize: 4096
# file/directory permissions, provided as umask string, either in octal or symbolic format
filePermissions: 777

customLineagePath:
# OPTIONAL: Custom path for centralized lineage storage.
# If left empty, null, or not specified → DEFAULT MODE: lineage written alongside target data files
# If set to a path → CENTRALIZED MODE: all lineage written to this location with unique filenames
#
# CENTRALIZED MODE filename format: {timestamp}_{planId}_{appId}
# - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for natural chronological sorting and easy filtering
# Example: 2025-10-12_14-30-45-123
# - planId: Execution plan UUID for guaranteed uniqueness (prevents collisions from concurrent writes)
# Example: 550e8400-e29b-41d4-a716-446655440000
# - appId: Spark application ID for traceability to specific runs (correlates with Spark UI, logs, monitoring)
# Example: app-20251012143045-0001
#
# Examples:
# - Local: customLineagePath: /my/centralized/lineage
# Output: /my/centralized/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
# - S3: customLineagePath: s3://my-bucket/lineage
# Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
# - GCS: customLineagePath: gs://my-bucket/lineage
# Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
# - HDFS: customLineagePath: hdfs://cluster/lineage
# Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
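#
# For illustration only — a minimal sketch (commented out) of a full dispatcher
# section with CENTRALIZED MODE enabled; the bucket name is a hypothetical
# placeholder and the className value is assumed from this PR's code:
#
#   lineageDispatcher:
#     hdfs:
#       className: za.co.absa.spline.harvester.dispatcher.HDFSLineageDispatcher
#       fileName: _LINEAGE
#       fileBufferSize: 4096
#       filePermissions: 777
#       customLineagePath: s3://my-bucket/lineage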
# -------------------------------------------
# Open Lineage HTTP dispatcher
# -------------------------------------------
HDFSLineageDispatcher.scala
@@ -30,7 +30,10 @@ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

import java.net.URI
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}
import scala.concurrent.blocking
import scala.util.{Try, Success, Failure}

/**
* A port of https://github.com/AbsaOSS/spline/tree/release/0.3.9/persistence/hdfs/src/main/scala/za/co/absa/spline/persistence/hdfs
@@ -40,16 +43,34 @@ import scala.concurrent.blocking
* for every generic use case in a real production application.
*
* It is NOT thread-safe; it is strictly synchronous and assumes a predefined order of method calls: `send(plan)` and then `send(event)`
*
* TWO MODES OF OPERATION:
*
* 1. DEFAULT MODE (customLineagePath = None, null, or empty string):
* Lineage files are written alongside the target data files.
* Example: Writing to /data/output/file.parquet creates /data/output/_LINEAGE
*
* 2. CENTRALIZED MODE (customLineagePath set to a valid path):
* All lineage files are written to a single centralized location with unique filenames.
* Filename format: {timestamp}_{planId}_{appId}
* - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for chronological sorting and filtering
* - planId: Execution plan UUID for guaranteed uniqueness (prevents collisions from concurrent writes)
* - appId: Spark application ID for traceability to Spark UI, logs, and monitoring systems
*
* The timestamp-first format ensures natural chronological sorting and easy date-based filtering.
* Parent directories are automatically created with proper permissions for multi-user access (HDFS/local).
* For object storage (S3, GCS, Azure), directory creation is skipped since they use key prefixes.
*/
@Experimental
class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int)
class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int, customLineagePath: Option[String])
extends LineageDispatcher
with Logging {

def this(conf: Configuration) = this(
filename = conf.getRequiredString(FileNameKey),
permission = new FsPermission(conf.getRequiredObject(FilePermissionsKey).toString),
bufferSize = conf.getRequiredInt(BufferSizeKey)
bufferSize = conf.getRequiredInt(BufferSizeKey),
customLineagePath = conf.getOptionalString(CustomLineagePathKey).map(_.trim).filter(_.nonEmpty)
)

@volatile
@@ -67,7 +88,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID")

try {
val path = s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename"
val path = resolveLineagePath()
val planWithEvent = Map(
"executionPlan" -> this._lastSeenPlan,
"executionEvent" -> event
@@ -80,10 +101,124 @@
}
}

/**
* Resolves the lineage file path based on configuration.
* If customLineagePath is specified, lineage files are written to that centralized location.
* Otherwise, lineage files are written alongside the target data file (current behavior).
*
* @return The full path where the lineage file should be written
*/
private def resolveLineagePath(): String = {
val outputSource = this._lastSeenPlan.operations.write.outputSource
customLineagePath match {
case Some(customPath) =>
// Centralized mode: write to custom path with unique filename
val cleanCustomPath = customPath.stripSuffix("/")
val uniqueFilename = generateUniqueFilename()
s"$cleanCustomPath/$uniqueFilename"
case None =>
// Default mode: write alongside target data file
s"${outputSource.stripSuffix("/")}/$filename"
}
}
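// Illustration only — a standalone sketch of the same resolution logic with
// hypothetical inputs (not part of the class):
//   def resolve(custom: Option[String], outputSource: String, filename: String): String =
//     custom.map(c => s"${c.stripSuffix("/")}/<unique-filename>")
//       .getOrElse(s"${outputSource.stripSuffix("/")}/$filename")
//   resolve(None, "hdfs://nn/data/out", "_LINEAGE")       // "hdfs://nn/data/out/_LINEAGE"
//   resolve(Some("s3://bucket/lineage/"), "unused", "_")  // "s3://bucket/lineage/<unique-filename>"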

/**
* Generates a unique filename for centralized lineage storage.
*
* Format: {timestamp}_{planId}_{appId}
* Example: 2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
*
* This format optimizes for operational debugging use cases:
* - Timestamp FIRST: Ensures natural chronological sorting (most recent files appear together)
* - Plan ID: Guarantees uniqueness even for concurrent writes from the same application
* - Application ID: Enables immediate filtering by Spark application (correlates with Spark UI, logs, monitoring)
*
* The planId ensures zero collision risk even when:
* - Multiple writes happen in the same millisecond
* - The same application writes multiple datasets concurrently
* - Different applications run simultaneously
*
* The appId enables operators to quickly filter all lineage files from a specific Spark application run,
* making it trivial to correlate with Spark UI, logs, and monitoring systems where the human-readable
* application name is already available.
*
* @return A unique filename optimized for filtering, sorting, and guaranteed uniqueness
*/
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val planId = this._lastSeenPlan.id.getOrElse(
throw new IllegalStateException("Execution plan ID is missing")
).toString
val appId = sparkContext.applicationId
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
s"${timestamp}_${planId}_${appId}"
}
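// Illustration only — the same formatter run standalone with hypothetical IDs
// (not part of the class), reproducing the documented example name:
//   val fmt = java.time.format.DateTimeFormatter
//     .ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
//     .withZone(java.time.ZoneId.of("UTC"))
//   val ts = fmt.format(java.time.Instant.parse("2025-10-12T14:30:45.123Z"))
//   // ts == "2025-10-12_14-30-45-123"
//   s"${ts}_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001"
//   // == "2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001"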

/**
* Ensures that parent directories exist with proper permissions for centralized lineage storage.
* This is only called when customLineagePath is specified and is critical for multi-user access.
*
* Note: Object storage systems (S3, GCS, Azure Blob) don't have true directories - they use key prefixes.
* Directory creation is automatically skipped for these systems to avoid unnecessary operations.
*
* @param fs The Hadoop FileSystem
* @param path The target file path whose parent directories should be created
*/
private def ensureParentDirectoriesExist(fs: FileSystem, path: Path): Unit = {
// Object storage systems (S3, GCS, Azure Blob) don't have real directories - they're just key prefixes
// Skip directory creation for these systems to avoid unnecessary operations
val fsScheme = fs.getUri.getScheme
val scheme = Option(fsScheme).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
val isObjectStorage = scheme != null && (
scheme.startsWith("s3") || // S3: s3, s3a, s3n
scheme.startsWith("gs") || // Google Cloud Storage: gs
scheme.startsWith("wasb") || // Azure Blob Storage: wasb, wasbs
scheme.startsWith("abfs") || // Azure Data Lake Storage Gen2: abfs, abfss
scheme.startsWith("adl") // Azure Data Lake Storage Gen1: adl
)

if (isObjectStorage) {
logDebug(s"Skipping directory creation for object storage filesystem ($scheme) - directories are implicit key prefixes")
} else {
logDebug(s"Ensuring parent directories exist for centralized lineage storage: $path")
val parentDir = path.getParent
if (parentDir != null && !fs.exists(parentDir)) {
Try {
// Create directories with multi-user friendly permissions to allow all service accounts to write
// This uses the same permission object that's already configured for files
val created = fs.mkdirs(parentDir, permission)
if (created) {
logInfo(s"Created parent directories: $parentDir with permissions $permission")
}
} match {
case Success(_) =>
// Directory creation succeeded
case Failure(_: org.apache.hadoop.fs.FileAlreadyExistsException) =>
// Race condition: another process created the directory - this is fine
logDebug(s"Directory $parentDir already exists (created by another process)")
case Failure(e: RuntimeException) =>
logWarning(s"Failed to create parent directories: $parentDir", e)
throw e
case Failure(e) =>
// Handle any other exceptions (IOException, etc.)
logWarning(s"Failed to create parent directories: $parentDir", e)
throw new RuntimeException(s"Failed to create parent directories: $parentDir", e)
}
}
}
}
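// Illustration only — how a few hypothetical URIs fall out of the scheme check above
// (prefix match on the lower-cased scheme):
//   s3a://bucket/lineage      -> object storage ("s3a" starts with "s3"), mkdirs skipped
//   gs://bucket/lineage       -> object storage, mkdirs skipped
//   abfss://fs@acct.dfs.core.windows.net/lineage -> object storage, mkdirs skipped
//   hdfs://namenode/lineage   -> real filesystem, parent dirs created with `permission`
//   file:/tmp/lineage         -> real filesystem, parent dirs created with `permission`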

private def persistToHadoopFs(content: String, fullLineagePath: String): Unit = blocking {
val (fs, path) = pathStringToFsWithPath(fullLineagePath)
logDebug(s"Opening HadoopFs output stream to $path")

// Only ensure parent directories exist when using centralized mode (customLineagePath is specified)
// In default mode, the parent directories should already exist as they're alongside the data files
if (customLineagePath.isDefined) {
ensureParentDirectoriesExist(fs, path)
}

val replication = fs.getDefaultReplication(path)
val blockSize = fs.getDefaultBlockSize(path)
val outputStream = fs.create(path, permission, true, bufferSize, replication, blockSize, null)
@@ -104,6 +239,7 @@ object HDFSLineageDispatcher {
private val FileNameKey = "fileName"
private val FilePermissionsKey = "filePermissions"
private val BufferSizeKey = "fileBufferSize"
private val CustomLineagePathKey = "customLineagePath"

/**
* Converts string full path to Hadoop FS and Path, e.g.
HDFSLineageDispatcherSpec.scala
@@ -26,7 +26,7 @@ import za.co.absa.spline.commons.version.Version._
import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

import org.apache.spark.sql.SparkSession
import java.io.File

class HDFSLineageDispatcherSpec
@@ -37,15 +37,21 @@ class HDFSLineageDispatcherSpec

behavior of "HDFSLineageDispatcher"

it should "save lineage file to a filesystem" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val lineageDispatcherConfigKeyName = "spark.spline.lineageDispatcher"
val lineageDispatcherConfigValueName = "hdfs"
val lineageDispatcherConfigClassNameKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.className"
val lineageDispatcherConfigCustomLineagePathKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.customLineagePath"
val destFilePathExtension = ".parquet"

it should "save lineage file to a filesystem in DEFAULT mode" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
withIsolatedSparkSession(_
.config("spark.spline.lineageDispatcher", "hdfs")
.config("spark.spline.lineageDispatcher.hdfs.className", classOf[HDFSLineageDispatcher].getName)
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._
val dummyDF = Seq((1, 2)).toDF
val destPath = TempDirectory("spline_", ".parquet", pathOnly = true).deleteOnExit()
val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
@@ -63,4 +69,106 @@
}
}

Seq(
("without customLineagePath config", None),
("with empty string customLineagePath", Some("")),
("with whitespace-only customLineagePath", Some(" "))
).foreach { case (scenarioDesc, customPathValue) =>
it should s"use DEFAULT mode $scenarioDesc" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val builder = (b: SparkSession.Builder) => {
val configured = b
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
customPathValue.fold(configured)(path => configured.config(lineageDispatcherConfigCustomLineagePathKeyName, path))
}

withIsolatedSparkSession(builder) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._
val dummyDF = Seq((1, 2)).toDF
val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
} yield {
val lineageFile = new File(destPath.asString, "_LINEAGE")
lineageFile.exists should be(true)
lineageFile.length should be > 0L

val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
lineageJson should contain key "executionPlan"
lineageJson should contain key "executionEvent"
lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
}
}
}
}
}

it should "save lineage files in a custom lineage path" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val centralizedPath = TempDirectory("spline_centralized").deleteOnExit()
centralizedPath.delete()

withIsolatedSparkSession(_
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
.config(lineageDispatcherConfigCustomLineagePathKeyName, centralizedPath.asString)
) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._

// Test with multiple data writes to verify unique filenames
val dummyDF1 = Seq((1, 2)).toDF
val dummyDF2 = Seq((3, 4)).toDF
val destPath1 = TempDirectory("spline_1_", destFilePathExtension, pathOnly = true).deleteOnExit()
val destPath2 = TempDirectory("spline_2_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF1.write.save(destPath1.asString))
(_, _) <- captor.lineageOf(dummyDF2.write.save(destPath2.asString))
} yield {
val centralizedDir = new File(centralizedPath.asString)
centralizedDir.exists should be(true)
centralizedDir.isDirectory should be(true)

val appId = spark.sparkContext.applicationId

val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File])
val lineageFilesOnly = lineageFiles.filter(f => f.isFile && !f.getName.endsWith(".crc"))
lineageFilesOnly.length should be(2)

// Verify naming convention aligns with centralized lineage pattern (timestamp_planId_appId)
// Format: {timestamp}_{planId}_{appId}
// Example: 2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001
val filenamePattern = """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_.+"""
lineageFilesOnly.foreach { file =>
val name = file.getName
withClue(s"Lineage filename '$name' should follow the timestamp_planId_appId pattern") {
name.matches(filenamePattern) shouldBe true
}
// Verify the appId appears in the filename (ends with appId)
withClue(s"Lineage filename '$name' should end with application ID '$appId'") {
name.endsWith(appId) shouldBe true
}
}

// Verify each file has the correct format and content
lineageFilesOnly.foreach { lineageFile =>
val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
lineageJson should contain key "executionPlan"
lineageJson should contain key "executionEvent"
}

// Verify no lineage files in destination directories
val dest1Files = Option(new File(destPath1.asString).listFiles()).getOrElse(Array.empty[File])
val dest2Files = Option(new File(destPath2.asString).listFiles()).getOrElse(Array.empty[File])

dest1Files.exists(_.getName.contains("_LINEAGE")) should be(false)
dest2Files.exists(_.getName.contains("_LINEAGE")) should be(false)
}
}
}
}

}
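
A minimal usage sketch, mirroring the configuration keys exercised by the spec above; the bucket path is a hypothetical placeholder, the dispatcher's package in the import is an assumption (the diff does not show the package declaration), and in DEFAULT mode you would simply omit the customLineagePath entry:

import org.apache.spark.sql.SparkSession
import za.co.absa.spline.harvester.dispatcher.HDFSLineageDispatcher // assumed package

val spark = SparkSession.builder()
  .config("spark.spline.lineageDispatcher", "hdfs")
  .config("spark.spline.lineageDispatcher.hdfs.className", classOf[HDFSLineageDispatcher].getName)
  .config("spark.spline.lineageDispatcher.hdfs.customLineagePath", "s3://my-bucket/lineage") // hypothetical bucket
  .getOrCreate()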