diff --git a/CHANGELOG b/CHANGELOG
index 8cc8e59..ef9d771 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,5 +1,9 @@
 Changelog
 =========
+v1.5.0
+----------------------------
+- plumb threading model, thread pool size, and rate limit params to KPL (issue #37)
+
 v1.4.0
 ----------------------------
 - plumb through optional recordTtl param to KPL (issue #31)
diff --git a/README.md b/README.md
index 3f59dce..32fd412 100644
--- a/README.md
+++ b/README.md
@@ -464,6 +464,9 @@ Note: Using permanent credentials are not recommended due to security concerns.
 | kinesis.sink.maxConnections | 1 | Specify the maximum connections to Kinesis |
 | kinesis.sink.aggregationEnabled | True | Specify if records should be aggregated before sending them to Kinesis |
 | kinesis.sink.recordTtl | 30000 (milliseconds) | Records not successfully written to Kinesis within this time are dropped |
+| kinesis.sink.rateLimit | 150 (percent) | Limits the maximum allowed put rate for a shard per executor, as a percentage of the Kinesis shard limits |
+| kinesis.sink.threadingModel | PER_REQUEST | Specify whether the KPL creates new threads per request (PER_REQUEST) or uses a thread pool (POOLED) when invoking Kinesis |
+| kinesis.sink.threadPoolSize | 64 | Maximum number of threads when kinesis.sink.threadingModel is POOLED |
 
 ## Security
diff --git a/pom.xml b/pom.xml
index 19db220..6a5ecfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-sql-kinesis-connector_2.12</artifactId>
-    <version>1.4.0</version>
+    <version>1.5.0</version>
     <packaging>jar</packaging>
     <name>Spark Structured Streaming Kinesis Connector</name>
     <description>Connector to read from and write into Kinesis from Structured Streaming Applications</description>
diff --git a/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala b/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala
index 3b54ca8..99130eb 100644
--- a/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala
+++ b/src/main/scala/org/apache/spark/sql/connector/kinesis/CachedKinesisProducer.scala
@@ -81,6 +81,21 @@ object CachedKinesisProducer extends Logging {
       KinesisOptions.DEFAULT_SINK_MAX_CONNECTIONS)
       .toInt
 
+    val rateLimit = producerConfiguration.getOrElse(
+      KinesisOptions.SINK_RATE_LIMIT.toLowerCase(Locale.ROOT),
+      KinesisOptions.DEFAULT_SINK_RATE_LIMIT)
+      .toLong
+
+    val threadingModel = producerConfiguration.getOrElse(
+      KinesisOptions.SINK_THREADING_MODEL.toLowerCase(Locale.ROOT),
+      KinesisOptions.DEFAULT_SINK_THREADING_MODEL)
+      .toString
+
+    val threadPoolSize = producerConfiguration.getOrElse(
+      KinesisOptions.SINK_THREAD_POOL_SIZE.toLowerCase(Locale.ROOT),
+      KinesisOptions.DEFAULT_SINK_THREAD_POOL_SIZE)
+      .toInt
+
     val aggregation = Try { producerConfiguration.getOrElse(
       KinesisOptions.SINK_AGGREGATION_ENABLED.toLowerCase(Locale.ROOT),
       KinesisOptions.DEFAULT_SINK_AGGREGATION)
@@ -95,6 +110,9 @@ object CachedKinesisProducer extends Logging {
       )
       .setRegion(region)
       .setRecordTtl(recordTTL)
+      .setThreadingModel(threadingModel)
+      .setThreadPoolSize(threadPoolSize)
+      .setRateLimit(rateLimit)
 
     // check for proxy settings
     if (producerConfiguration.contains(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))) {
diff --git a/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala b/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala
index 3cd2f6d..caf33ea 100644
--- a/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala
+++ b/src/main/scala/org/apache/spark/sql/connector/kinesis/KinesisOptions.scala
@@ -173,12 +173,18 @@ object KinesisOptions {
   val SINK_FLUSH_WAIT_TIME_MILLIS: String = SINK_PREFIX + "flushWaitTimeMs"
   val SINK_RECORD_MAX_BUFFERED_TIME: String = SINK_PREFIX + "recordMaxBufferedTimeMs"
   val SINK_MAX_CONNECTIONS: String = SINK_PREFIX + "maxConnections"
+  val SINK_RATE_LIMIT: String = SINK_PREFIX + "rateLimit"
+  val SINK_THREAD_POOL_SIZE: String = SINK_PREFIX + "threadPoolSize"
+  val SINK_THREADING_MODEL: String = SINK_PREFIX + "threadingModel"
   val SINK_AGGREGATION_ENABLED: String = SINK_PREFIX + "aggregationEnabled"
 
   val DEFAULT_SINK_FLUSH_WAIT_TIME_MILLIS: String = "100"
   val DEFAULT_SINK_RECORD_TTL: String = "30000"
   val DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME: String = "1000"
   val DEFAULT_SINK_MAX_CONNECTIONS: String = "1"
+  val DEFAULT_SINK_RATE_LIMIT: String = "150"
+  val DEFAULT_SINK_THREADING_MODEL: String = "PER_REQUEST"
+  val DEFAULT_SINK_THREAD_POOL_SIZE: String = "64"
   val DEFAULT_SINK_AGGREGATION: String = "true"
 
   // proxy options
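
For reference, a minimal Structured Streaming write that exercises the three new sink options might look like the sketch below. The `aws-kinesis` format name, the `kinesis.streamName`/`kinesis.region` option keys, and the `partitionKey`/`data` column names are assumptions about typical usage of this connector, not part of this change; only the three `kinesis.sink.*` options are introduced here.

```scala
import org.apache.spark.sql.SparkSession

object KinesisSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kinesis-sink-example").getOrCreate()

    // Any streaming DataFrame works; the rate source is used only to have data to write.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .selectExpr("CAST(value AS STRING) AS partitionKey", "CAST(timestamp AS STRING) AS data")

    val query = input.writeStream
      .format("aws-kinesis")                            // assumed short name for this connector
      .option("kinesis.streamName", "my-stream")        // assumed option key
      .option("kinesis.region", "us-east-1")            // assumed option key
      .option("kinesis.sink.threadingModel", "POOLED")  // new: PER_REQUEST (default) or POOLED
      .option("kinesis.sink.threadPoolSize", "128")     // new: max KPL threads when POOLED
      .option("kinesis.sink.rateLimit", "100")          // new: percent of the per-shard put limit per executor
      .option("checkpointLocation", "/tmp/kinesis-sink-checkpoint")
      .start()

    query.awaitTermination()
  }
}
```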
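And a small sketch of how those options land on the KPL producer configuration, mirroring the `CachedKinesisProducer` change above: the setter calls are the same ones the diff uses, and the values shown are the defaults introduced in `KinesisOptions`.

```scala
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration

// Defaults from KinesisOptions: PER_REQUEST threading, 64-thread pool, 150% rate limit.
val kplConfig = new KinesisProducerConfiguration()
  .setThreadingModel("PER_REQUEST") // DEFAULT_SINK_THREADING_MODEL
  .setThreadPoolSize(64)            // DEFAULT_SINK_THREAD_POOL_SIZE (used when POOLED)
  .setRateLimit(150L)               // DEFAULT_SINK_RATE_LIMIT, percent of the per-shard put limit
```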