Skip to content

Commit 186fb22

Browse files
Merge pull request #644 from apache/compressed_iterator
use iteration in set ops, wrap compressed sketch and unpack in iterator
2 parents 0424666 + 29895b3 commit 186fb22

File tree

10 files changed

+341
-101
lines changed

10 files changed

+341
-101
lines changed

src/main/java/org/apache/datasketches/theta/AnotBimpl.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
package org.apache.datasketches.theta;
2121

2222
import static org.apache.datasketches.common.Util.exactLog2OfLong;
23-
import static org.apache.datasketches.thetacommon.HashOperations.convertToHashTable;
23+
import static org.apache.datasketches.thetacommon.HashOperations.checkThetaCorruption;
24+
import static org.apache.datasketches.thetacommon.HashOperations.continueCondition;
2425
import static org.apache.datasketches.thetacommon.HashOperations.hashSearch;
26+
import static org.apache.datasketches.thetacommon.HashOperations.hashSearchOrInsert;
27+
import static org.apache.datasketches.thetacommon.HashOperations.minLgHashTableSize;
2528

2629
import java.util.Arrays;
2730

@@ -124,7 +127,7 @@ public CompactSketch aNotB(final Sketch skA, final Sketch skB, final boolean dst
124127

125128
if (skB.isEmpty()) {
126129
return skA.compact(dstOrdered, dstMem);
127-
}
130+
}
128131
ThetaUtil.checkSeedHashes(skB.getSeedHash(), seedHash_);
129132
//Both skA & skB are not empty
130133

@@ -162,14 +165,12 @@ private static long[] getResultHashArr( //returns a new array
162165
final long[] hashArrA,
163166
final Sketch skB) {
164167

165-
//Rebuild/get hashtable of skB
168+
// Rebuild or get hashtable of skB
166169
final long[] hashTableB; //read only
167-
final long[] thetaCache = skB.getCache();
168-
final int countB = skB.getRetainedEntries(true);
169170
if (skB instanceof CompactSketch) {
170-
hashTableB = convertToHashTable(thetaCache, countB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
171+
hashTableB = convertToHashTable(skB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
171172
} else {
172-
hashTableB = thetaCache;
173+
hashTableB = skB.getCache();
173174
}
174175

175176
//build temporary result arrays of skA
@@ -191,6 +192,25 @@ private static long[] getResultHashArr( //returns a new array
191192
return Arrays.copyOfRange(tmpHashArrA, 0, nonMatches);
192193
}
193194

195+
private static long[] convertToHashTable(
196+
final Sketch sketch,
197+
final long thetaLong,
198+
final double rebuildThreshold) {
199+
final int lgArrLongs = minLgHashTableSize(sketch.getRetainedEntries(true), rebuildThreshold);
200+
final int arrLongs = 1 << lgArrLongs;
201+
final long[] hashTable = new long[arrLongs];
202+
checkThetaCorruption(thetaLong);
203+
final HashIterator it = sketch.iterator();
204+
while (it.next()) {
205+
final long hash = it.get();
206+
if (continueCondition(thetaLong, hash) ) {
207+
continue;
208+
}
209+
hashSearchOrInsert(hashTable, lgArrLongs, hash);
210+
}
211+
return hashTable;
212+
}
213+
194214
private void reset() {
195215
thetaLong_ = Long.MAX_VALUE;
196216
empty_ = true;

src/main/java/org/apache/datasketches/theta/CompactOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ static CompactSketch memoryToCompact(
161161
final long hash = srcMem.getLong(srcPreLongs << 3);
162162
final SingleItemSketch sis = new SingleItemSketch(hash, srcSeedHash);
163163
if (dstMem != null) {
164-
dstMem.putByteArray(0, sis.toByteArray(),0, 16);
164+
dstMem.putByteArray(0, sis.toByteArray(), 0, 16);
165165
return new DirectCompactSketch(dstMem);
166166
} else { //heap
167167
return sis;

src/main/java/org/apache/datasketches/theta/CompactSketch.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4;
3333
import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4;
3434
import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4;
35+
import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits;
3536
import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem;
3637

3738
import org.apache.datasketches.common.Family;
@@ -189,7 +190,8 @@ private static CompactSketch wrap(final Memory srcMem, final long seed, final bo
189190
if (serVer == 4) {
190191
// wrap the compressed format in place: set operations now iterate over the sketch
191192
// instead of reaching into memory directly, so hashes are unpacked during iteration
192-
return heapifyV4(srcMem, seed, enforceSeed);
193+
return DirectCompactCompressedSketch.wrapInstance(srcMem,
194+
enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
193195
}
194196
else if (serVer == 3) {
195197
if (PreambleUtil.isEmptyFlag(srcMem)) {
@@ -274,10 +276,6 @@ private int computeMinLeadingZeros() {
274276
return Long.numberOfLeadingZeros(ored);
275277
}
276278

277-
private static int wholeBytesToHoldBits(final int bits) {
278-
return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
279-
}
280-
281279
private byte[] toByteArrayV4() {
282280
final int preambleLongs = isEstimationMode() ? 2 : 1;
283281
final int entryBits = 64 - computeMinLeadingZeros();
@@ -286,8 +284,8 @@ private byte[] toByteArrayV4() {
286284
// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
287285
final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries()));
288286

289-
final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
290-
final byte[] bytes = new byte[size];
287+
final int sizeBytes = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
288+
final byte[] bytes = new byte[sizeBytes];
291289
final WritableMemory mem = WritableMemory.writableWrap(bytes);
292290
int offsetBytes = 0;
293291
mem.putByte(offsetBytes++, (byte) preambleLongs);
@@ -334,12 +332,10 @@ private byte[] toByteArrayV4() {
334332

335333
private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) {
336334
final int preLongs = extractPreLongs(srcMem);
337-
final int flags = extractFlags(srcMem);
338335
final int entryBits = extractEntryBitsV4(srcMem);
339336
final int numEntriesBytes = extractNumEntriesBytesV4(srcMem);
340337
final short seedHash = (short) extractSeedHash(srcMem);
341-
final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0;
342-
if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
338+
if (enforceSeed) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
343339
int offsetBytes = 8;
344340
long theta = Long.MAX_VALUE;
345341
if (preLongs > 1) {
@@ -374,7 +370,7 @@ private static CompactSketch heapifyV4(final Memory srcMem, final long seed, fin
374370
entries[i] += previous;
375371
previous = entries[i];
376372
}
377-
return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true);
373+
return new HeapCompactSketch(entries, false, seedHash, numEntries, theta, true);
378374
}
379375

380376
}
src/main/java/org/apache/datasketches/theta/DirectCompactCompressedSketch.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datasketches.theta;
21+
22+
import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4;
23+
import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4;
24+
import static org.apache.datasketches.theta.PreambleUtil.extractPreLongs;
25+
import static org.apache.datasketches.theta.PreambleUtil.extractSeedHash;
26+
import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4;
27+
import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits;
28+
29+
import org.apache.datasketches.memory.Memory;
30+
import org.apache.datasketches.memory.WritableMemory;
31+
import org.apache.datasketches.thetacommon.ThetaUtil;
32+
33+
/**
34+
* An off-heap (Direct), compact, compressed, read-only sketch. It is not empty, not a single item and ordered.
35+
*
36+
* <p>This sketch can only be associated with a Serialization Version 4 format binary image.</p>
37+
*
38+
* <p>This implementation uses data in a given Memory that is owned and managed by the caller.
39+
* This Memory can be off-heap, which if managed properly will greatly reduce the need for
40+
* the JVM to perform garbage collection.</p>
41+
*/
42+
class DirectCompactCompressedSketch extends DirectCompactSketch {
43+
/**
44+
* Construct this sketch with the given memory.
45+
* @param mem Read-only Memory object.
46+
*/
47+
DirectCompactCompressedSketch(final Memory mem) {
48+
super(mem);
49+
}
50+
51+
/**
52+
* Wraps the given Memory, which must be a SerVer 4 compressed CompactSketch image.
53+
* Must check the validity of the Memory before calling.
54+
* @param srcMem <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
55+
* @param seedHash The update seedHash.
56+
* <a href="{@docRoot}/resources/dictionary.html#seedHash">See Seed Hash</a>.
57+
* @return this sketch
58+
*/
59+
static DirectCompactCompressedSketch wrapInstance(final Memory srcMem, final short seedHash) {
60+
ThetaUtil.checkSeedHashes((short) extractSeedHash(srcMem), seedHash);
61+
return new DirectCompactCompressedSketch(srcMem);
62+
}
63+
64+
//Sketch Overrides
65+
66+
@Override
67+
public CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem) {
68+
if (dstMem != null) {
69+
mem_.copyTo(0, dstMem, 0, getCurrentBytes());
70+
return new DirectCompactSketch(dstMem);
71+
}
72+
return CompactSketch.heapify(mem_);
73+
}
74+
75+
@Override
76+
public int getCurrentBytes() {
77+
final int preLongs = extractPreLongs(mem_);
78+
final int entryBits = extractEntryBitsV4(mem_);
79+
final int numEntriesBytes = extractNumEntriesBytesV4(mem_);
80+
return preLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(getRetainedEntries() * entryBits);
81+
}
82+
83+
private static final int START_PACKED_DATA_EXACT_MODE = 8;
84+
private static final int START_PACKED_DATA_ESTIMATION_MODE = 16;
85+
86+
@Override
87+
public int getRetainedEntries(final boolean valid) { //compact is always valid
88+
// number of entries is stored using variable length encoding
89+
// most significant bytes with all zeros are not stored
90+
// one byte in the preamble has the number of non-zero bytes used
91+
final int preLongs = extractPreLongs(mem_); // if > 1 then the second long has theta
92+
final int numEntriesBytes = extractNumEntriesBytesV4(mem_);
93+
int offsetBytes = preLongs > 1 ? START_PACKED_DATA_ESTIMATION_MODE : START_PACKED_DATA_EXACT_MODE;
94+
int numEntries = 0;
95+
for (int i = 0; i < numEntriesBytes; i++) {
96+
numEntries |= Byte.toUnsignedInt(mem_.getByte(offsetBytes++)) << (i << 3);
97+
}
98+
return numEntries;
99+
}
100+
101+
@Override
102+
public long getThetaLong() {
103+
final int preLongs = extractPreLongs(mem_);
104+
return (preLongs > 1) ? extractThetaLongV4(mem_) : Long.MAX_VALUE;
105+
}
106+
107+
@Override
108+
public boolean isEmpty() {
109+
return false;
110+
}
111+
112+
@Override
113+
public boolean isOrdered() {
114+
return true;
115+
}
116+
117+
@Override
118+
public HashIterator iterator() {
119+
return new MemoryCompactCompressedHashIterator(
120+
mem_,
121+
(extractPreLongs(mem_) > 1 ? 16 : 8) + extractNumEntriesBytesV4(mem_),
122+
extractEntryBitsV4(mem_),
123+
getRetainedEntries()
124+
);
125+
}
126+
127+
//restricted methods
128+
129+
@Override
130+
long[] getCache() {
131+
final int numEntries = getRetainedEntries();
132+
final long[] cache = new long[numEntries];
133+
int i = 0;
134+
HashIterator it = iterator();
135+
while (it.next()) {
136+
cache[i++] = it.get();
137+
}
138+
return cache;
139+
}
140+
}

src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,7 @@ public int getCurrentBytes() {
8686

8787
@Override
8888
public double getEstimate() {
89-
if (otherCheckForSingleItem(mem_)) { return 1; }
90-
final int preLongs = extractPreLongs(mem_);
91-
final int curCount = (preLongs == 1) ? 0 : extractCurCount(mem_);
92-
final long thetaLong = (preLongs > 2) ? extractThetaLong(mem_) : Long.MAX_VALUE;
93-
return Sketch.estimate(thetaLong, curCount);
89+
return Sketch.estimate(getThetaLong(), getRetainedEntries());
9490
}
9591

9692
@Override
@@ -142,10 +138,8 @@ public HashIterator iterator() {
142138

143139
@Override
144140
public byte[] toByteArray() {
145-
final int curCount = getRetainedEntries(true);
146-
checkIllegalCurCountAndEmpty(isEmpty(), curCount);
147-
final int preLongs = extractPreLongs(mem_);
148-
final int outBytes = (curCount + preLongs) << 3;
141+
checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries());
142+
final int outBytes = getCurrentBytes();
149143
final byte[] byteArrOut = new byte[outBytes];
150144
mem_.getByteArray(0, byteArrOut, 0, outBytes);
151145
return byteArrOut;

src/main/java/org/apache/datasketches/theta/IntersectionImpl.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ else if (curCount_ < 0 && sketchInEntries > 0) {
288288
else { //On the heap, allocate a HT
289289
hashTable_ = new long[1 << lgArrLongs_];
290290
}
291-
moveDataToTgt(sketchIn.getCache(), curCount_);
291+
moveDataToTgt(sketchIn);
292292
} //end of state 5
293293

294294
//state 7
@@ -434,8 +434,6 @@ long getThetaLong() {
434434
private void performIntersect(final Sketch sketchIn) {
435435
// curCount and input data are nonzero, match against HT
436436
assert curCount_ > 0 && !empty_;
437-
final long[] cacheIn = sketchIn.getCache();
438-
final int arrLongsIn = cacheIn.length;
439437
final long[] hashTable;
440438
if (wmem_ != null) {
441439
final int htLen = 1 << lgArrLongs_;
@@ -448,27 +446,17 @@ private void performIntersect(final Sketch sketchIn) {
448446
final long[] matchSet = new long[ min(curCount_, sketchIn.getRetainedEntries(true)) ];
449447

450448
int matchSetCount = 0;
451-
if (sketchIn.isOrdered()) {
452-
//ordered compact, which enables early stop
453-
for (int i = 0; i < arrLongsIn; i++ ) {
454-
final long hashIn = cacheIn[i];
455-
//if (hashIn <= 0L) continue; //<= 0 should not happen
456-
if (hashIn >= thetaLong_) {
457-
break; //early stop assumes that hashes in input sketch are ordered!
458-
}
449+
final boolean isOrdered = sketchIn.isOrdered();
450+
final HashIterator it = sketchIn.iterator();
451+
while (it.next()) {
452+
final long hashIn = it.get();
453+
if (hashIn < thetaLong_) {
459454
final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
460-
if (foundIdx == -1) { continue; }
461-
matchSet[matchSetCount++] = hashIn;
462-
}
463-
}
464-
else {
465-
//either unordered compact or hash table
466-
for (int i = 0; i < arrLongsIn; i++ ) {
467-
final long hashIn = cacheIn[i];
468-
if (hashIn <= 0L || hashIn >= thetaLong_) { continue; }
469-
final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
470-
if (foundIdx == -1) { continue; }
471-
matchSet[matchSetCount++] = hashIn;
455+
if (foundIdx != -1) {
456+
matchSet[matchSetCount++] = hashIn;
457+
}
458+
} else {
459+
if (isOrdered) { break; } // early stop
472460
}
473461
}
474462
//reduce effective array size to minimum
@@ -515,6 +503,32 @@ private void moveDataToTgt(final long[] arr, final int count) {
515503
assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
516504
}
517505

506+
private void moveDataToTgt(final Sketch sketch) {
507+
int count = sketch.getRetainedEntries();
508+
int tmpCnt = 0;
509+
if (wmem_ != null) { //Off Heap puts directly into mem
510+
final int preBytes = CONST_PREAMBLE_LONGS << 3;
511+
final int lgArrLongs = lgArrLongs_;
512+
final long thetaLong = thetaLong_;
513+
HashIterator it = sketch.iterator();
514+
while (it.next()) {
515+
final long hash = it.get();
516+
if (continueCondition(thetaLong, hash)) { continue; }
517+
hashInsertOnlyMemory(wmem_, lgArrLongs, hash, preBytes);
518+
tmpCnt++;
519+
}
520+
} else { //On Heap. Assumes HT exists and is large enough
521+
HashIterator it = sketch.iterator();
522+
while (it.next()) {
523+
final long hash = it.get();
524+
if (continueCondition(thetaLong_, hash)) { continue; }
525+
hashInsertOnly(hashTable_, lgArrLongs_, hash);
526+
tmpCnt++;
527+
}
528+
}
529+
assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
530+
}
531+
518532
private void hardReset() {
519533
resetCommon();
520534
if (wmem_ != null) {

0 commit comments

Comments
 (0)