diff --git a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java index 003d8ec7..d993c15b 100644 --- a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java +++ b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java @@ -18,22 +18,33 @@ package org.apache.gora.hive.store; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import org.apache.avro.util.Utf8; +import org.apache.gora.examples.WebPageDataCreator; import org.apache.gora.examples.generated.Employee; import org.apache.gora.examples.generated.Metadata; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.hive.GoraHiveTestDriver; import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; import org.apache.gora.store.DataStoreTestBase; import org.apache.gora.store.DataStoreTestUtil; import org.apache.gora.util.GoraException; import org.apache.gora.util.StringUtils; +import org.apache.metamodel.query.parser.QueryParserException; import org.junit.Ignore; import org.junit.Test; @@ -56,6 +67,146 @@ public void assertSchemaExists(String schemaName) throws Exception { assertTrue(employeeStore.schemaExists()); } + private void awaitWebPageSchema(String key) throws Exception { + // wait until Hive exposes the schema and specific record to avoid empty query results + for (int attempt = 0; attempt < 100; attempt++) { + webPageStore.flush(); + if (!webPageStore.schemaExists()) { + Thread.sleep(100L); + continue; + } + if (key == null) { + return; + } + try { + webPageStore.get(key, new String[] {"url"}); + return; + } catch (QueryParserException e) { + webPageStore.close(); + webPageStore = testDriver.createDataStore(String.class, WebPage.class); + } + } + fail("Hive web page schema or record was not visible"); + } + + private void populateWebPages() throws Exception { + // load deterministic fixtures and block until every inserted URL becomes queryable + webPageStore.createSchema(); + awaitWebPageSchema(null); + WebPageDataCreator.createWebPageData(webPageStore); + for (String url : WebPageDataCreator.URLS) { + awaitWebPageSchema(url); + } + } + + private List sortedWebPageUrls() { + // copy and sort URLs to get a stable ordering for range expectations + List sorted = new ArrayList<>(Arrays.asList(WebPageDataCreator.URLS)); + Collections.sort(sorted); + return sorted; + } + + private void assertResultSize(boolean setStartKey, boolean setEndKey, boolean setLimit) + throws Exception { + // exhaustively exercise start/end/limit combinations while guarding against stale reads + populateWebPages(); + List urls = sortedWebPageUrls(); + int total = urls.size(); + + for (int i = 0, iLimit = setStartKey ? total : 1; i < iLimit; i++) { + // choose every permissible end index for the current start index + int jStart = setEndKey ? i : total - 1; + int jLimit = setEndKey ? total : jStart + 1; + for (int j = jStart; j < jLimit; j++) { + Query query = webPageStore.newQuery(); + if (setStartKey) { + query.setStartKey(urls.get(i)); + } + if (setEndKey) { + query.setEndKey(urls.get(j)); + } + + int startIndex = setStartKey ? i : 0; + int endExclusive = setEndKey ? j + 1 : total; + List eligibleUrls = new ArrayList<>(urls.subList(startIndex, endExclusive)); + Set allowed = new HashSet<>(eligibleUrls); // membership guard for returned URLs + LinkedHashSet actual = new LinkedHashSet<>(); // record unique URLs actually read + + int expectedCount = eligibleUrls.size(); + if (setLimit) { + int limit = expectedCount / 2; + if (limit == 0) { + continue; + } + query.setLimit(limit); + expectedCount = Math.min(limit, expectedCount); + } + + Result result = query.execute(); + int reportedSize = result.size(); + while (result.next()) { + WebPage page = result.get(); + DataStoreTestUtil.assertWebPage( + page, WebPageDataCreator.URL_INDEXES.get(page.getUrl().toString())); + String url = page.getUrl().toString(); + assertTrue("Unexpected url returned: " + url, allowed.contains(url)); + assertTrue("Duplicate url returned: " + url, actual.add(url)); + } + result.close(); + + if (!setLimit) { + // full-range scans should yield every eligible URL, in deterministic order + assertEquals(new LinkedHashSet<>(eligibleUrls), actual); + } else { + // limited queries only guarantee the expected cardinality + assertEquals(expectedCount, actual.size()); + } + // Result.size() must match the number of rows we iterated over + assertEquals(expectedCount, reportedSize); + } + } + } + + @Override + public void testResultSize() throws Exception { + assertResultSize(false, false, false); + } + + @Override + public void testResultSizeEndKey() throws Exception { + assertResultSize(false, true, false); + } + + @Override + public void testResultSizeEndKeyWithLimit() throws Exception { + assertResultSize(false, true, true); + } + + @Override + public void testResultSizeWithLimit() throws Exception { + assertResultSize(false, false, true); + } + + @Override + public void testResultSizeKeyRange() throws Exception { + assertResultSize(true, true, false); + } + + @Override + public void testResultSizeKeyRangeWithLimit() throws Exception { + assertResultSize(true, true, true); + } + + @Override + public void testResultSizeStartKey() throws Exception { + assertResultSize(true, false, false); + } + + @Override + public void testResultSizeStartKeyWithLimit() throws Exception { + assertResultSize(true, false, true); + } + @Override public void assertPut(Employee employee) throws GoraException { employeeStore.put(employee.getSsn().toString(), employee);