Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.spark;

import datadog.trace.api.Config;
import datadog.trace.api.DDSpanId;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.datastreams.PathwayContext;
Expand Down Expand Up @@ -43,6 +44,11 @@ public class OpenlineageParentContext implements AgentSpanContext {
public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId";

public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
if (!Config.get().isDataJobsOpenLineageEnabled()) {
log.debug(
"OpenLineage - Data Jobs integration disabled. Not returning OpenlineageParentContext");
return Optional.empty();
}
if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
|| !sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAME)
|| !sparkConf.contains(OPENLINEAGE_PARENT_RUN_ID)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package datadog.trace.instrumentation.spark

import datadog.trace.test.util.DDSpecification
import org.apache.spark.SparkConf
import spock.lang.Specification

class OpenlineageParentContextTest extends Specification {
class OpenlineageParentContextTest extends DDSpecification {
def "should create OpenLineageParentContext with particular trace id based on root parent id" () {
given:
injectSysConfig("data.jobs.openlineage.enabled", "true")
SparkConf mockSparkConf = Mock(SparkConf)

when:
Expand Down Expand Up @@ -38,6 +39,7 @@ class OpenlineageParentContextTest extends Specification {

def "should create empty OpenLineageParentContext if any required field is missing" () {
given:
injectSysConfig("data.jobs.openlineage.enabled", "true")
SparkConf mockSparkConf = Mock(SparkConf)

when:
Expand Down Expand Up @@ -66,6 +68,7 @@ class OpenlineageParentContextTest extends Specification {

def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () {
given:
injectSysConfig("data.jobs.openlineage.enabled", "true")
SparkConf mockSparkConf = Mock(SparkConf)

when:
Expand Down Expand Up @@ -94,6 +97,7 @@ class OpenlineageParentContextTest extends Specification {

def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () {
given:
injectSysConfig("data.jobs.openlineage.enabled", "true")
SparkConf mockSparkConf = Mock(SparkConf)

when:
Expand All @@ -119,5 +123,21 @@ class OpenlineageParentContextTest extends Specification {
"6afeb6ee-729d-37f7-b8e6f47ca694" | false
"6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
}

def "should create empty OpenLineageParentContext if data jobs openlineage not enabled" () {
given:
injectSysConfig("data.jobs.openlineage.enabled", "falsebrn")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❔ question: ‏Is this a typo?

SparkConf mockSparkConf = Mock(SparkConf)

when:
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true

then:
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
!parentContext.isPresent()
}
}

Loading