Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@

package org.apache.gora.hive.store;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hive.GoraHiveTestDriver;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStoreTestBase;
import org.apache.gora.store.DataStoreTestUtil;
import org.apache.gora.util.GoraException;
import org.apache.gora.util.StringUtils;
import org.apache.metamodel.query.parser.QueryParserException;
import org.junit.Ignore;
import org.junit.Test;

Expand All @@ -56,6 +67,146 @@ public void assertSchemaExists(String schemaName) throws Exception {
assertTrue(employeeStore.schemaExists());
}

/**
 * Blocks until the web page schema exists and, when {@code key} is non-null, until a record
 * for that key can actually be read back from {@code webPageStore}.
 *
 * <p>The store is flushed and probed up to 100 times, pausing 100 ms after every unsuccessful
 * attempt. If the condition is never met the test is failed.
 *
 * @param key key of the record to probe for, or {@code null} to wait for the schema only
 * @throws Exception if flushing, probing or re-creating the store fails, or if the thread is
 *     interrupted while sleeping
 */
private void awaitWebPageSchema(String key) throws Exception {
  for (int attempt = 0; attempt < 100; attempt++) {
    webPageStore.flush();
    if (webPageStore.schemaExists()) {
      if (key == null) {
        return;
      }
      try {
        // A null result means the record is not readable yet, so keep polling instead of
        // reporting success prematurely.
        if (webPageStore.get(key, new String[] {"url"}) != null) {
          return;
        }
      } catch (QueryParserException e) {
        // The query could not be parsed against the current store; reopen the store and retry.
        webPageStore.close();
        webPageStore = testDriver.createDataStore(String.class, WebPage.class);
      }
    }
    // Back off after every unsuccessful attempt (including the exception path) so the loop
    // does not spin through all retries without giving Hive time to catch up.
    Thread.sleep(100L);
  }
  fail("Hive web page schema or record was not visible");
}

/**
 * Creates the web page schema, inserts the {@link WebPageDataCreator} fixtures and then waits
 * for each fixture URL in turn via {@code awaitWebPageSchema}.
 *
 * @throws Exception if schema creation, data creation or any of the waits fails
 */
private void populateWebPages() throws Exception {
  webPageStore.createSchema();
  // a null key waits for the schema only
  awaitWebPageSchema(null);

  WebPageDataCreator.createWebPageData(webPageStore);

  final String[] fixtureUrls = WebPageDataCreator.URLS;
  for (int idx = 0; idx < fixtureUrls.length; idx++) {
    awaitWebPageSchema(fixtureUrls[idx]);
  }
}

/**
 * Returns the fixture URLs in natural (lexicographic) order.
 *
 * @return a new, independent list holding every entry of {@code WebPageDataCreator.URLS},
 *     sorted ascending
 */
private List<String> sortedWebPageUrls() {
  // sort a private copy of the array so the shared fixture constant is left untouched
  String[] ordered = WebPageDataCreator.URLS.clone();
  Arrays.sort(ordered);
  return new ArrayList<>(Arrays.asList(ordered));
}

/**
 * Populates the web page fixtures and verifies query results for the requested combination of
 * start key, end key and limit.
 *
 * <p>With {@code setStartKey} every sorted URL is tried as start key; with {@code setEndKey}
 * every URL at or after the current start index is tried as end key. For each combination the
 * query is executed and checked so that:
 *
 * <ul>
 *   <li>every returned page matches its fixture, lies inside the requested key range and is
 *       returned only once;
 *   <li>without a limit, the set of returned URLs equals the set of eligible URLs;
 *   <li>with a limit (half of the eligible count; combinations where that is zero are
 *       skipped), exactly that many rows are returned;
 *   <li>{@code Result.size()} reports the expected row count.
 * </ul>
 *
 * @param setStartKey whether to set a start key on each query
 * @param setEndKey whether to set an end key on each query
 * @param setLimit whether to set a limit on each query
 * @throws Exception if populating the store or executing a query fails
 */
private void assertResultSize(boolean setStartKey, boolean setEndKey, boolean setLimit)
    throws Exception {
  populateWebPages();
  List<String> urls = sortedWebPageUrls();
  int total = urls.size();

  for (int i = 0, iLimit = setStartKey ? total : 1; i < iLimit; i++) {
    // choose every permissible end index for the current start index
    int jStart = setEndKey ? i : total - 1;
    int jLimit = setEndKey ? total : jStart + 1;
    for (int j = jStart; j < jLimit; j++) {
      Query<String, WebPage> query = webPageStore.newQuery();
      if (setStartKey) {
        query.setStartKey(urls.get(i));
      }
      if (setEndKey) {
        query.setEndKey(urls.get(j));
      }

      int startIndex = setStartKey ? i : 0;
      int endExclusive = setEndKey ? j + 1 : total;
      List<String> eligibleUrls = new ArrayList<>(urls.subList(startIndex, endExclusive));
      Set<String> allowed = new HashSet<>(eligibleUrls); // membership guard for returned URLs
      Set<String> actual = new LinkedHashSet<>(); // unique URLs actually read

      int expectedCount = eligibleUrls.size();
      if (setLimit) {
        int limit = expectedCount / 2;
        if (limit == 0) {
          // a zero limit cannot be exercised meaningfully; skip this combination
          continue;
        }
        query.setLimit(limit);
        // limit is at most half of the eligible rows, so it is the expected row count
        expectedCount = limit;
      }

      Result<String, WebPage> result = query.execute();
      int reportedSize;
      try {
        reportedSize = result.size();
        while (result.next()) {
          WebPage page = result.get();
          String url = page.getUrl().toString();
          // Check range membership first so an out-of-range row fails with a clear message
          // before its fixture index is looked up.
          assertTrue("Unexpected url returned: " + url, allowed.contains(url));
          DataStoreTestUtil.assertWebPage(page, WebPageDataCreator.URL_INDEXES.get(url));
          assertTrue("Duplicate url returned: " + url, actual.add(url));
        }
      } finally {
        // release the result even when one of the assertions above fails
        result.close();
      }

      if (!setLimit) {
        // unlimited scans must yield exactly the eligible URLs (order is not checked)
        assertEquals(allowed, actual);
      } else {
        // limited queries only guarantee the expected cardinality
        assertEquals(expectedCount, actual.size());
      }
      // the size reported by Result.size() must match the expected row count
      assertEquals(expectedCount, reportedSize);
    }
  }
}

/** Checks result sizes for queries with no start key, no end key and no limit. */
@Override
public void testResultSize() throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = false;
  final boolean withLimit = false;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by an end key only, without a limit. */
@Override
public void testResultSizeEndKey() throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = true;
  final boolean withLimit = false;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by an end key only, with a limit. */
@Override
public void testResultSizeEndKeyWithLimit() throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = true;
  final boolean withLimit = true;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries with no key bounds but with a limit. */
@Override
public void testResultSizeWithLimit() throws Exception {
  final boolean withStartKey = false;
  final boolean withEndKey = false;
  final boolean withLimit = true;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by both a start key and an end key, without a limit. */
@Override
public void testResultSizeKeyRange() throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = true;
  final boolean withLimit = false;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by both a start key and an end key, with a limit. */
@Override
public void testResultSizeKeyRangeWithLimit() throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = true;
  final boolean withLimit = true;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by a start key only, without a limit. */
@Override
public void testResultSizeStartKey() throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = false;
  final boolean withLimit = false;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

/** Checks result sizes for queries bounded by a start key only, with a limit. */
@Override
public void testResultSizeStartKeyWithLimit() throws Exception {
  final boolean withStartKey = true;
  final boolean withEndKey = false;
  final boolean withLimit = true;
  assertResultSize(withStartKey, withEndKey, withLimit);
}

@Override
public void assertPut(Employee employee) throws GoraException {
employeeStore.put(employee.getSsn().toString(), employee);
Expand Down
Loading