diff --git a/datasketches/src/frequencies/mod.rs b/datasketches/src/frequencies/mod.rs new file mode 100644 index 0000000..9462fb3 --- /dev/null +++ b/datasketches/src/frequencies/mod.rs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Frequency sketches for finding heavy hitters in data streams. + +mod reverse_purge_item_hash_map; +mod serialization; +mod sketch; + +pub mod serde; + +pub use serde::I64Serde; +pub use serde::ItemsSerde; +pub use serde::StringSerde; + +pub use self::sketch::ErrorType; +pub use self::sketch::FrequentItemsSketch; +pub use self::sketch::Row; diff --git a/datasketches/src/frequencies/reverse_purge_item_hash_map.rs b/datasketches/src/frequencies/reverse_purge_item_hash_map.rs new file mode 100644 index 0000000..1b6dec3 --- /dev/null +++ b/datasketches/src/frequencies/reverse_purge_item_hash_map.rs @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reverse purge hash map for generic items. + +use std::hash::Hash; +use std::hash::Hasher; + +use crate::hash::MurmurHash3X64128; + +const LOAD_FACTOR: f64 = 0.75; +const DRIFT_LIMIT: usize = 1024; +const MAX_SAMPLE_SIZE: usize = 1024; + +#[derive(Debug, Clone)] +pub struct ReversePurgeItemHashMap { + lg_length: u8, + load_threshold: usize, + keys: Vec>, + values: Vec, + states: Vec, + num_active: usize, +} + +impl ReversePurgeItemHashMap { + pub fn new(map_size: usize) -> Self { + assert!(map_size.is_power_of_two(), "map_size must be power of 2"); + let lg_length = map_size.trailing_zeros() as u8; + let load_threshold = (map_size as f64 * LOAD_FACTOR) as usize; + Self { + lg_length, + load_threshold, + keys: (0..map_size).map(|_| None).collect(), + values: vec![0; map_size], + states: vec![0; map_size], + num_active: 0, + } + } + + pub fn get(&self, key: &T) -> i64 { + let probe = self.hash_probe(key); + if self.states[probe] > 0 { + return self.values[probe]; + } + 0 + } + + pub fn adjust_or_put_value(&mut self, key: T, adjust_amount: i64) { + let mask = self.keys.len() - 1; + let mut probe = (hash_item(&key) as usize) & mask; + let mut drift: usize = 1; + while self.states[probe] != 0 { + let matches = self.keys[probe] + .as_ref() + .map(|existing| existing == &key) + .unwrap_or(false); + if matches { + break; + } + probe = (probe + 1) & mask; + drift += 1; + debug_assert!(drift < DRIFT_LIMIT, "drift limit exceeded"); + } + if self.states[probe] == 0 { + self.keys[probe] = Some(key); + self.values[probe] = adjust_amount; + self.states[probe] = drift as u16; + self.num_active += 1; + } else { + self.values[probe] += adjust_amount; + } + } + + pub fn keep_only_positive_counts(&mut self) { + let len = self.keys.len(); + let mut first_probe = len - 1; + while self.states[first_probe] > 0 { + first_probe -= 1; + } + for probe in (0..first_probe).rev() { + if self.states[probe] > 0 && self.values[probe] <= 0 { + self.hash_delete(probe); + self.num_active -= 1; + } + } + for probe in (first_probe..len).rev() { + if self.states[probe] > 0 && self.values[probe] <= 0 { + self.hash_delete(probe); + self.num_active -= 1; + } + } + } + + pub fn adjust_all_values_by(&mut self, adjust_amount: i64) { + for value in &mut self.values { + *value += adjust_amount; + } + } + + pub fn purge(&mut self, sample_size: usize) -> i64 { + let limit = sample_size.min(self.num_active).min(MAX_SAMPLE_SIZE); + let mut samples = Vec::with_capacity(limit); + let mut i = 0usize; + while samples.len() < limit { + if self.is_active(i) { + samples.push(self.values[i]); + } + i += 1; + } + let mid = samples.len() / 2; + samples.select_nth_unstable(mid); + let median = samples[mid]; + self.adjust_all_values_by(-median); + self.keep_only_positive_counts(); + median + } + + pub fn resize(&mut self, new_size: usize) { + assert!(new_size.is_power_of_two(), "new_size must be power of 2"); + let mut old_keys = std::mem::take(&mut self.keys); + let old_values = std::mem::take(&mut self.values); + let old_states = std::mem::take(&mut self.states); + self.keys = (0..new_size).map(|_| None).collect(); + self.values = vec![0; new_size]; + self.states = vec![0; new_size]; + self.lg_length = new_size.trailing_zeros() as u8; + self.load_threshold = (new_size as f64 * LOAD_FACTOR) as usize; + self.num_active = 0; + for i in 0..old_keys.len() { + if old_states[i] > 0 { + if let Some(key) = old_keys[i].take() { + self.adjust_or_put_value(key, old_values[i]); + } + } + } + } + + pub fn get_length(&self) -> usize { + self.keys.len() + } + + pub fn get_lg_length(&self) -> u8 { + self.lg_length + } + + pub fn get_capacity(&self) -> usize { + self.load_threshold + } + + pub fn get_num_active(&self) -> usize { + self.num_active + } + + pub fn get_active_keys(&self) -> Vec + where + T: Clone, + { + if self.num_active == 0 { + return Vec::new(); + } + let mut keys = Vec::with_capacity(self.num_active); + for i in 0..self.keys.len() { + if self.states[i] > 0 { + if let Some(key) = self.keys[i].as_ref() { + keys.push(key.clone()); + } + } + } + keys + } + + pub fn get_active_values(&self) -> Vec { + if self.num_active == 0 { + return Vec::new(); + } + let mut values = Vec::with_capacity(self.num_active); + for i in 0..self.values.len() { + if self.states[i] > 0 { + values.push(self.values[i]); + } + } + values + } + + pub fn iter(&self) -> ReversePurgeItemIter<'_, T> { + ReversePurgeItemIter::new(self) + } + + fn is_active(&self, probe: usize) -> bool { + self.states[probe] > 0 + } + + fn hash_probe(&self, key: &T) -> usize { + let mask = self.keys.len() - 1; + let mut probe = (hash_item(key) as usize) & mask; + while self.states[probe] > 0 { + let matches = self.keys[probe] + .as_ref() + .map(|existing| existing == key) + .unwrap_or(false); + if matches { + break; + } + probe = (probe + 1) & mask; + } + probe + } + + fn hash_delete(&mut self, mut delete_probe: usize) { + self.states[delete_probe] = 0; + self.keys[delete_probe] = None; + let mut drift: usize = 1; + let mask = self.keys.len() - 1; + let mut probe = (delete_probe + drift) & mask; + while self.states[probe] != 0 { + if self.states[probe] as usize > drift { + self.keys[delete_probe] = self.keys[probe].take(); + self.values[delete_probe] = self.values[probe]; + self.states[delete_probe] = self.states[probe] - drift as u16; + self.states[probe] = 0; + drift = 0; + delete_probe = probe; + } + probe = (probe + 1) & mask; + drift += 1; + debug_assert!(drift < DRIFT_LIMIT, "drift limit exceeded"); + } + } +} + +pub struct ReversePurgeItemIter<'a, T> { + map: &'a ReversePurgeItemHashMap, + index: usize, + count: usize, + stride: usize, + mask: usize, +} + +impl<'a, T> ReversePurgeItemIter<'a, T> { + fn new(map: &'a ReversePurgeItemHashMap) -> Self { + let size = map.keys.len(); + let stride = ((size as f64 * 0.6180339887498949) as usize) | 1; + let mask = size - 1; + let index = 0usize.wrapping_sub(stride); + Self { + map, + index, + count: 0, + stride, + mask, + } + } +} + +impl<'a, T> Iterator for ReversePurgeItemIter<'a, T> { + type Item = (&'a T, i64); + + fn next(&mut self) -> Option { + if self.count >= self.map.num_active { + return None; + } + loop { + self.index = self.index.wrapping_add(self.stride) & self.mask; + if self.map.states[self.index] > 0 { + self.count += 1; + let key = self.map.keys[self.index] + .as_ref() + .expect("active key missing"); + return Some((key, self.map.values[self.index])); + } + } + } +} + +#[inline] +fn hash_item(item: &T) -> u64 { + let mut hasher = MurmurHash3X64128::default(); + item.hash(&mut hasher); + hasher.finish() +} diff --git a/datasketches/src/frequencies/serde.rs b/datasketches/src/frequencies/serde.rs new file mode 100644 index 0000000..e4b1aad --- /dev/null +++ b/datasketches/src/frequencies/serde.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Serialization helpers for frequent items sketches. + +use std::str; + +use crate::error::SerdeError; + +/// Serializer/deserializer for items stored in a frequency sketch. +pub trait ItemsSerde { + /// Serializes a slice of items to a byte buffer. + fn serialize_items(&self, items: &[T]) -> Vec; + + /// Deserializes `num_items` from bytes, returning items and bytes consumed. + fn deserialize_items( + &self, + bytes: &[u8], + num_items: usize, + ) -> Result<(Vec, usize), SerdeError>; +} + +/// Serializer for UTF-8 strings compatible with ArrayOfStringsSerDe in Java. +#[derive(Debug, Default, Clone, Copy)] +pub struct StringSerde; + +impl ItemsSerde for StringSerde { + fn serialize_items(&self, items: &[String]) -> Vec { + let total_len: usize = items.iter().map(|item| 4 + item.len()).sum(); + let mut out = Vec::with_capacity(total_len); + for item in items { + let bytes = item.as_bytes(); + let len = bytes.len() as u32; + out.extend_from_slice(&len.to_le_bytes()); + out.extend_from_slice(bytes); + } + out + } + + fn deserialize_items( + &self, + bytes: &[u8], + num_items: usize, + ) -> Result<(Vec, usize), SerdeError> { + if num_items == 0 { + return Ok((Vec::new(), 0)); + } + let mut items = Vec::with_capacity(num_items); + let mut offset = 0usize; + for _ in 0..num_items { + if offset + 4 > bytes.len() { + return Err(SerdeError::InsufficientData( + "not enough bytes for string length".to_string(), + )); + } + let len = u32::from_le_bytes([ + bytes[offset], + bytes[offset + 1], + bytes[offset + 2], + bytes[offset + 3], + ]) as usize; + offset += 4; + if offset + len > bytes.len() { + return Err(SerdeError::InsufficientData( + "not enough bytes for string payload".to_string(), + )); + } + let slice = &bytes[offset..offset + len]; + let value = match str::from_utf8(slice) { + Ok(s) => s.to_string(), + Err(_) => { + return Err(SerdeError::MalformedData( + "invalid UTF-8 string payload".to_string(), + )); + } + }; + items.push(value); + offset += len; + } + Ok((items, offset)) + } +} + +/// Serializer for i64 items compatible with ArrayOfLongsSerDe in Java. +#[derive(Debug, Default, Clone, Copy)] +pub struct I64Serde; + +impl ItemsSerde for I64Serde { + fn serialize_items(&self, items: &[i64]) -> Vec { + let mut out = Vec::with_capacity(items.len() * 8); + for item in items { + out.extend_from_slice(&item.to_le_bytes()); + } + out + } + + fn deserialize_items( + &self, + bytes: &[u8], + num_items: usize, + ) -> Result<(Vec, usize), SerdeError> { + let needed = num_items + .checked_mul(8) + .ok_or_else(|| SerdeError::MalformedData("items size overflow".to_string()))?; + if bytes.len() < needed { + return Err(SerdeError::InsufficientData( + "not enough bytes for i64 items".to_string(), + )); + } + let mut items = Vec::with_capacity(num_items); + for i in 0..num_items { + let offset = i * 8; + let value = i64::from_le_bytes([ + bytes[offset], + bytes[offset + 1], + bytes[offset + 2], + bytes[offset + 3], + bytes[offset + 4], + bytes[offset + 5], + bytes[offset + 6], + bytes[offset + 7], + ]); + items.push(value); + } + Ok((items, needed)) + } +} diff --git a/datasketches/src/frequencies/serialization.rs b/datasketches/src/frequencies/serialization.rs new file mode 100644 index 0000000..24e8952 --- /dev/null +++ b/datasketches/src/frequencies/serialization.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Serialization constants and helpers for frequency sketches. + +/// Family ID for frequency sketches. +pub const FAMILY_ID: u8 = 10; +/// Serialization version. +pub const SER_VER: u8 = 1; + +/// Preamble longs for empty sketch. +pub const PREAMBLE_LONGS_EMPTY: u8 = 1; +/// Preamble longs for non-empty sketch. +pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4; + +/// Empty flag mask (both bits for compatibility). +pub const EMPTY_FLAG_MASK: u8 = 5; + +/// Offset of preamble longs byte. +pub const PREAMBLE_LONGS_BYTE: usize = 0; +/// Offset of serialization version byte. +pub const SER_VER_BYTE: usize = 1; +/// Offset of family ID byte. +pub const FAMILY_BYTE: usize = 2; +/// Offset of lg_max_map_size byte. +pub const LG_MAX_MAP_SIZE_BYTE: usize = 3; +/// Offset of lg_cur_map_size byte. +pub const LG_CUR_MAP_SIZE_BYTE: usize = 4; +/// Offset of flags byte. +pub const FLAGS_BYTE: usize = 5; + +/// Offset of active items int (low 32 bits of second pre-long). +pub const ACTIVE_ITEMS_INT: usize = 8; +/// Offset of stream weight (third pre-long). +pub const STREAM_WEIGHT_LONG: usize = 16; +/// Offset of offset (fourth pre-long). +pub const OFFSET_LONG: usize = 24; + +/// Read a u32 value from bytes at the given offset (little-endian). +#[inline] +pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 { + u32::from_le_bytes([ + bytes[offset], + bytes[offset + 1], + bytes[offset + 2], + bytes[offset + 3], + ]) +} + +/// Read an i64 value from bytes at the given offset (little-endian). +#[inline] +pub fn read_i64_le(bytes: &[u8], offset: usize) -> i64 { + i64::from_le_bytes([ + bytes[offset], + bytes[offset + 1], + bytes[offset + 2], + bytes[offset + 3], + bytes[offset + 4], + bytes[offset + 5], + bytes[offset + 6], + bytes[offset + 7], + ]) +} + +/// Write a u32 value to bytes at the given offset (little-endian). +#[inline] +pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) { + bytes[offset..offset + 4].copy_from_slice(&value.to_le_bytes()); +} + +/// Write an i64 value to bytes at the given offset (little-endian). +#[inline] +pub fn write_i64_le(bytes: &mut [u8], offset: usize, value: i64) { + bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes()); +} diff --git a/datasketches/src/frequencies/sketch.rs b/datasketches/src/frequencies/sketch.rs new file mode 100644 index 0000000..6ce28a3 --- /dev/null +++ b/datasketches/src/frequencies/sketch.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Frequent items sketch implementations. + +use std::hash::Hash; + +use crate::error::SerdeError; +use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap; +use crate::frequencies::serde::ItemsSerde; +use crate::frequencies::serialization::*; + +const LG_MIN_MAP_SIZE: u8 = 3; +const SAMPLE_SIZE: usize = 1024; +const EPSILON_FACTOR: f64 = 3.5; +const LOAD_FACTOR_NUMERATOR: usize = 3; +const LOAD_FACTOR_DENOMINATOR: usize = 4; + +/// Error guarantees for frequent item queries. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ErrorType { + /// Include items if upper bound exceeds threshold (no false negatives). + NoFalseNegatives, + /// Include items if lower bound exceeds threshold (no false positives). + NoFalsePositives, +} + +/// Result row for frequent item queries. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Row { + item: T, + estimate: i64, + upper_bound: i64, + lower_bound: i64, +} + +impl Row { + /// Returns the item value. + pub fn item(&self) -> &T { + &self.item + } + + /// Returns the estimated frequency. + pub fn estimate(&self) -> i64 { + self.estimate + } + + /// Returns the upper bound for the frequency. + pub fn upper_bound(&self) -> i64 { + self.upper_bound + } + + /// Returns the lower bound for the frequency. + pub fn lower_bound(&self) -> i64 { + self.lower_bound + } +} + +/// Frequent items sketch for generic item types. +#[derive(Debug, Clone)] +pub struct FrequentItemsSketch { + lg_max_map_size: u8, + cur_map_cap: usize, + offset: i64, + stream_weight: i64, + sample_size: usize, + hash_map: ReversePurgeItemHashMap, +} + +impl FrequentItemsSketch { + /// Creates a new sketch with the given maximum map size (power of two). + pub fn new(max_map_size: usize) -> Self { + let lg_max_map_size = exact_log2(max_map_size); + Self::with_lg_map_sizes(lg_max_map_size, LG_MIN_MAP_SIZE) + } + + /// Returns true if the sketch is empty. + pub fn is_empty(&self) -> bool { + self.hash_map.get_num_active() == 0 + } + + /// Returns the number of active items being tracked. + pub fn get_num_active_items(&self) -> usize { + self.hash_map.get_num_active() + } + + /// Returns the total weight of the stream. + pub fn get_total_weight(&self) -> i64 { + self.stream_weight + } + + /// Returns the estimated frequency for an item. + pub fn get_estimate(&self, item: &T) -> i64 { + let value = self.hash_map.get(item); + if value > 0 { value + self.offset } else { 0 } + } + + /// Returns the lower bound for an item's frequency. + pub fn get_lower_bound(&self, item: &T) -> i64 { + self.hash_map.get(item) + } + + /// Returns the upper bound for an item's frequency. + pub fn get_upper_bound(&self, item: &T) -> i64 { + self.hash_map.get(item) + self.offset + } + + /// Returns the maximum error across all items. + pub fn get_maximum_error(&self) -> i64 { + self.offset + } + + /// Returns epsilon for this sketch. + pub fn get_epsilon(&self) -> f64 { + Self::get_epsilon_for_lg(self.lg_max_map_size) + } + + /// Returns epsilon for a sketch configured with `lg_max_map_size`. + pub fn get_epsilon_for_lg(lg_max_map_size: u8) -> f64 { + EPSILON_FACTOR / (1u64 << lg_max_map_size) as f64 + } + + /// Returns the a priori error estimate. + pub fn get_apriori_error(lg_max_map_size: u8, estimated_total_weight: i64) -> f64 { + Self::get_epsilon_for_lg(lg_max_map_size) * estimated_total_weight as f64 + } + + /// Returns the maximum map capacity for this sketch. + pub fn get_maximum_map_capacity(&self) -> usize { + (1usize << self.lg_max_map_size) * LOAD_FACTOR_NUMERATOR / LOAD_FACTOR_DENOMINATOR + } + + /// Returns the current map capacity. + pub fn get_current_map_capacity(&self) -> usize { + self.cur_map_cap + } + + /// Returns the configured lg_max_map_size. + pub fn get_lg_max_map_size(&self) -> u8 { + self.lg_max_map_size + } + + /// Returns the current map size in log2. + pub fn get_lg_cur_map_size(&self) -> u8 { + self.hash_map.get_lg_length() + } + + /// Updates the sketch with a count of one. + pub fn update(&mut self, item: T) { + self.update_with_count(item, 1); + } + + /// Updates the sketch with an item and count. + pub fn update_with_count(&mut self, item: T, count: i64) { + if count == 0 { + return; + } + assert!(count > 0, "count may not be negative"); + self.stream_weight += count; + self.hash_map.adjust_or_put_value(item, count); + self.maybe_resize_or_purge(); + } + + /// Merges another sketch into this one. + pub fn merge(&mut self, other: &Self) + where + T: Clone, + { + if other.is_empty() { + return; + } + let merged_total = self.stream_weight + other.stream_weight; + for (item, count) in other.hash_map.iter() { + self.update_with_count(item.clone(), count); + } + self.offset += other.offset; + self.stream_weight = merged_total; + } + + /// Resets the sketch to an empty state. + pub fn reset(&mut self) { + *self = Self::with_lg_map_sizes(self.lg_max_map_size, LG_MIN_MAP_SIZE); + } + + /// Returns frequent items using the sketch maximum error as threshold. + pub fn get_frequent_items(&self, error_type: ErrorType) -> Vec> + where + T: Clone, + { + self.get_frequent_items_with_threshold(error_type, self.offset) + } + + /// Returns frequent items using a custom threshold. + pub fn get_frequent_items_with_threshold( + &self, + error_type: ErrorType, + threshold: i64, + ) -> Vec> + where + T: Clone, + { + let threshold = threshold.max(self.offset); + let mut rows = Vec::new(); + for (item, count) in self.hash_map.iter() { + let lower = count; + let upper = count + self.offset; + let include = match error_type { + ErrorType::NoFalseNegatives => upper > threshold, + ErrorType::NoFalsePositives => lower > threshold, + }; + if include { + rows.push(Row { + item: item.clone(), + estimate: upper, + upper_bound: upper, + lower_bound: lower, + }); + } + } + rows.sort_by(|a, b| b.estimate.cmp(&a.estimate)); + rows + } + + /// Serializes this sketch into a byte vector using the provided serializer. + pub fn serialize_with>(&self, serde: &S) -> Vec + where + T: Clone, + { + if self.is_empty() { + let mut out = vec![0u8; 8]; + out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_EMPTY; + out[SER_VER_BYTE] = SER_VER; + out[FAMILY_BYTE] = FAMILY_ID; + out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size; + out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.get_lg_length(); + out[FLAGS_BYTE] = EMPTY_FLAG_MASK; + return out; + } + let active_items = self.get_num_active_items(); + let values = self.hash_map.get_active_values(); + let keys = self.hash_map.get_active_keys(); + let items_bytes = serde.serialize_items(&keys); + let total_bytes = + PREAMBLE_LONGS_NONEMPTY as usize * 8 + (active_items * 8) + items_bytes.len(); + let mut out = vec![0u8; total_bytes]; + out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_NONEMPTY; + out[SER_VER_BYTE] = SER_VER; + out[FAMILY_BYTE] = FAMILY_ID; + out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size; + out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.get_lg_length(); + out[FLAGS_BYTE] = 0; + write_u32_le(&mut out, ACTIVE_ITEMS_INT, active_items as u32); + write_i64_le(&mut out, STREAM_WEIGHT_LONG, self.stream_weight); + write_i64_le(&mut out, OFFSET_LONG, self.offset); + + let mut offset = PREAMBLE_LONGS_NONEMPTY as usize * 8; + for value in values { + write_i64_le(&mut out, offset, value); + offset += 8; + } + out[offset..offset + items_bytes.len()].copy_from_slice(&items_bytes); + out + } + + /// Deserializes a sketch from bytes using the provided serializer. + pub fn deserialize_with>(bytes: &[u8], serde: &S) -> Result + where + T: Clone, + { + if bytes.len() < 8 { + return Err(SerdeError::InsufficientData( + "insufficient data for preamble".to_string(), + )); + } + let pre_longs = bytes[PREAMBLE_LONGS_BYTE] & 0x3f; + let ser_ver = bytes[SER_VER_BYTE]; + let family = bytes[FAMILY_BYTE]; + let lg_max = bytes[LG_MAX_MAP_SIZE_BYTE]; + let lg_cur = bytes[LG_CUR_MAP_SIZE_BYTE]; + let flags = bytes[FLAGS_BYTE]; + let is_empty = (flags & EMPTY_FLAG_MASK) != 0; + if ser_ver != SER_VER { + return Err(SerdeError::UnsupportedVersion(format!( + "unsupported ser_ver {}", + ser_ver + ))); + } + if family != FAMILY_ID { + return Err(SerdeError::InvalidFamily(format!( + "expected family {}, got {}", + FAMILY_ID, family + ))); + } + if lg_cur > lg_max { + return Err(SerdeError::InvalidParameter( + "lg_cur_map_size exceeds lg_max_map_size".to_string(), + )); + } + if is_empty { + if pre_longs != PREAMBLE_LONGS_EMPTY { + return Err(SerdeError::MalformedData( + "empty sketch with invalid preamble size".to_string(), + )); + } + return Ok(Self::with_lg_map_sizes(lg_max, lg_cur)); + } + if pre_longs != PREAMBLE_LONGS_NONEMPTY { + return Err(SerdeError::MalformedData( + "non-empty sketch with invalid preamble size".to_string(), + )); + } + if bytes.len() < PREAMBLE_LONGS_NONEMPTY as usize * 8 { + return Err(SerdeError::InsufficientData( + "insufficient data for full preamble".to_string(), + )); + } + let active_items = read_u32_le(bytes, ACTIVE_ITEMS_INT) as usize; + let stream_weight = read_i64_le(bytes, STREAM_WEIGHT_LONG); + let offset_val = read_i64_le(bytes, OFFSET_LONG); + let values_offset = PREAMBLE_LONGS_NONEMPTY as usize * 8; + let values_bytes = active_items + .checked_mul(8) + .ok_or_else(|| SerdeError::MalformedData("values size overflow".to_string()))?; + let items_offset = values_offset + values_bytes; + if bytes.len() < items_offset { + return Err(SerdeError::InsufficientData( + "insufficient data for values".to_string(), + )); + } + let mut values = Vec::with_capacity(active_items); + for i in 0..active_items { + values.push(read_i64_le(bytes, values_offset + i * 8)); + } + let (items, consumed) = serde.deserialize_items(&bytes[items_offset..], active_items)?; + if items.len() != active_items { + return Err(SerdeError::MalformedData( + "item count mismatch during deserialization".to_string(), + )); + } + if consumed > bytes.len() - items_offset { + return Err(SerdeError::InsufficientData( + "insufficient data for items".to_string(), + )); + } + let mut sketch = Self::with_lg_map_sizes(lg_max, lg_cur); + for (item, value) in items.into_iter().zip(values) { + sketch.update_with_count(item, value); + } + sketch.stream_weight = stream_weight; + sketch.offset = offset_val; + Ok(sketch) + } + + fn maybe_resize_or_purge(&mut self) { + if self.hash_map.get_num_active() > self.cur_map_cap { + if self.hash_map.get_lg_length() < self.lg_max_map_size { + self.hash_map.resize(self.hash_map.get_length() * 2); + self.cur_map_cap = self.hash_map.get_capacity(); + } else { + let delta = self.hash_map.purge(self.sample_size); + self.offset += delta; + if self.hash_map.get_num_active() > self.get_maximum_map_capacity() { + panic!("purge did not reduce number of active items"); + } + } + } + } + + fn with_lg_map_sizes(lg_max_map_size: u8, lg_cur_map_size: u8) -> Self { + let lg_max = lg_max_map_size.max(LG_MIN_MAP_SIZE); + let lg_cur = lg_cur_map_size.max(LG_MIN_MAP_SIZE); + assert!( + lg_cur <= lg_max, + "lg_cur_map_size must not exceed lg_max_map_size" + ); + let map = ReversePurgeItemHashMap::new(1usize << lg_cur); + let cur_map_cap = map.get_capacity(); + let max_map_cap = (1usize << lg_max) * LOAD_FACTOR_NUMERATOR / LOAD_FACTOR_DENOMINATOR; + let sample_size = SAMPLE_SIZE.min(max_map_cap); + Self { + lg_max_map_size: lg_max, + cur_map_cap, + offset: 0, + stream_weight: 0, + sample_size, + hash_map: map, + } + } +} + +fn exact_log2(value: usize) -> u8 { + assert!(value.is_power_of_two(), "value must be power of 2"); + value.trailing_zeros() as u8 +} diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs index c566727..c471447 100644 --- a/datasketches/src/lib.rs +++ b/datasketches/src/lib.rs @@ -31,6 +31,7 @@ compile_error!("datasketches does not support big-endian targets"); pub mod error; +pub mod frequencies; pub mod hll; pub mod tdigest; diff --git a/datasketches/tests/frequencies_serialization_test.rs b/datasketches/tests/frequencies_serialization_test.rs new file mode 100644 index 0000000..d7252a8 --- /dev/null +++ b/datasketches/tests/frequencies_serialization_test.rs @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod common; + +use std::fs; + +use common::serialization_test_data; +use datasketches::error::SerdeError; +use datasketches::frequencies::FrequentItemsSketch; +use datasketches::frequencies::I64Serde; +use datasketches::frequencies::StringSerde; + +#[test] +fn test_longs_round_trip() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(32); + for i in 1..=100 { + sketch.update_with_count(i, i); + } + let serde = I64Serde; + let bytes = sketch.serialize_with(&serde); + let restored = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert_eq!(restored.get_total_weight(), sketch.get_total_weight()); + assert_eq!(restored.get_estimate(&42), sketch.get_estimate(&42)); + assert_eq!(restored.get_maximum_error(), sketch.get_maximum_error()); +} + +#[test] +fn test_items_round_trip() { + let mut sketch = FrequentItemsSketch::new(32); + sketch.update_with_count("alpha".to_string(), 3); + sketch.update_with_count("beta".to_string(), 5); + sketch.update_with_count("gamma".to_string(), 7); + + let serde = StringSerde; + let bytes = sketch.serialize_with(&serde); + let restored = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert_eq!(restored.get_total_weight(), sketch.get_total_weight()); + assert_eq!(restored.get_estimate(&"beta".to_string()), 5); + assert_eq!(restored.get_maximum_error(), sketch.get_maximum_error()); +} + +#[test] +fn test_java_frequent_longs_compatibility() { + let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000]; + let serde = I64Serde; + for n in test_cases { + let filename = format!("frequent_long_n{}_java.sk", n); + let path = serialization_test_data("java_generated_files", &filename); + let bytes = fs::read(&path).unwrap(); + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert_eq!(sketch.is_empty(), n == 0); + if n > 10 { + assert!(sketch.get_maximum_error() > 0); + } else { + assert_eq!(sketch.get_maximum_error(), 0); + } + assert_eq!(sketch.get_total_weight(), n as i64); + } +} + +#[test] +fn test_java_frequent_strings_ascii() { + let path = serialization_test_data("java_generated_files", "frequent_string_ascii_java.sk"); + let bytes = fs::read(&path).unwrap(); + let serde = StringSerde; + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_maximum_error(), 0); + assert_eq!(sketch.get_total_weight(), 10); + assert_eq!( + sketch.get_estimate(&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + 1 + ); + assert_eq!( + sketch.get_estimate(&"bbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()), + 2 + ); + assert_eq!( + sketch.get_estimate(&"ccccccccccccccccccccccccccccc".to_string()), + 3 + ); + assert_eq!( + sketch.get_estimate(&"ddddddddddddddddddddddddddddd".to_string()), + 4 + ); +} + +#[test] +fn test_java_frequent_strings_utf8() { + let path = serialization_test_data("java_generated_files", "frequent_string_utf8_java.sk"); + let bytes = fs::read(&path).unwrap(); + let serde = StringSerde; + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_maximum_error(), 0); + assert_eq!(sketch.get_total_weight(), 28); + assert_eq!(sketch.get_estimate(&"абвгд".to_string()), 1); + assert_eq!(sketch.get_estimate(&"еёжзи".to_string()), 2); + assert_eq!(sketch.get_estimate(&"йклмн".to_string()), 3); + assert_eq!(sketch.get_estimate(&"опрст".to_string()), 4); + assert_eq!(sketch.get_estimate(&"уфхцч".to_string()), 5); + assert_eq!(sketch.get_estimate(&"шщъыь".to_string()), 6); + assert_eq!(sketch.get_estimate(&"эюя".to_string()), 7); +} + +#[test] +fn test_cpp_frequent_longs_compatibility() { + let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000]; + let serde = I64Serde; + for n in test_cases { + let filename = format!("frequent_long_n{}_cpp.sk", n); + let path = serialization_test_data("cpp_generated_files", &filename); + let bytes = fs::read(&path).unwrap(); + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde); + if cfg!(windows) { + if let Err(err) = sketch { + assert!(matches!(err, SerdeError::InsufficientData(_))); + continue; + } + } + let sketch = sketch.unwrap(); + assert_eq!(sketch.is_empty(), n == 0); + if n > 10 { + assert!(sketch.get_maximum_error() > 0); + } else { + assert_eq!(sketch.get_maximum_error(), 0); + } + assert_eq!(sketch.get_total_weight(), n as i64); + } +} + +#[test] +fn test_cpp_frequent_strings_compatibility() { + let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000]; + for n in test_cases { + let filename = format!("frequent_string_n{}_cpp.sk", n); + let path = serialization_test_data("cpp_generated_files", &filename); + let bytes = fs::read(&path).unwrap(); + let serde = StringSerde; + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert_eq!(sketch.is_empty(), n == 0); + if n > 10 { + assert!(sketch.get_maximum_error() > 0); + } else { + assert_eq!(sketch.get_maximum_error(), 0); + } + assert_eq!(sketch.get_total_weight(), n as i64); + } +} + +#[test] +fn test_cpp_frequent_strings_ascii() { + let path = serialization_test_data("cpp_generated_files", "frequent_string_ascii_cpp.sk"); + let bytes = fs::read(&path).unwrap(); + let serde = StringSerde; + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_maximum_error(), 0); + assert_eq!(sketch.get_total_weight(), 10); + assert_eq!( + sketch.get_estimate(&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + 1 + ); + assert_eq!( + sketch.get_estimate(&"bbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()), + 2 + ); + assert_eq!( + sketch.get_estimate(&"ccccccccccccccccccccccccccccc".to_string()), + 3 + ); + assert_eq!( + sketch.get_estimate(&"ddddddddddddddddddddddddddddd".to_string()), + 4 + ); +} + +#[test] +fn test_cpp_frequent_strings_utf8() { + let path = serialization_test_data("cpp_generated_files", "frequent_string_utf8_cpp.sk"); + let bytes = fs::read(&path).unwrap(); + let serde = StringSerde; + let sketch = FrequentItemsSketch::deserialize_with(&bytes, &serde).unwrap(); + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_maximum_error(), 0); + assert_eq!(sketch.get_total_weight(), 28); + assert_eq!(sketch.get_estimate(&"абвгд".to_string()), 1); + assert_eq!(sketch.get_estimate(&"еёжзи".to_string()), 2); + assert_eq!(sketch.get_estimate(&"йклмн".to_string()), 3); + assert_eq!(sketch.get_estimate(&"опрст".to_string()), 4); + assert_eq!(sketch.get_estimate(&"уфхцч".to_string()), 5); + assert_eq!(sketch.get_estimate(&"шщъыь".to_string()), 6); + assert_eq!(sketch.get_estimate(&"эюя".to_string()), 7); +} diff --git a/datasketches/tests/frequencies_update_test.rs b/datasketches/tests/frequencies_update_test.rs new file mode 100644 index 0000000..7e20afe --- /dev/null +++ b/datasketches/tests/frequencies_update_test.rs @@ -0,0 +1,506 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datasketches::frequencies::ErrorType; +use datasketches::frequencies::FrequentItemsSketch; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct TestItem(i32); + +#[test] +fn test_longs_update_with_zero_count_is_noop() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, 0); + + assert!(sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 0); + assert_eq!(sketch.get_num_active_items(), 0); +} + +#[test] +fn test_items_update_with_zero_count_is_noop() { + let mut sketch = FrequentItemsSketch::new(8); + sketch.update_with_count("a".to_string(), 0); + + assert!(sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 0); + assert_eq!(sketch.get_num_active_items(), 0); +} + +#[test] +fn test_capacity_and_epsilon_helpers() { + let longs: FrequentItemsSketch = FrequentItemsSketch::new(8); + assert_eq!(longs.get_current_map_capacity(), 6); + assert_eq!(longs.get_maximum_map_capacity(), 6); + assert_eq!(longs.get_lg_cur_map_size(), 3); + assert_eq!(longs.get_lg_max_map_size(), 3); + + let epsilon = FrequentItemsSketch::::get_epsilon_for_lg(10); + let expected = 3.5 / 1024.0; + assert!((epsilon - expected).abs() < 1e-12); + + let apriori = FrequentItemsSketch::::get_apriori_error(10, 10_000); + assert!((apriori - expected * 10_000.0).abs() < 1e-9); + + let items: FrequentItemsSketch = FrequentItemsSketch::new(1024); + assert!((items.get_epsilon() - expected).abs() < 1e-12); + assert_eq!(items.get_current_map_capacity(), 6); + assert_eq!(items.get_maximum_map_capacity(), 768); + assert_eq!(items.get_lg_max_map_size(), 10); +} + +#[test] +fn test_longs_empty() { + let sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + + assert!(sketch.is_empty()); + assert_eq!(sketch.get_num_active_items(), 0); + assert_eq!(sketch.get_total_weight(), 0); + assert_eq!(sketch.get_estimate(&42), 0); + assert_eq!(sketch.get_lower_bound(&42), 0); + assert_eq!(sketch.get_upper_bound(&42), 0); + assert_eq!(sketch.get_maximum_error(), 0); +} + +#[test] +fn test_items_empty() { + let sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + let item = "a".to_string(); + + assert!(sketch.is_empty()); + assert_eq!(sketch.get_num_active_items(), 0); + assert_eq!(sketch.get_total_weight(), 0); + assert_eq!(sketch.get_estimate(&item), 0); + assert_eq!(sketch.get_lower_bound(&item), 0); + assert_eq!(sketch.get_upper_bound(&item), 0); + assert_eq!(sketch.get_maximum_error(), 0); +} + +#[test] +fn test_longs_one_item() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update(10); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_num_active_items(), 1); + assert_eq!(sketch.get_total_weight(), 1); + assert_eq!(sketch.get_estimate(&10), 1); + assert_eq!(sketch.get_lower_bound(&10), 1); + assert_eq!(sketch.get_upper_bound(&10), 1); +} + +#[test] +fn test_items_one_item() { + let mut sketch = FrequentItemsSketch::new(8); + let item = "a".to_string(); + sketch.update(item.clone()); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_num_active_items(), 1); + assert_eq!(sketch.get_total_weight(), 1); + assert_eq!(sketch.get_estimate(&item), 1); + assert_eq!(sketch.get_lower_bound(&item), 1); + assert_eq!(sketch.get_upper_bound(&item), 1); +} + +#[test] +fn test_longs_several_items_no_resize_no_purge() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update(1); + sketch.update(2); + sketch.update(3); + sketch.update(4); + sketch.update(2); + sketch.update(3); + sketch.update(2); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 7); + assert_eq!(sketch.get_num_active_items(), 4); + assert_eq!(sketch.get_estimate(&1), 1); + assert_eq!(sketch.get_estimate(&2), 3); + assert_eq!(sketch.get_estimate(&3), 2); + assert_eq!(sketch.get_estimate(&4), 1); + assert_eq!(sketch.get_maximum_error(), 0); +} + +#[test] +fn test_items_several_items_no_resize_no_purge() { + let mut sketch = FrequentItemsSketch::new(8); + let a = "a".to_string(); + let b = "b".to_string(); + let c = "c".to_string(); + let d = "d".to_string(); + sketch.update(a.clone()); + sketch.update(b.clone()); + sketch.update(c.clone()); + sketch.update(d.clone()); + sketch.update(b.clone()); + sketch.update(c.clone()); + sketch.update(b.clone()); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 7); + assert_eq!(sketch.get_num_active_items(), 4); + assert_eq!(sketch.get_estimate(&a), 1); + assert_eq!(sketch.get_estimate(&b), 3); + assert_eq!(sketch.get_estimate(&c), 2); + assert_eq!(sketch.get_estimate(&d), 1); + assert_eq!(sketch.get_maximum_error(), 0); + + let rows = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows.len(), 4); + + let rows = sketch.get_frequent_items_with_threshold(ErrorType::NoFalsePositives, 2); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].item(), &b); + + sketch.reset(); + assert!(sketch.is_empty()); + assert_eq!(sketch.get_num_active_items(), 0); + assert_eq!(sketch.get_total_weight(), 0); +} + +#[test] +fn test_items_several_items_with_resize_no_purge() { + let mut sketch = FrequentItemsSketch::new(16); + let a = "a".to_string(); + let b = "b".to_string(); + let c = "c".to_string(); + let d = "d".to_string(); + sketch.update(a.clone()); + sketch.update(b.clone()); + sketch.update(c.clone()); + sketch.update(d.clone()); + sketch.update(b.clone()); + sketch.update(c.clone()); + sketch.update(b.clone()); + for item in ["e", "f", "g", "h", "i", "j", "k", "l"] { + sketch.update(item.to_string()); + } + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 15); + assert_eq!(sketch.get_num_active_items(), 12); + assert_eq!(sketch.get_estimate(&a), 1); + assert_eq!(sketch.get_estimate(&b), 3); + assert_eq!(sketch.get_estimate(&c), 2); + assert_eq!(sketch.get_estimate(&d), 1); + assert_eq!(sketch.get_maximum_error(), 0); +} + +#[test] +fn test_longs_estimation_mode() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, 10); + for item in 2..=6 { + sketch.update(item); + } + sketch.update_with_count(7, 15); + for item in 8..=12 { + sketch.update(item); + } + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 35); + assert!(sketch.get_maximum_error() > 0); + + let items = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(items.len(), 2); + assert_eq!(items[0].item(), &7); + assert_eq!(items[0].estimate(), 15); + assert_eq!(items[1].item(), &1); + assert_eq!(items[1].estimate(), 10); + + let items = sketch.get_frequent_items(ErrorType::NoFalseNegatives); + assert!(items.len() >= 2); + assert!(items.len() <= 12); +} + +#[test] +fn test_items_estimation_mode() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, 10); + for item in 2..=6 { + sketch.update(item); + } + sketch.update_with_count(7, 15); + for item in 8..=12 { + sketch.update(item); + } + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 35); + assert!(sketch.get_maximum_error() > 0); + + let items = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(items.len(), 2); + assert_eq!(items[0].item(), &7); + assert_eq!(items[0].estimate(), 15); + assert_eq!(items[1].item(), &1); + assert_eq!(items[1].estimate(), 10); + + let items = sketch.get_frequent_items(ErrorType::NoFalseNegatives); + assert!(items.len() >= 2); + assert!(items.len() <= 12); +} + +#[test] +fn test_longs_purge_keeps_heavy_hitters() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, 10); + for item in 2..=7 { + sketch.update(item); + } + + assert_eq!(sketch.get_total_weight(), 16); + assert_eq!(sketch.get_maximum_error(), 1); + assert_eq!(sketch.get_estimate(&1), 10); + assert_eq!(sketch.get_lower_bound(&1), 9); + + let rows = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].item(), &1); + assert_eq!(rows[0].estimate(), 10); +} + +#[test] +fn test_items_purge_keeps_heavy_hitters() { + let mut sketch = FrequentItemsSketch::new(8); + sketch.update_with_count("a".to_string(), 10); + for item in ["b", "c", "d", "e", "f", "g"] { + sketch.update(item.to_string()); + } + + assert_eq!(sketch.get_total_weight(), 16); + assert_eq!(sketch.get_maximum_error(), 1); + assert_eq!(sketch.get_estimate(&"a".to_string()), 10); + assert_eq!(sketch.get_lower_bound(&"a".to_string()), 9); + + let rows = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].item(), "a"); + assert_eq!(rows[0].estimate(), 10); +} + +#[test] +fn test_items_custom_type() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(TestItem(1), 10); + for item in 2..=7 { + sketch.update(TestItem(item)); + } + let item = TestItem(8); + sketch.update(item); + + assert!(!sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 17); + assert_eq!(sketch.get_estimate(&TestItem(1)), 10); + + let rows = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].item(), &TestItem(1)); + assert_eq!(rows[0].estimate(), 10); +} + +#[test] +fn test_longs_merge_estimation_mode() { + let mut sketch1: FrequentItemsSketch = FrequentItemsSketch::new(16); + sketch1.update_with_count(1, 9); + for item in 2..=14 { + sketch1.update(item); + } + assert!(sketch1.get_maximum_error() > 0); + + let mut sketch2: FrequentItemsSketch = FrequentItemsSketch::new(16); + for item in 8..=20 { + sketch2.update(item); + } + sketch2.update_with_count(21, 11); + assert!(sketch2.get_maximum_error() > 0); + + sketch1.merge(&sketch2); + assert!(!sketch1.is_empty()); + assert_eq!(sketch1.get_total_weight(), 46); + assert!(sketch1.get_num_active_items() >= 2); + + let items = sketch1.get_frequent_items_with_threshold(ErrorType::NoFalsePositives, 2); + assert_eq!(items.len(), 2); + assert_eq!(items[0].item(), &21); + assert!(items[0].estimate() >= 11); + assert_eq!(items[1].item(), &1); + assert!(items[1].estimate() >= 9); +} + +#[test] +fn test_items_merge_estimation_mode() { + let mut sketch1: FrequentItemsSketch = FrequentItemsSketch::new(16); + sketch1.update_with_count(1, 9); + for item in 2..=14 { + sketch1.update(item); + } + assert!(sketch1.get_maximum_error() > 0); + + let mut sketch2: FrequentItemsSketch = FrequentItemsSketch::new(16); + for item in 8..=20 { + sketch2.update(item); + } + sketch2.update_with_count(21, 11); + assert!(sketch2.get_maximum_error() > 0); + + sketch1.merge(&sketch2); + assert!(!sketch1.is_empty()); + assert_eq!(sketch1.get_total_weight(), 46); + assert!(sketch1.get_num_active_items() >= 2); + + let items = sketch1.get_frequent_items_with_threshold(ErrorType::NoFalsePositives, 2); + assert_eq!(items.len(), 2); + assert_eq!(items[0].item(), &21); + assert!(items[0].estimate() >= 11); + assert_eq!(items[1].item(), &1); + assert!(items[1].estimate() >= 9); +} + +#[test] +fn test_longs_merge_exact_mode() { + let mut sketch1: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch1.update(1); + sketch1.update(2); + sketch1.update(2); + + let mut sketch2: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch2.update(2); + sketch2.update(3); + + sketch1.merge(&sketch2); + + assert!(!sketch1.is_empty()); + assert_eq!(sketch1.get_total_weight(), 5); + assert_eq!(sketch1.get_num_active_items(), 3); + assert_eq!(sketch1.get_estimate(&1), 1); + assert_eq!(sketch1.get_estimate(&2), 3); + assert_eq!(sketch1.get_estimate(&3), 1); + assert_eq!(sketch1.get_maximum_error(), 0); +} + +#[test] +fn test_items_merge_exact_mode() { + let mut sketch1 = FrequentItemsSketch::new(8); + let a = "a".to_string(); + let b = "b".to_string(); + let c = "c".to_string(); + sketch1.update(a.clone()); + sketch1.update(b.clone()); + sketch1.update(b.clone()); + + let mut sketch2 = FrequentItemsSketch::new(8); + sketch2.update(b.clone()); + sketch2.update(c.clone()); + + sketch1.merge(&sketch2); + + assert!(!sketch1.is_empty()); + assert_eq!(sketch1.get_total_weight(), 5); + assert_eq!(sketch1.get_num_active_items(), 3); + assert_eq!(sketch1.get_estimate(&a), 1); + assert_eq!(sketch1.get_estimate(&b), 3); + assert_eq!(sketch1.get_estimate(&c), 1); + assert_eq!(sketch1.get_maximum_error(), 0); +} + +#[test] +fn test_longs_merge_empty_is_noop() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update(1); + + let empty: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.merge(&empty); + + assert_eq!(sketch.get_total_weight(), 1); + assert_eq!(sketch.get_num_active_items(), 1); + assert_eq!(sketch.get_estimate(&1), 1); +} + +#[test] +fn test_items_merge_empty_is_noop() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update(1); + + let empty: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.merge(&empty); + + assert_eq!(sketch.get_total_weight(), 1); + assert_eq!(sketch.get_num_active_items(), 1); + assert_eq!(sketch.get_estimate(&1), 1); +} + +#[test] +fn test_row_equality_changes_with_updates() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update(1); + let rows1 = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows1.len(), 1); + let row1 = rows1[0].clone(); + + sketch.update(1); + let rows2 = sketch.get_frequent_items(ErrorType::NoFalsePositives); + assert_eq!(rows2.len(), 1); + let row2 = rows2[0].clone(); + + assert_ne!(row1, row2); + assert_eq!(row2.item(), &1); + assert_eq!(row2.estimate(), 2); +} + +#[test] +fn test_longs_reset() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, 3); + sketch.update_with_count(2, 2); + sketch.reset(); + + assert!(sketch.is_empty()); + assert_eq!(sketch.get_total_weight(), 0); + assert_eq!(sketch.get_num_active_items(), 0); + assert_eq!(sketch.get_lg_max_map_size(), 3); +} + +#[test] +#[should_panic(expected = "count may not be negative")] +fn test_longs_negative_count_panics() { + let mut sketch: FrequentItemsSketch = FrequentItemsSketch::new(8); + sketch.update_with_count(1, -1); +} + +#[test] +#[should_panic(expected = "count may not be negative")] +fn test_items_negative_count_panics() { + let mut sketch = FrequentItemsSketch::new(8); + sketch.update_with_count("a".to_string(), -1); +} + +#[test] +#[should_panic(expected = "value must be power of 2")] +fn test_longs_invalid_map_size_panics() { + FrequentItemsSketch::::new(6); +} + +#[test] +#[should_panic(expected = "value must be power of 2")] +fn test_items_invalid_map_size_panics() { + let _ = FrequentItemsSketch::::new(6); +}