Skip to content
This repository was archived by the owner on Sep 1, 2022. It is now read-only.

Commit 4a66e87

Browse files
committed
Updates hadoop connector to version 1.1.0
user now can specify the string precision of the output collections.
1 parent b343570 commit 4a66e87

File tree

8 files changed

+83
-59
lines changed

8 files changed

+83
-59
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.microsoft.azure</groupId>
55
<artifactId>azure-documentdb-hadoop</artifactId>
6-
<version>1.0.0</version>
6+
<version>1.1.0</version>
77
<packaging>jar</packaging>
88

99
<name>Azure-DocumentDB-Hadoop</name>
@@ -227,7 +227,7 @@
227227
<dependency>
228228
<groupId>com.microsoft.azure</groupId>
229229
<artifactId>azure-documentdb</artifactId>
230-
<version>1.0.0</version>
230+
<version>1.1.0</version>
231231
</dependency>
232232
<dependency>
233233
<groupId>org.apache.pig</groupId>

src/com/microsoft/azure/documentdb/hadoop/ConfigurationUtil.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Properties;
88
import java.util.Set;
99

10+
import org.apache.commons.lang3.StringUtils;
1011
import org.apache.hadoop.conf.Configuration;
1112

1213
import com.google.common.collect.ImmutableSet;
@@ -48,10 +49,10 @@ public class ConfigurationUtil {
4849
public static final String QUERY = "DocumentDB.query";
4950

5051
/**
51-
* Comma separated Property paths to range index in the output collections.
52+
* Precision of the output collections' string indexes .
5253
*/
53-
public static final String OUTPUT_RANGE_INDEXED = "DocumentDB.rangeIndex";
54-
54+
public static final String OUTPUT_STRING_PRECISION = "DocumentDB.outputStringPrecision";
55+
5556
/**
5657
* The offer type of the output collections.
5758
*/
@@ -62,6 +63,8 @@ public class ConfigurationUtil {
6263
*/
6364
public static final String UPSERT = "DocumentDB.upsert";
6465

66+
public static final int DEFAULT_STRING_PRECISION = -1; // Maxmum precision.
67+
6568
/**
6669
* Gets the DocumentDB.db from the Configuration object.
6770
* @param conf job configuration object
@@ -126,20 +129,30 @@ public final static String getQuery(Configuration conf) {
126129
}
127130

128131
/**
129-
* Gets the DocumentDB.rangeIndex from the Configuration object.
132+
* Gets the DocumentDB.outputStringPrecision from the Configuration object.
130133
* @param conf job configuration object
131-
* @return array of property paths to range index.
132-
*/
133-
public final static String[] getRangeIndex(Configuration conf) {
134-
String rangeIndexed = conf.get(OUTPUT_RANGE_INDEXED);
135-
String[] propertyNames = null;
136-
if (rangeIndexed != null) {
137-
propertyNames = rangeIndexed.split(",");
138-
} else {
139-
propertyNames = new String[] {};
134+
* @return the string precision of the output collections.
135+
*/
136+
public final static int getOutputStringPrecision(Configuration conf) {
137+
String value = conf.get(OUTPUT_STRING_PRECISION);
138+
139+
Integer stringPrecision = new Integer(DEFAULT_STRING_PRECISION);
140+
141+
if (StringUtils.isEmpty(value)) {
142+
return stringPrecision;
140143
}
141-
142-
return propertyNames;
144+
145+
try {
146+
stringPrecision = Integer.valueOf(value);
147+
} catch (IllegalArgumentException e) {
148+
throw new IllegalArgumentException("outputStringPrecision is expected to be an integer.", e);
149+
}
150+
151+
if (stringPrecision < -1 || stringPrecision == 0) {
152+
throw new IllegalArgumentException("outputStringPrecision can only be -1 or a positive number.");
153+
}
154+
155+
return stringPrecision;
143156
}
144157

145158
/**

src/com/microsoft/azure/documentdb/hadoop/DocumentDBConnectorUtil.java

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
import org.apache.commons.logging.Log;
1616
import org.apache.commons.logging.LogFactory;
1717

18+
import com.microsoft.azure.documentdb.DataType;
1819
import com.microsoft.azure.documentdb.Database;
1920
import com.microsoft.azure.documentdb.Document;
2021
import com.microsoft.azure.documentdb.DocumentClient;
2122
import com.microsoft.azure.documentdb.DocumentClientException;
2223
import com.microsoft.azure.documentdb.DocumentCollection;
23-
import com.microsoft.azure.documentdb.IndexType;
24-
import com.microsoft.azure.documentdb.IndexingPath;
24+
import com.microsoft.azure.documentdb.IncludedPath;
2525
import com.microsoft.azure.documentdb.IndexingPolicy;
2626
import com.microsoft.azure.documentdb.QueryIterable;
27+
import com.microsoft.azure.documentdb.RangeIndex;
2728
import com.microsoft.azure.documentdb.RequestOptions;
2829
import com.microsoft.azure.documentdb.SqlParameter;
2930
import com.microsoft.azure.documentdb.SqlParameterCollection;
@@ -42,38 +43,25 @@ public class DocumentDBConnectorUtil {
4243
private final static String BULK_IMPORT_ID = "HadoopBulkImportSprocV1";
4344
private final static String BULK_IMPORT_PATH = "/BulkImportScript.js";
4445

45-
public static String UserAgentSuffix = " HadoopConnector/1.0.0";
46+
public static String UserAgentSuffix = " HadoopConnector/1.1.0";
4647

4748
/**
4849
* Gets an output collection with the passed name ( if the collection already exists return it, otherwise create new one
4950
* @param client The DocumentClient instance.
5051
* @param databaseSelfLink the self link of the passed database.
51-
* @param collectionName The id of the output collection.
52-
* @param rangeIndexes An optional parameter that contain index paths for range indexes and it will be used to create an indexing policy.
52+
* @param collectionId The id of the output collection.
53+
* @param outputStringPrecision An optional parameter that contains the default string precision to be used to create an indexing policy.
54+
* @param offerType An optional parameter that contains the offer type of the output collection.
5355
*/
5456
public static DocumentCollection getOrCreateOutputCollection(DocumentClient client, String databaseSelfLink,
55-
String collectionName, String[] rangeIndexes, String offerType) throws DocumentClientException {
56-
57-
DocumentCollection outputCollection = DocumentDBConnectorUtil.GetDocumentCollection(client, databaseSelfLink, collectionName);
57+
String collectionId, int outputStringPrecision, String offerType) throws DocumentClientException {
58+
59+
DocumentCollection outputCollection = DocumentDBConnectorUtil.GetDocumentCollection(client, databaseSelfLink, collectionId);
5860

5961
if (outputCollection == null) {
60-
DocumentCollection outputColl = new DocumentCollection("{ 'id':'" + collectionName + "' }");
61-
if (rangeIndexes != null && rangeIndexes.length > 0) {
62-
IndexingPolicy policy = new IndexingPolicy();
63-
ArrayList<IndexingPath> indexingPaths = new ArrayList<IndexingPath>(rangeIndexes.length);
64-
for (int i = 0; i < rangeIndexes.length; i++) {
65-
IndexingPath path = new IndexingPath();
66-
path.setIndexType(IndexType.Range);
67-
path.setPath(rangeIndexes[i]);
68-
indexingPaths.add(path);
69-
}
62+
DocumentCollection outputColl = new DocumentCollection("{ 'id':'" + collectionId + "' }");
7063

71-
IndexingPath defaultPath = new IndexingPath();
72-
defaultPath.setPath("/");
73-
indexingPaths.add(defaultPath);
74-
policy.getIncludedPaths().addAll(indexingPaths);
75-
outputColl.setIndexingPolicy(policy);
76-
}
64+
outputColl.setIndexingPolicy(DocumentDBConnectorUtil.getOutputIndexingPolicy(outputStringPrecision));
7765

7866
BackoffExponentialRetryPolicy retryPolicy = new BackoffExponentialRetryPolicy();
7967

@@ -92,6 +80,12 @@ public static DocumentCollection getOrCreateOutputCollection(DocumentClient clie
9280
return outputCollection;
9381
}
9482

83+
/**
84+
* Gets an output collection with the passed name ( if the collection already exists return it, otherwise create new one
85+
* @param client The DocumentClient instance.
86+
* @param databaseSelfLink the self link of the passed database.
87+
* @param collectionId The id of the output collection.
88+
*/
9589
public static DocumentCollection GetDocumentCollection(DocumentClient client, String databaseSelfLink, String collectionId) {
9690
BackoffExponentialRetryPolicy retryPolicy = new BackoffExponentialRetryPolicy();
9791
QueryIterable<DocumentCollection> collIterable = client.queryCollections(
@@ -269,4 +263,23 @@ public static void addIdIfMissing(Document doc) {
269263
doc.setId(UUID.randomUUID().toString());
270264
}
271265
}
266+
267+
private static IndexingPolicy getOutputIndexingPolicy(int outputStringPrecision) {
268+
// Setup indexing policy.
269+
IndexingPolicy policy = new IndexingPolicy();
270+
ArrayList<IncludedPath> includedPaths = new ArrayList<IncludedPath>();
271+
272+
// All paths.
273+
IncludedPath path = new IncludedPath();
274+
RangeIndex stringIndex = new RangeIndex(DataType.String);
275+
stringIndex.setPrecision(outputStringPrecision);
276+
path.getIndexes().add(stringIndex);
277+
RangeIndex numberIndex = new RangeIndex(DataType.Number);
278+
numberIndex.setPrecision(-1); // Maximum precision
279+
path.getIndexes().add(numberIndex);
280+
path.setPath("/*");
281+
includedPaths.add(path);
282+
policy.setIncludedPaths(includedPaths);
283+
return policy;
284+
}
272285
}

src/com/microsoft/azure/documentdb/hadoop/DocumentDBOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public RecordWriter<Writable, DocumentDBWritable> getRecordWriter(TaskAttemptCon
5656
Configuration conf = context.getConfiguration();
5757
return new DocumentDBRecordWriter(conf, ConfigurationUtil.getDBEndpoint(conf),
5858
ConfigurationUtil.getDBKey(conf), ConfigurationUtil.getDBName(conf),
59-
ConfigurationUtil.getOutputCollectionNames(conf), ConfigurationUtil.getRangeIndex(conf),
59+
ConfigurationUtil.getOutputCollectionNames(conf),
60+
ConfigurationUtil.getOutputStringPrecision(conf),
6061
ConfigurationUtil.getUpsert(conf),
6162
ConfigurationUtil.getOutputCollectionsOffer(conf));
6263
}

src/com/microsoft/azure/documentdb/hadoop/DocumentDBRecordWriter.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@
2020
import com.microsoft.azure.documentdb.Document;
2121
import com.microsoft.azure.documentdb.DocumentClient;
2222
import com.microsoft.azure.documentdb.DocumentCollection;
23-
import com.microsoft.azure.documentdb.QueryIterable;
24-
import com.microsoft.azure.documentdb.SqlParameter;
25-
import com.microsoft.azure.documentdb.SqlParameterCollection;
26-
import com.microsoft.azure.documentdb.SqlQuerySpec;
2723
import com.microsoft.azure.documentdb.StoredProcedure;
2824

2925
/**
@@ -41,7 +37,7 @@ public class DocumentDBRecordWriter extends RecordWriter<Writable, DocumentDBWri
4137
private int currentStoredProcedureIndex = 0;
4238

4339
public DocumentDBRecordWriter(Configuration conf, String host, String key, String dbName, String[] collNames,
44-
String[] rangeIndexes, boolean upsert, String offerType) throws IOException {
40+
int outputStringPrecision, boolean upsert, String offerType) throws IOException {
4541
try {
4642
ConnectionPolicy policy = ConnectionPolicy.GetDefault();
4743
policy.setUserAgentSuffix(DocumentDBConnectorUtil.UserAgentSuffix);
@@ -53,7 +49,7 @@ public DocumentDBRecordWriter(Configuration conf, String host, String key, Strin
5349
this.sprocs = new StoredProcedure[collNames.length];
5450
for (int i = 0; i < collNames.length; i++) {
5551
this.collections[i] = DocumentDBConnectorUtil.getOrCreateOutputCollection(client, db.getSelfLink(), collNames[i],
56-
rangeIndexes, offerType);
52+
outputStringPrecision, offerType);
5753
this.sprocs[i] = DocumentDBConnectorUtil.CreateBulkImportStoredProcedure(client, this.collections[i].getSelfLink());
5854
}
5955

src/com/microsoft/azure/documentdb/mapred/hadoop/DocumentDBOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public RecordWriter<Writable, DocumentDBWritable> getRecordWriter(FileSystem fs,
4747

4848
return new DocumentDBRecordWriter(conf, ConfigurationUtil.getDBEndpoint(conf),
4949
ConfigurationUtil.getDBKey(conf), ConfigurationUtil.getDBName(conf),
50-
ConfigurationUtil.getOutputCollectionNames(conf), ConfigurationUtil.getRangeIndex(conf),
50+
ConfigurationUtil.getOutputCollectionNames(conf),
51+
ConfigurationUtil.getOutputStringPrecision(conf),
5152
ConfigurationUtil.getUpsert(conf), ConfigurationUtil.getOutputCollectionsOffer(conf));
5253
}
5354
}

src/com/microsoft/azure/documentdb/mapred/hadoop/DocumentDBRecordWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.microsoft.azure.documentdb.ConnectionPolicy;
1818
import com.microsoft.azure.documentdb.ConsistencyLevel;
19+
import com.microsoft.azure.documentdb.DataType;
1920
import com.microsoft.azure.documentdb.Database;
2021
import com.microsoft.azure.documentdb.Document;
2122
import com.microsoft.azure.documentdb.DocumentClient;
@@ -45,7 +46,7 @@ public class DocumentDBRecordWriter implements RecordWriter<Writable, DocumentDB
4546
private int currentStoredProcedureIndex = 0;
4647

4748
public DocumentDBRecordWriter(JobConf conf, String host, String key, String dbName, String[] collNames,
48-
String[] rangeIndexes, boolean upsert, String offerType) throws IOException {
49+
int outputStringPrecision, boolean upsert, String offerType) throws IOException {
4950
DocumentClient client;
5051
try {
5152
ConnectionPolicy policy = ConnectionPolicy.GetDefault();
@@ -57,7 +58,7 @@ public DocumentDBRecordWriter(JobConf conf, String host, String key, String dbNa
5758
this.sprocs = new StoredProcedure[collNames.length];
5859
for (int i = 0; i < collNames.length; i++) {
5960
this.collections[i] = DocumentDBConnectorUtil.getOrCreateOutputCollection(client, db.getSelfLink(), collNames[i],
60-
rangeIndexes, offerType);
61+
outputStringPrecision, offerType);
6162
this.sprocs[i] = DocumentDBConnectorUtil.CreateBulkImportStoredProcedure(client, this.collections[i].getSelfLink());
6263
}
6364

src/com/microsoft/azure/documentdb/pig/DocumentDBStorage.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
2424
import org.apache.pig.data.Tuple;
2525
import org.apache.pig.impl.util.UDFContext;
26-
import org.json.JSONException;
2726

2827
import com.microsoft.azure.documentdb.Document;
2928
import com.microsoft.azure.documentdb.hadoop.ConfigurationUtil;
@@ -42,7 +41,7 @@ public class DocumentDBStorage extends StoreFunc implements StoreMetadata {
4241
protected ResourceSchema schema = null;
4342
private String dbName;
4443
private String outputCollections;
45-
private String rangeIndexed;
44+
private String stringPrecision;
4645
private String upsert;
4746
private String offerType;
4847
private String udfContextSignature = null;
@@ -61,12 +60,12 @@ public DocumentDBStorage(String masterkey, String dbName, String outputCollectio
6160
this(masterkey, dbName, outputCollections, offerType, null, null);
6261
}
6362

64-
public DocumentDBStorage(String masterkey, String dbName, String outputCollections, String offerType, String rangeindexed, String upsert) {
63+
public DocumentDBStorage(String masterkey, String dbName, String outputCollections, String offerType, String outputStringPrecision, String upsert) {
6564
this.masterkey = masterkey;
6665
this.dbName = dbName;
6766
this.outputCollections = outputCollections;
6867
this.upsert = upsert;
69-
this.rangeIndexed = rangeindexed;
68+
this.stringPrecision = outputStringPrecision;
7069
this.offerType = offerType;
7170

7271
// Set the userAgent to pig storage
@@ -95,11 +94,11 @@ public void setStoreLocation(final String location, final Job job) throws IOExce
9594
if (this.upsert != null) {
9695
conf.set(ConfigurationUtil.UPSERT, this.upsert);
9796
}
98-
99-
if (this.rangeIndexed != null) {
100-
conf.set(ConfigurationUtil.OUTPUT_RANGE_INDEXED, this.rangeIndexed);
97+
98+
if (this.stringPrecision != null) {
99+
conf.set(ConfigurationUtil.OUTPUT_STRING_PRECISION, this.stringPrecision);
101100
}
102-
101+
103102
if(this.offerType != null) {
104103
conf.set(ConfigurationUtil.OUTPUT_COLLECTIONS_OFFER, this.offerType);
105104
}

0 commit comments

Comments
 (0)