Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Changelog
=========
v1.5.0
----------------------------
- plumb threading model, thread pool size, and rate limit param to KPL (issue #37)

v1.4.0
----------------------------
- plumb through optional recordTtl param to KPL (issue #31)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ Note: Using permanent credentials are not recommended due to security concerns.
| kinesis.sink.maxConnections | 1 | Specify the maximum connections to Kinesis |
| kinesis.sink.aggregationEnabled | True | Specify if records should be aggregated before sending them to Kinesis |
| kinesis.sink.recordTtl | 30000 (milliseconds) | Records not successfully written to Kinesis within this time are dropped |
| kinesis.sink.rateLimit | 150 (percent) | Limits the maximum allowed put rate for a shard per executor, as a percentage of the Kinesis shard limits. |
| kinesis.sink.threadingModel | PER_REQUSET | Specify whether the KPL will create new threads per request (PER_REQUEST) or use a pool (POOLED) when invoking Kinesis |
| kinesis.sink.threadPoolSize | 64 | If using a thread pool (above), maximum number of threads |


## Security
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-sql-kinesis-connector_2.12</artifactId>
<version>1.4.0</version>
<version>1.5.0</version>
<packaging>jar</packaging>
<name>Spark Structured Streaming Kinesis Connector</name>
<description>Connector to read from and write into Kinesis from Structured Streaming Applications</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ object CachedKinesisProducer extends Logging {
KinesisOptions.DEFAULT_SINK_MAX_CONNECTIONS)
.toInt

val rateLimit = producerConfiguration.getOrElse(
KinesisOptions.SINK_RATE_LIMIT.toLowerCase(Locale.ROOT),
KinesisOptions.DEFAULT_SINK_MAX_CONNECTIONS)
.toLong

val threadingModel = producerConfiguration.getOrElse(
KinesisOptions.SINK_THREADING_MODEL.toLowerCase(Locale.ROOT),
KinesisOptions.DEFAULT_SINK_THREADING_MODEL)
.toString

val threadPoolSize = producerConfiguration.getOrElse(
KinesisOptions.SINK_THREAD_POOL_SIZE.toLowerCase(Locale.ROOT),
KinesisOptions.DEFAULT_SINK_THREAD_POOL_SIZE)
.toInt

val aggregation = Try { producerConfiguration.getOrElse(
KinesisOptions.SINK_AGGREGATION_ENABLED.toLowerCase(Locale.ROOT),
KinesisOptions.DEFAULT_SINK_AGGREGATION)
Expand All @@ -95,6 +110,9 @@ object CachedKinesisProducer extends Logging {
)
.setRegion(region)
.setRecordTtl(recordTTL)
.setThreadingModel(threadingModel)
.setThreadPoolSize(threadPoolSize)
.setRateLimit(rateLimit)

// check for proxy settings
if (producerConfiguration.contains(KinesisOptions.PROXY_ADDRESS.toLowerCase(Locale.ROOT))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,18 @@ object KinesisOptions {
val SINK_FLUSH_WAIT_TIME_MILLIS: String = SINK_PREFIX + "flushWaitTimeMs"
val SINK_RECORD_MAX_BUFFERED_TIME: String = SINK_PREFIX + "recordMaxBufferedTimeMs"
val SINK_MAX_CONNECTIONS: String = SINK_PREFIX + "maxConnections"
val SINK_RATE_LIMIT: String = SINK_PREFIX + "rateLimit"
val SINK_THREAD_POOL_SIZE: String = SINK_PREFIX + "threadPoolSize"
val SINK_THREADING_MODEL: String = SINK_PREFIX + "threadPool"
val SINK_AGGREGATION_ENABLED: String = SINK_PREFIX + "aggregationEnabled"

val DEFAULT_SINK_FLUSH_WAIT_TIME_MILLIS: String = "100"
val DEFAULT_SINK_RECORD_TTL: String = "30000"
val DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME: String = "1000"
val DEFAULT_SINK_MAX_CONNECTIONS: String = "1"
val DEFAULT_SINK_RATE_LIMIT: String = "150"
val DEFAULT_SINK_THREADING_MODEL: String = "PER_REQUEST"
val DEFAULT_SINK_THREAD_POOL_SIZE: String = "64"
val DEFAULT_SINK_AGGREGATION: String = "true"

// proxy options
Expand Down