Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.kinesis
import java.util.Locale
import java.util.concurrent.{ExecutionException, TimeUnit}

import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}
Expand Down Expand Up @@ -96,6 +96,38 @@ object CachedKinesisProducer extends Logging {
.setRegion(region)
.setRecordTtl(recordTTL)

// set endpoint url if provided
if (producerConfiguration.contains(KinesisOptions.ENDPOINT_URL.toLowerCase(Locale.ROOT))) {
val endpointUrlOpt = producerConfiguration.get(KinesisOptions.ENDPOINT_URL.toLowerCase(Locale.ROOT))

endpointUrlOpt.foreach { endpointUrl =>
Try {
val uri = java.net.URI.create(endpointUrl)
val host = uri.getHost
val port = uri.getPort
val isLocalhost = host == "localhost" || host == "127.0.0.1" || host == "::1"

kinesisProducerConfiguration
.setKinesisEndpoint(host)
.setCloudwatchEndpoint(host)
.setStsEndpoint(host)
.setVerifyCertificate(!isLocalhost)

if (port != -1) {
kinesisProducerConfiguration
.setKinesisPort(port)
.setCloudwatchPort(port)
.setStsPort(port)
}
} match {
case Success(_) =>
logDebug(s"Successfully configured endpoint URL: $endpointUrl")
case Failure(e) =>
logWarning(s"Invalid endpoint URL: $endpointUrl - ${e.getMessage}", e)
}
}
}

// check for proxy settings
if (producerConfiguration.contains(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))) {
val proxyAddress = producerConfiguration.get(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))
Expand Down