This repository was archived by the owner on Sep 1, 2022. It is now read-only.

Commit 7aa3b42

Author: nomiero

Merge pull request #5 from nomiero/master

Update to version 1.0.0.

2 parents: 525a8d4 + beffd72

File tree: 11 files changed (+86 / -28 lines)


pom.xml

Lines changed: 7 additions & 6 deletions

@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.microsoft.azure</groupId>
   <artifactId>azure-documentdb-hadoop</artifactId>
-  <version>0.9.2</version>
+  <version>1.0.0</version>
   <packaging>jar</packaging>

   <name>Azure-DocumentDB-Hadoop</name>
@@ -15,6 +15,7 @@
       <url>http://www.opensource.org/licenses/mit-license.php</url>
     </license>
   </licenses>
+
   <developers>
     <developer>
       <name>DocumentDB Team</name>
@@ -132,7 +133,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce</artifactId>
       <version>2.5.1</version>
-      <type>pom</type>
+      <type>pom</type>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -154,7 +155,7 @@
           <artifactId>commons-codec</artifactId>
           <groupId>commons-codec</groupId>
         </exclusion>
-      </exclusions>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -215,18 +216,18 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
       <version>0.13.1</version>
-      <type>jar</type>
+      <type>jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
       <version>0.13.1</version>
-      <type>jar</type>
+      <type>jar</type>
     </dependency>
     <dependency>
       <groupId>com.microsoft.azure</groupId>
       <artifactId>azure-documentdb</artifactId>
-      <version>0.9.5</version>
+      <version>1.0.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.pig</groupId>

src/BulkImportScript.js

Lines changed: 3 additions & 2 deletions

@@ -60,8 +60,9 @@ function bulkImport(docs, upsert) {
         } else {
             throw "Error in retrieving document: " + doc.id;
         }
-    });
-    if (!isAccepted) getContext().getResponse().setBody(count);
+    });
+
+    if (!isAccepted) getContext().getResponse().setBody(count);
 }

 // This is called when collection.createDocument is done in order to

src/com/microsoft/azure/documentdb/hadoop/ConfigurationUtil.java

Lines changed: 15 additions & 0 deletions

@@ -52,6 +52,11 @@ public class ConfigurationUtil {
      */
     public static final String OUTPUT_RANGE_INDEXED = "DocumentDB.rangeIndex";

+    /**
+     * The offer type of the output collections.
+     */
+    public static final String OUTPUT_COLLECTIONS_OFFER = "DocumentDB.outputCollectionsOffer";
+
     /**
      * An upsert option, true by default. This can be disabled by setting it to "false"
      */
@@ -147,6 +152,16 @@ public final static boolean getUpsert(Configuration conf) {
         return (upsert != null && upsert.equalsIgnoreCase("false")) ? false : true;
     }

+    /**
+     * Gets the DocumentDB.outputCollectionsOffer from the Configuration object.
+     * @param conf job configuration object
+     * @return the value of DocumentDB.outputCollectionsOffer option
+     */
+    public final static String getOutputCollectionsOffer(Configuration conf) {
+        String outputCollectionsOffer = conf.get(OUTPUT_COLLECTIONS_OFFER);
+        return (outputCollectionsOffer != null) ? outputCollectionsOffer : "S3";
+    }
+
     /**
      * Copies the configuration properties for the connector to a map.
      * @param from Properties object to copy from.
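Net effect of this file: the offer type is read from the job configuration under the key DocumentDB.outputCollectionsOffer and falls back to "S3" when unset. A minimal sketch of how a job might exercise the new option (the key string and default come from the diff above; the class name and setup code are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;

public class OfferOptionSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Left unset, getOutputCollectionsOffer(conf) would fall back to "S3".
        conf.set("DocumentDB.outputCollectionsOffer", "S2");

        // Mirrors ConfigurationUtil.getOutputCollectionsOffer(conf).
        String offer = conf.get("DocumentDB.outputCollectionsOffer");
        System.out.println((offer != null) ? offer : "S3"); // prints S2
    }
}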

src/com/microsoft/azure/documentdb/hadoop/DocumentDBConnectorUtil.java

Lines changed: 20 additions & 10 deletions

@@ -24,6 +24,7 @@
 import com.microsoft.azure.documentdb.IndexingPath;
 import com.microsoft.azure.documentdb.IndexingPolicy;
 import com.microsoft.azure.documentdb.QueryIterable;
+import com.microsoft.azure.documentdb.RequestOptions;
 import com.microsoft.azure.documentdb.SqlParameter;
 import com.microsoft.azure.documentdb.SqlParameterCollection;
 import com.microsoft.azure.documentdb.SqlQuerySpec;
@@ -41,7 +42,7 @@ public class DocumentDBConnectorUtil {
     private final static String BULK_IMPORT_ID = "HadoopBulkImportSprocV1";
     private final static String BULK_IMPORT_PATH = "/BulkImportScript.js";

-    public static final String UserAgentSuffix = "HadoopConnector/0.9.2";
+    public static String UserAgentSuffix = " HadoopConnector/1.0.0";

     /**
      * Gets an output collection with the passed name ( if the collection already exists return it, otherwise create new one
@@ -51,13 +52,13 @@ public class DocumentDBConnectorUtil {
      * @param rangeIndexes An optional parameter that contain index paths for range indexes and it will be used to create an indexing policy.
      */
     public static DocumentCollection getOrCreateOutputCollection(DocumentClient client, String databaseSelfLink,
-            String collectionName, String[] rangeIndexes) throws DocumentClientException {
+            String collectionName, String[] rangeIndexes, String offerType) throws DocumentClientException {

         DocumentCollection outputCollection = DocumentDBConnectorUtil.GetDocumentCollection(client, databaseSelfLink, collectionName);

         if (outputCollection == null) {
             DocumentCollection outputColl = new DocumentCollection("{ 'id':'" + collectionName + "' }");
-            if (rangeIndexes.length > 0) {
+            if (rangeIndexes != null && rangeIndexes.length > 0) {
                 IndexingPolicy policy = new IndexingPolicy();
                 ArrayList<IndexingPath> indexingPaths = new ArrayList<IndexingPath>(rangeIndexes.length);
                 for (int i = 0; i < rangeIndexes.length; i++) {
@@ -73,12 +74,14 @@ public static DocumentCollection getOrCreateOutputCollection(DocumentClient clie
                 policy.getIncludedPaths().addAll(indexingPaths);
                 outputColl.setIndexingPolicy(policy);
             }
-
+
             BackoffExponentialRetryPolicy retryPolicy = new BackoffExponentialRetryPolicy();

             while(retryPolicy.shouldRetry()) {
                 try {
-                    outputCollection = client.createCollection(databaseSelfLink, outputColl, null).getResource();
+                    RequestOptions options = new RequestOptions();
+                    options.setOfferType(offerType);
+                    outputCollection = client.createCollection(databaseSelfLink, outputColl, options).getResource();
                     break;
                 } catch (Exception e) {
                     retryPolicy.errorOccured(e);
@@ -107,7 +110,10 @@ public static DocumentCollection GetDocumentCollection(DocumentClient client, St
             }
         }

-        if(collections.size() == 0) return null;
+        if(collections.size() == 0) {
+            return null;
+        }
+
         return collections.get(0);
     }

@@ -128,7 +134,10 @@ public static Database GetDatabase(DocumentClient client, String databaseId) {
             }
         }

-        if(databases.size() == 0) return null;
+        if(databases.size() == 0) {
+            return null;
+        }
+
         return databases.get(0);
     }

@@ -177,10 +186,11 @@ public static StoredProcedure CreateBulkImportStoredProcedure(DocumentClient cli
      */
     public static void executeWriteStoredProcedure(final DocumentClient client, String collectionSelfLink, final StoredProcedure sproc,
             List<Document> allDocs, final boolean upsert) {
-        int currentCount = 0;

+        int currentCount = 0;
+
         while (currentCount < allDocs.size())
-        {
+        {
             String []jsonArrayString = CreateBulkInsertScriptArguments(allDocs, currentCount, MAX_SCRIPT_SIZE);
             BackoffExponentialRetryPolicy retryPolicy = new BackoffExponentialRetryPolicy();
             String response = null;
@@ -193,7 +203,7 @@ public static void executeWriteStoredProcedure(final DocumentClient client, Stri
                     retryPolicy.errorOccured(e);
                 }
             }
-
+
             int createdCount = Integer.parseInt(response);
             currentCount += createdCount;
         }
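The substantive change in this file: collection creation now passes RequestOptions carrying the configured offer type instead of null, and rangeIndexes is null-guarded. A condensed sketch of the creation call, using only SDK types imported above (the retry loop and indexing-policy handling from the real method are omitted; assumes the azure-documentdb 1.0.0 SDK from the pom):

import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.RequestOptions;

public class CreateWithOfferSketch {
    // Create a collection under an explicit offer type rather than the
    // default implied by passing null RequestOptions.
    static DocumentCollection create(DocumentClient client, String databaseSelfLink,
            String collectionName, String offerType) throws DocumentClientException {
        DocumentCollection coll = new DocumentCollection("{ 'id':'" + collectionName + "' }");
        RequestOptions options = new RequestOptions();
        options.setOfferType(offerType); // e.g. "S1", "S2", "S3"
        return client.createCollection(databaseSelfLink, coll, options).getResource();
    }
}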

src/com/microsoft/azure/documentdb/hadoop/DocumentDBOutputFormat.java

Lines changed: 2 additions & 1 deletion

@@ -57,6 +57,7 @@ public RecordWriter<Writable, DocumentDBWritable> getRecordWriter(TaskAttemptCon
         return new DocumentDBRecordWriter(conf, ConfigurationUtil.getDBEndpoint(conf),
                 ConfigurationUtil.getDBKey(conf), ConfigurationUtil.getDBName(conf),
                 ConfigurationUtil.getOutputCollectionNames(conf), ConfigurationUtil.getRangeIndex(conf),
-                ConfigurationUtil.getUpsert(conf));
+                ConfigurationUtil.getUpsert(conf),
+                ConfigurationUtil.getOutputCollectionsOffer(conf));
     }
 }
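With the offer threaded through getRecordWriter, a mapreduce driver can select it per job from configuration alone. A hypothetical driver fragment (only the offer key appears in this diff; the endpoint, key, database, and collection settings go through their own ConfigurationUtil keys, which are not reproduced here):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.azure.documentdb.hadoop.DocumentDBOutputFormat;

public class OfferAwareDriverSketch {
    public static Job configure(Configuration conf) throws Exception {
        // Picked up by ConfigurationUtil.getOutputCollectionsOffer(conf)
        // when DocumentDBOutputFormat constructs the record writer.
        conf.set("DocumentDB.outputCollectionsOffer", "S1");

        Job job = Job.getInstance(conf, "documentdb-write");
        job.setOutputFormatClass(DocumentDBOutputFormat.class);
        return job;
    }
}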

src/com/microsoft/azure/documentdb/hadoop/DocumentDBRecordWriter.java

Lines changed: 3 additions & 3 deletions

@@ -41,19 +41,19 @@ public class DocumentDBRecordWriter extends RecordWriter<Writable, DocumentDBWri
     private int currentStoredProcedureIndex = 0;

     public DocumentDBRecordWriter(Configuration conf, String host, String key, String dbName, String[] collNames,
-            String[] rangeIndexes, boolean upsert) throws IOException {
+            String[] rangeIndexes, boolean upsert, String offerType) throws IOException {
         try {
             ConnectionPolicy policy = ConnectionPolicy.GetDefault();
             policy.setUserAgentSuffix(DocumentDBConnectorUtil.UserAgentSuffix);
             DocumentClient client = new DocumentClient(host, key, policy,
                     ConsistencyLevel.Session);
-
+
             Database db = DocumentDBConnectorUtil.GetDatabase(client, dbName);
             this.collections = new DocumentCollection[collNames.length];
             this.sprocs = new StoredProcedure[collNames.length];
             for (int i = 0; i < collNames.length; i++) {
                 this.collections[i] = DocumentDBConnectorUtil.getOrCreateOutputCollection(client, db.getSelfLink(), collNames[i],
-                        rangeIndexes);
+                        rangeIndexes, offerType);
                 this.sprocs[i] = DocumentDBConnectorUtil.CreateBulkImportStoredProcedure(client, this.collections[i].getSelfLink());
             }

src/com/microsoft/azure/documentdb/hive/DocumentDBSerDe.java

Lines changed: 7 additions & 1 deletion

@@ -17,6 +17,7 @@
 import org.openx.data.jsonserde.json.JSONObject;

 import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.hadoop.DocumentDBConnectorUtil;
 import com.microsoft.azure.documentdb.hadoop.DocumentDBWritable;

 /**
@@ -25,13 +26,18 @@
  *
  */
 public class DocumentDBSerDe implements SerDe {
-
+    private static final String HIVE_USERAGENT = " HiveConnector/1.0.0";
     private DocumentDBWritable cachedWritable;
     private JsonSerDe jsonSerde;

     public DocumentDBSerDe() {
         this.cachedWritable = new DocumentDBWritable();
         this.jsonSerde = new JsonSerDe();
+
+        // Set the user-agent to hive.
+        if (!DocumentDBConnectorUtil.UserAgentSuffix.contains(DocumentDBSerDe.HIVE_USERAGENT)) {
+            DocumentDBConnectorUtil.UserAgentSuffix += DocumentDBSerDe.HIVE_USERAGENT;
+        }
+    }

     /**
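Because Hive may construct the SerDe many times in one process, the contains() guard keeps the shared suffix from growing on every instantiation. The pattern in isolation (a standalone sketch with a stand-in static field, not the connector's actual class):

public class UserAgentTagSketch {
    // Stand-in for DocumentDBConnectorUtil.UserAgentSuffix.
    static String userAgentSuffix = " HadoopConnector/1.0.0";

    static void tag(String componentSuffix) {
        // Append at most once, however many times a component is constructed.
        if (!userAgentSuffix.contains(componentSuffix)) {
            userAgentSuffix += componentSuffix;
        }
    }

    public static void main(String[] args) {
        tag(" HiveConnector/1.0.0");
        tag(" HiveConnector/1.0.0"); // no-op on repeat
        System.out.println(userAgentSuffix);
        // -> " HadoopConnector/1.0.0 HiveConnector/1.0.0"
    }
}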

src/com/microsoft/azure/documentdb/mapred/hadoop/DocumentDBOutputFormat.java

Lines changed: 1 addition & 1 deletion

@@ -48,6 +48,6 @@ public RecordWriter<Writable, DocumentDBWritable> getRecordWriter(FileSystem fs,
         return new DocumentDBRecordWriter(conf, ConfigurationUtil.getDBEndpoint(conf),
                 ConfigurationUtil.getDBKey(conf), ConfigurationUtil.getDBName(conf),
                 ConfigurationUtil.getOutputCollectionNames(conf), ConfigurationUtil.getRangeIndex(conf),
-                ConfigurationUtil.getUpsert(conf));
+                ConfigurationUtil.getUpsert(conf), ConfigurationUtil.getOutputCollectionsOffer(conf));
     }
 }

src/com/microsoft/azure/documentdb/mapred/hadoop/DocumentDBRecordWriter.java

Lines changed: 2 additions & 2 deletions

@@ -45,7 +45,7 @@ public class DocumentDBRecordWriter implements RecordWriter<Writable, DocumentDB
     private int currentStoredProcedureIndex = 0;

     public DocumentDBRecordWriter(JobConf conf, String host, String key, String dbName, String[] collNames,
-            String[] rangeIndexes, boolean upsert) throws IOException {
+            String[] rangeIndexes, boolean upsert, String offerType) throws IOException {
         DocumentClient client;
         try {
             ConnectionPolicy policy = ConnectionPolicy.GetDefault();
@@ -57,7 +57,7 @@ public DocumentDBRecordWriter(JobConf conf, String host, String key, String dbNa
         this.sprocs = new StoredProcedure[collNames.length];
         for (int i = 0; i < collNames.length; i++) {
             this.collections[i] = DocumentDBConnectorUtil.getOrCreateOutputCollection(client, db.getSelfLink(), collNames[i],
-                    rangeIndexes);
+                    rangeIndexes, offerType);
             this.sprocs[i] = DocumentDBConnectorUtil.CreateBulkImportStoredProcedure(client, this.collections[i].getSelfLink());
         }

src/com/microsoft/azure/documentdb/pig/DocumentDBLoader.java

Lines changed: 7 additions & 0 deletions

@@ -30,13 +30,15 @@

 import com.microsoft.azure.documentdb.Document;
 import com.microsoft.azure.documentdb.hadoop.ConfigurationUtil;
+import com.microsoft.azure.documentdb.hadoop.DocumentDBConnectorUtil;
 import com.microsoft.azure.documentdb.hadoop.DocumentDBInputFormat;
 import com.microsoft.azure.documentdb.hadoop.DocumentDBWritable;

 /**
  * A Pig data loader from DocumentDB.
  */
 public class DocumentDBLoader extends LoadFunc{
+    private static final String PIG_LOADER_USERAGENT = " PigConnectorLoader/1.0.0";
     private String masterkey = null;
     private RecordReader reader = null;
     private ResourceFieldSchema[] fields;
@@ -58,6 +60,11 @@ public DocumentDBLoader(String masterkey, String dbName, String inputCollections
         // Comma separated collection names
         this.inputCollections = inputCollections;
         this.query = query;
+
+        // Set the userAgent to pig loader
+        if (!DocumentDBConnectorUtil.UserAgentSuffix.contains(DocumentDBLoader.PIG_LOADER_USERAGENT)) {
+            DocumentDBConnectorUtil.UserAgentSuffix += DocumentDBLoader.PIG_LOADER_USERAGENT;
+        }
     }

     /**
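The Pig loader applies the same once-only tagging in its constructor, so merely constructing it marks traffic as Pig-originated. A hypothetical construction for illustration (the hunk header truncates the signature; the trailing query parameter is inferred from this.query = query in the body, so treat the exact arity as an assumption, and the argument values are placeholders):

import com.microsoft.azure.documentdb.pig.DocumentDBLoader;

public class LoaderTagSketch {
    public static void main(String[] args) {
        // Appends " PigConnectorLoader/1.0.0" to the shared user-agent suffix
        // (once, thanks to the contains() guard in the constructor).
        DocumentDBLoader loader = new DocumentDBLoader(
                "<master-key>",           // masterkey (placeholder)
                "mydb",                   // database name (placeholder)
                "coll1,coll2",            // comma-separated input collections
                "SELECT * FROM root r");  // DocumentDB query (placeholder)
    }
}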
