This repository was archived by the owner on Mar 10, 2025. It is now read-only.
7 changes: 7 additions & 0 deletions src/.classpath
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="src" path="main/java"/>
<classpathentry kind="src" path="test/java"/>
<classpathentry kind="output" path="bin"/>
</classpath>
1 change: 1 addition & 0 deletions src/.gitignore
@@ -0,0 +1 @@
/bin/
17 changes: 17 additions & 0 deletions src/.project
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>src</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
@@ -66,8 +66,12 @@ object CosmosDBConfig {
val RootPropertyToSave = "rootpropertytosave"
val BulkImport = "bulkimport"
val BulkUpdate = "bulkupdate"
val BulkRead = "bulkread"
val MaxMiniBatchUpdateCount = "maxminibatchupdatecount"
val MaxBulkReadBatchCount = "maxbulkreadbatchcount"
val ClientInitDelay = "clientinitdelay"
val RangeQuery = "rangequery"


// Writing progress tracking
val WritingBatchId = "writingbatchid"
@@ -121,8 +125,10 @@ object CosmosDBConfig {
val DefaultStreamingSlowSourceDelayMs = 1
val DefaultBulkImport = true
val DefaultBulkUpdate = false
val DefaultBulkRead = false
val DefaultMaxMiniBatchUpdateCount = 500
val DefaultClientInitDelay = 10
val DefaultBulkReadBatchCount = 100

val DefaultAdlUseGuidForId = true
val DefaultAdlUseGuidForPk = true
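The three read-side keys added above ("bulkread", "maxbulkreadbatchcount", "rangequery") are plain string options, so they would be supplied like any other connector option. A minimal Scala sketch of requesting a bulk read from Spark; the endpoint/key/database/collection option names come from the connector's existing documentation rather than this diff, and the placeholder values are illustrative only. Note that, as written in this PR, the iterator keys off whether "bulkread" and "rangequery" are present at all, not on their values.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cosmosdb-read-sketch").getOrCreate()

// Sketch only: the endpoint/key/database/collection names are assumed from the
// connector's docs; the three lower-case keys are the ones added in this PR.
val df = spark.read
  .format("com.microsoft.azure.cosmosdb.spark")
  .option("Endpoint", "https://<account>.documents.azure.com:443/")
  .option("Masterkey", "<master-key>")
  .option("Database", "<database>")
  .option("Collection", "<collection>")
  .option("bulkread", "true")             // new: bulk read per partition key range
  .option("maxbulkreadbatchcount", "100") // new: bulk read batch size (default 100)
  // .option("rangequery", "true")        // alternative new path: id-range queries
  .load()

df.count()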
@@ -26,4 +26,6 @@ import org.apache.spark.Partition

case class CosmosDBPartition(index: Int,
partitionCount: Int,
partitionKeyRangeId: Int) extends Partition
partitionKeyRangeId: Int,
start: Char,
end: Char) extends Partition
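Each Spark partition now carries an id interval (start, end) in addition to its Cosmos DB partition key range id. A hypothetical helper, not part of the PR, that renders that interval as the predicate the RDD iterator builds further down in this diff:

// Hypothetical helper: turns the start/end characters carried by a
// CosmosDBPartition into the range-query SQL used by the read path below.
def rangePredicate(p: CosmosDBPartition): String =
  s"SELECT * FROM data o WHERE o.id >= '${p.start}' and o.id <= '${p.end}'"

// Example: the first of the four logical splits of partition key range 0.
val sample = CosmosDBPartition(index = 0, partitionCount = 4, partitionKeyRangeId = 0, start = 'a', end = 'i')
// rangePredicate(sample) == "SELECT * FROM data o WHERE o.id >= 'a' and o.id <= 'i'"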
@@ -41,56 +41,120 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with LoggingTrait {
var connection: CosmosDBConnection = new CosmosDBConnection(config)
var partitionKeyRanges = connection.getAllPartitions
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
Array.tabulate(partitionKeyRanges.length){
i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt)
if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
val a1 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
}
val a2 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
}
val a3 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
}
val a4 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
}
// val a5 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
// }
// val a6 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
// }
// val a7 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
// }
// val a8 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
// }

logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")

(a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
}
else {
Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
}
}
}

def computePartitions(config: Config,
requiredColumns: Array[String] = Array(),
filters: Array[Filter] = Array(),
hadoopConfig: mutable.Map[String, String]): Array[Partition] = {
val adlImport = config.get(CosmosDBConfig.adlAccountFqdn).isDefined
var connection: CosmosDBConnection = new CosmosDBConnection(config)
if (adlImport) {
// ADL source
val hdfsUtils = new HdfsUtils(hadoopConfig.toMap)
val adlConnection: ADLConnection = ADLConnection(config)
val adlFiles = adlConnection.getFiles
val adlCheckpointPath = config.get(CosmosDBConfig.adlFileCheckpointPath)
val adlCosmosDBFileStoreCollection = config.get(CosmosDBConfig.CosmosDBFileStoreCollection)
val writingBatchId = config.get[String](CosmosDBConfig.WritingBatchId)
val adlMaxFileCount = config.get(CosmosDBConfig.adlMaxFileCount)
.getOrElse(CosmosDBConfig.DefaultAdlMaxFileCount.toString)
.toInt
logDebug(s"The Adl folder has ${adlFiles.size()} files")
val partitions = new ListBuffer[ADLFilePartition]
var partitionIndex = 0
var i = 0
while (i < adlFiles.size() && partitionIndex < adlMaxFileCount) {
var processed = true
if (adlCheckpointPath.isDefined) {
processed = ADLConnection.isAdlFileProcessed(hdfsUtils, adlCheckpointPath.get, adlFiles.get(i), writingBatchId.get)
} else if (adlCosmosDBFileStoreCollection.isDefined) {
val dbName = config.get[String](CosmosDBConfig.Database).get
val collectionLink = s"/dbs/$dbName/colls/${adlCosmosDBFileStoreCollection.get}"
processed = ADLConnection.isAdlFileProcessed(connection, collectionLink, adlFiles.get(i), writingBatchId.get)
}
if (!processed) {
partitions += ADLFilePartition(partitionIndex, adlFiles.get(i))
partitionIndex += 1
}
i += 1
}
partitions.toArray
} else {
// CosmosDB source
var query: String = FilterConverter.createQueryString(requiredColumns, filters)
var partitionKeyRanges = connection.getAllPartitions(query)

if (config.get(CosmosDBConfig.RangeQuery).isDefined) {
val a1 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'i')
}
val a2 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'i', 'p')
}
val a3 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length * 2 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'p', 'z')
}
val a4 = Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(partitionKeyRanges.length * 3 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '9')
}
// val a5 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 4 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 't', '{')
// }
// val a6 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 5 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '0', '4')
// }
// val a7 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 6 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '4', '7')
// }
// val a8 = Array.tabulate(partitionKeyRanges.length) {
// i => CosmosDBPartition(partitionKeyRanges.length * 7 + i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, '7', ':')
// }

logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")

(a1 ++ a2 ++ a3 ++ a4).toArray[Partition]
}
else {
Array.tabulate(partitionKeyRanges.length) {
i => CosmosDBPartition(i, partitionKeyRanges.length, partitionKeyRanges(i).getId.toInt, 'a', 'z')
}
}
}
}
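When "rangequery" is set, the partitioner above emits four Spark partitions per physical partition key range, one for each hard-coded id interval ('a'–'i', 'i'–'p', 'p'–'z', '0'–'9'); ids whose first character falls outside those intervals (for example upper-case letters) appear not to be covered by this path. A sketch of the same fan-out with the intervals factored into one list, assuming partitionKeyRanges and CosmosDBPartition as in the diff; the PR itself builds the four arrays a1–a4 by hand:

import org.apache.spark.Partition

// Sketch only: same fan-out as a1 ++ a2 ++ a3 ++ a4 above, driven by a list of intervals.
val idIntervals = Seq(('a', 'i'), ('i', 'p'), ('p', 'z'), ('0', '9'))

val rangePartitions: Array[Partition] =
  idIntervals.zipWithIndex.flatMap { case ((start, end), r) =>
    partitionKeyRanges.indices.map { i =>
      CosmosDBPartition(
        index = r * partitionKeyRanges.length + i,
        partitionCount = partitionKeyRanges.length,
        partitionKeyRangeId = partitionKeyRanges(i).getId.toInt,
        start = start,
        end = end)
    }
  }.toArray[Partition]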
@@ -32,11 +32,17 @@ import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, LoggingTrait}
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkimport.DocumentBulkImporter
import org.apache.commons.lang3.StringUtils
import org.apache.spark._
import org.apache.spark.sql.sources.Filter

import scala.concurrent.{Await, Future}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import scala.collection.JavaConversions._

object CosmosDBRDDIterator {

@@ -186,10 +192,89 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
.get[String](CosmosDBConfig.QueryCustom)
.getOrElse(FilterConverter.createQueryString(requiredColumns, filters))
logDebug(s"CosmosDBRDDIterator::LazyReader, convert to predicate: $queryString")

val bulkRead = config
.get[String](CosmosDBConfig.BulkRead)
if(bulkRead.isDefined)
{
val maxBatchSize = config
.get[String](CosmosDBConfig.MaxBulkReadBatchCount)
.getOrElse(CosmosDBConfig.DefaultBulkReadBatchCount.toString)
.toInt

var collectionThroughput: Int = 0
collectionThroughput = connection.getCollectionThroughput
val importer: DocumentBulkImporter = connection.getDocumentBulkImporter(collectionThroughput)
importer.readDocuments(partition.partitionKeyRangeId.toString, maxBatchSize)
}
else if (queryString == FilterConverter.defaultQuery) {
// If there is no filters, read feed should be used
val rangeQuery = config
.get[String](CosmosDBConfig.RangeQuery)

if(rangeQuery.isDefined) {

val rangeQueryString = s"SELECT * FROM data o WHERE o.id >= '${partition.start}' and o.id <= '${partition.end}'"
// val queryString2 = "SELECT * FROM data o WHERE o.partitionKey >= 'e' and o.partitionKey <= 'i'"
// val queryString3 = "SELECT * FROM data o WHERE o.partitionKey >= 'i' and o.partitionKey <= 'l'"
// val queryString4 = "SELECT * FROM data o WHERE o.partitionKey >= 'l' and o.partitionKey <= 'p'"
// val queryString5 = "SELECT * FROM data o WHERE o.partitionKey >= 'p' and o.partitionKey <= 't'"
// val queryString6 = "SELECT * FROM data o WHERE o.partitionKey >= 't' and o.partitionKey <= '{'"
// val queryString7 = "SELECT * FROM data o WHERE o.partitionKey >= '0' and o.partitionKey <= '4'"
// val queryString8 = "SELECT * FROM data o WHERE o.partitionKey >= '4' and o.partitionKey <= '7'"
// val queryString9 = "SELECT * FROM data o WHERE o.partitionKey >= '7' and o.partitionKey <= ':'"
//
//
// val query1 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString1, feedOpts)
// }
// val query2 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString2, feedOpts)
// }
// val query3 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString3, feedOpts)
// }
// val query4 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString4, feedOpts)
// }
// val query5 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString5, feedOpts)
// }
// val query6 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString6, feedOpts)
// }
// val query7 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString7, feedOpts)
// }
// val query8 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString8, feedOpts)
// }
// val query9 = Future[Iterator[Document]] {
// connection.queryDocuments(queryString9, feedOpts)
// }
//
// val combinedFuture =
// for {
// r1 <- query1
// r2 <- query2
// r3 <- query3
// r4 <- query4
// r5 <- query5
// r6 <- query6
// r7 <- query7
// r8 <- query8
// r9 <- query9
// } yield (r1, r2, r3, r4, r5, r6, r7, r8, r9)
//
// val (r1, r2, r3, r4, r5, r6, r7, r8, r9) = Await.result(combinedFuture, Duration.Inf)
// r1 ++ r2 ++ r3 ++ r4 ++ r5 ++ r6 ++ r7 ++ r8 ++ r9
connection.queryDocuments(rangeQueryString, feedOpts)
}
else
{
connection.readDocuments(feedOpts)
}

} else {
connection.queryDocuments(queryString, feedOpts)
}
@@ -355,3 +440,5 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
}
}
}
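Taken together, the iterator changes above choose between three read paths. A condensed sketch of that selection, reusing the names from the diff (connection, importer, partition, feedOpts, queryString, maxBatchSize); the result type of the bulk-import read is not visible in this diff, so it is left unannotated here:

// Sketch, not the PR's code: the branch structure added above, in one place.
val reader =
  if (bulkRead.isDefined) {
    // Bulk read: pull the whole partition key range in batches of maxBatchSize.
    importer.readDocuments(partition.partitionKeyRangeId.toString, maxBatchSize)
  } else if (queryString == FilterConverter.defaultQuery) {
    if (rangeQuery.isDefined) {
      // Range query: restrict the scan to this partition's id interval.
      val rangeQueryString =
        s"SELECT * FROM data o WHERE o.id >= '${partition.start}' and o.id <= '${partition.end}'"
      connection.queryDocuments(rangeQueryString, feedOpts)
    } else {
      // No pushed-down filters and no range query: use the read feed.
      connection.readDocuments(feedOpts)
    }
  } else {
    // Pushed-down filters: run the generated query.
    connection.queryDocuments(queryString, feedOpts)
  }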