A collection of progressively more sophisticated Key-Value Store implementations built with Hydro, designed as educational examples for an upcoming book about distributed programming.
The KVS Zoo demonstrates how to build distributed systems with Hydro's global dataflow programming model. Its composable server architecture lets you mix and match request-routing layers, replication layers, and value semantics to assemble sophisticated distributed key-value stores from reusable components.
This project builds on prior distributed systems research:
- Anna KVS: A lattice-based key-value store emphasizing coordination-free semantics (see the original papers)
- Hydro Project: A Rust framework for correct and performant distributed systems
The implementations showcase Hydro's approach to building distributed systems: expressing coordination patterns as location-aware, low-latency dataflow graphs that provide zero-overhead distributed safety at compile time.
The zoo showcases a composable architecture where routing layers, replication layers, and value semantics can be mixed and matched.
The core emits two orthogonal event streams:
- `DataEvent<V>` captures observable effects such as put/delete responses (and, in the future, streaming reads).
- `MetaEvent` carries background metadata such as tombstone notifications or summary statistics.
Pipeline stages subscribe to the streams they care about:
- Before stages run sequentially just before storage, routing and ordering `KVSOperation<V>` requests.
- After stages run sequentially just after storage, consuming `DataEvent<V>` and optionally `MetaEvent` when replication needs the extra context.
- Background stages (opt-in) attach to either the `DataEvent<V>` or `MetaEvent` stream for longer-lived background work.
Stages branch by cloning Hydro streams, which could be optimized in the future.
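To make the split concrete, here is a minimal sketch of what the two streams might carry; the variant and field names are illustrative, not the crate's exact definitions.

```rust
/// Illustrative shape of the observable-effect stream (exact variants in the crate may differ).
enum DataEvent<V> {
    /// A put was applied; carries the key and the (possibly merged) stored value.
    PutApplied { key: String, value: V },
    /// A delete was applied for the given key.
    DeleteApplied { key: String },
}

/// Illustrative shape of the background-metadata stream.
enum MetaEvent {
    /// A tombstone exists for a key and can eventually be garbage-collected.
    Tombstone { key: String },
    /// Periodic summary statistics, e.g. the number of keys held locally.
    Stats { key_count: usize },
}
```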
- Composable layers:
  - Before storage (routing/ordering): `SingleNodeRouter`, `RoundRobinRouter`, `ShardedRouter`, `PaxosDispatcher`, and `Pipeline<...>` to compose them
  - After storage (replication/responders): `NoReplication`, `SimpleGossip`, `BroadcastReplication`
- Single entrypoint: `layer_flow` (for focused wiring) and `plumb_kvs_dataflow` (for end-to-end server wiring with external I/O)
- Value Types: `LwwWrapper<T>` (last-writer-wins), `CausalWrapper<T>` (causal with vector clocks)
Local: Single-node key-value store with sequential semantics (sketched below).
- Routing: `SingleNodeRouter`
- Replication: `NoReplication`
- Nodes: 1
- Concepts: Basic Hydro dataflow, external interfaces, process/cluster abstraction
Replicated: Multi-node replication with a selectable eventual-consistency model.
- Routing: `RoundRobinRouter`
- Replication: `SimpleGossip` (epidemic rumor-mongering)
- Value Types:
  - `CausalString` (default): causal consistency with vector clocks
  - `LwwWrapper<String>` (via `--lattice lww`): last-writer-wins, non-deterministic consistency
- Nodes: 3 replicas
- Concepts: Gossip protocols, eventual consistency, lattice-based merge semantics
- Features:
  - Periodic gossip with configurable intervals
  - Probabilistic tombstoning for rumor cleanup
  - Runtime selection of consistency model via CLI flag (sketched below)
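The CLI switch between consistency models can be illustrated with a small std-only argument parser; the real example may use a different parsing library, so treat this as a sketch of the `--lattice causal|lww` behavior rather than the example's actual code.

```rust
use std::env;

#[derive(Debug, PartialEq)]
enum Lattice {
    Causal, // CausalString: vector clocks plus set-union of concurrent writes
    Lww,    // LwwWrapper<String>: last-writer-wins overwrite
}

/// Pick the consistency model from a `--lattice <causal|lww>` flag, defaulting to causal.
fn parse_lattice(args: &[String]) -> Lattice {
    match args.iter().position(|a| a == "--lattice") {
        Some(i) => match args.get(i + 1).map(String::as_str) {
            Some("lww") => Lattice::Lww,
            _ => Lattice::Causal,
        },
        None => Lattice::Causal,
    }
}

fn main() {
    let args: Vec<String> = env::args().collect();
    println!("running replicated example with {:?} consistency", parse_lattice(&args));
}
```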
Sharded: Horizontal partitioning via consistent hashing for scalability.
- Routing: `Pipeline<ShardedRouter, SingleNodeRouter>`
- Replication: `NoReplication` (per shard)
- Nodes: 3 shards
- Concepts: Data partitioning, hash-based routing, independent shards
- Features:
  - Consistent key-to-shard mapping (sketched below)
  - Proxy-based routing with unicast to specific shards
  - No cross-shard communication (independent operation)
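The key-to-shard mapping can be illustrated in a few lines of std-only Rust. This is a conceptual sketch of what a sharded router must compute, using simple hash-mod placement; a production consistent-hashing scheme would use a hash ring, and the actual `ShardedRouter` may hash differently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to one of `num_shards` shards deterministically, so every proxy
/// agrees on a key's placement without coordination.
fn shard_for_key(key: &str, num_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_shards
}

fn main() {
    for key in ["doc", "user:42", "counter"] {
        println!("{key} -> shard {}", shard_for_key(key, 3));
    }
}
```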
Sharded + Replicated: Combines sharding and replication for both scalability and fault tolerance.
- Routing: `Pipeline<ShardedRouter, RoundRobinRouter>`
- Replication: `BroadcastReplication` (within each shard)
- Nodes: 3 shards × 3 replicas = 9 total nodes
- Concepts: Hybrid architecture, multi-level composition
- Features:
  - Multiple shard clusters, each with internal replication
  - Broadcast replication within each shard for consistency
  - Partitioning across shards for capacity
Linearizable: Imposes a total order with Paxos while keeping background replication pluggable.
- Routing/Ordering: `PaxosDispatcher` (consensus before execution) + `SlotOrderEnforcer` (sequential execution)
- Replication: `BroadcastReplication` (eager replication of lattice deltas)
- Nodes: 3 Paxos proposers + 3 Paxos acceptors + 3 KVS replicas = 9 total
- Concepts: Consensus, linearizability, slot-preserving replication
- Features:
  - Paxos imposes a global slot order on operations
  - Replication preserves slots and enforces in-order application per replica (sketched below)
  - Strong consistency guarantees
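The slot-preserving idea can be sketched with a small reorder buffer: operations arrive tagged with their Paxos-assigned slot, possibly out of order, and each replica applies them strictly in slot order. This is a conceptual, std-only sketch of what a component like `SlotOrderEnforcer` must do, not the crate's actual code.

```rust
use std::collections::BTreeMap;

/// Buffers slot-tagged operations and releases them strictly in slot order.
struct SlotOrderBuffer<T> {
    next_slot: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> SlotOrderBuffer<T> {
    fn new() -> Self {
        Self { next_slot: 0, pending: BTreeMap::new() }
    }

    /// Accept an operation for `slot` and return every operation that is now
    /// ready to execute: the contiguous prefix starting at `next_slot`.
    fn accept(&mut self, slot: u64, op: T) -> Vec<T> {
        self.pending.insert(slot, op);
        let mut ready = Vec::new();
        while let Some(op) = self.pending.remove(&self.next_slot) {
            ready.push(op);
            self.next_slot += 1;
        }
        ready
    }
}

fn main() {
    let mut buf = SlotOrderBuffer::new();
    assert!(buf.accept(1, "put b").is_empty());                 // slot 0 missing, so slot 1 is held
    assert_eq!(buf.accept(0, "put a"), vec!["put a", "put b"]); // the prefix is released in order
}
```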
KVS architectures are built by composing `before_storage` and `after_storage` layers, then wiring them with a single entrypoint:

```rust
// Create external I/O and plumb the full stack
let (layers, port) = plumb_kvs_dataflow::<LwwWrapper<String>, _>(
    &proxy,
    &client_external,
    &flow,
    kvs_spec, // e.g., a nested KVSCluster spec built from routing/replication components
);
```

For focused scenarios (e.g., tests), `layer_flow` plumbs a single cluster with selected routing and replication over a stream of `KVSOperation<V>`.
Value Types:
- `LwwWrapper<T>`: Last-writer-wins (simple overwrite; sketched below)
- `CausalWrapper<T>`: Causal consistency using `DomPair<VCWrapper, SetUnionHashSet<T>>`
- `VCWrapper`: Vector clock primitive for causality tracking
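To see why last-writer-wins is the "simple overwrite" case of lattice merge, here is a self-contained sketch of an LWW register. The real `LwwWrapper<T>` is built on the lattices crate's `Merge` trait and may choose timestamps and tie-breaks differently; treat the details below as illustrative.

```rust
/// Illustrative last-writer-wins register: a (timestamp, value) pair whose
/// merge keeps whichever side carries the larger timestamp.
#[derive(Debug, Clone, PartialEq)]
struct Lww<T> {
    ts: u64, // logical timestamp chosen by the writer
    val: T,
}

impl<T> Lww<T> {
    /// Lattice-style merge: commutative, associative, and idempotent.
    fn merge(&mut self, other: Lww<T>) {
        if other.ts > self.ts {
            *self = other;
        }
        // Ties keep the current value here; a real implementation breaks ties
        // deterministically (e.g., by writer id) so replicas still converge.
    }
}

fn main() {
    let mut a = Lww { ts: 1, val: "v1" };
    a.merge(Lww { ts: 2, val: "v2" });
    assert_eq!(a.val, "v2"); // the later write wins regardless of arrival order
}
```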
Replication strategies:
- `NoReplication`: No background synchronization
- `SimpleGossip<V>`: Demers-style rumor-mongering with probabilistic tombstoning
- `BroadcastReplication<V>`: Eager broadcast of all updates

Notes:
- Replication strategies implement a unified `replicate_updates` API over `ReplicationUpdate<V>` so they can handle both slotted and unslotted updates without duplication.
Routing strategies:
- `SingleNodeRouter`: Direct to a single node
- `RoundRobinRouter`: Load balance across replicas (policy sketched below)
- `ShardedRouter`: Hash-based key partitioning
- `PaxosDispatcher`: Global total order via Paxos consensus
- `Pipeline<R1, R2>`: Compose two routing strategies
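As one more concrete routing policy, here is a self-contained sketch of round-robin assignment. The real `RoundRobinRouter` operates on Hydro streams and cluster member IDs, so this only illustrates the policy, not the API.

```rust
/// Cycle through replica indices so successive operations land on different nodes.
struct RoundRobin {
    next: usize,
    num_replicas: usize,
}

impl RoundRobin {
    fn new(num_replicas: usize) -> Self {
        Self { next: 0, num_replicas }
    }

    /// Return the replica for the next operation and advance the cursor.
    fn pick(&mut self) -> usize {
        let chosen = self.next;
        self.next = (self.next + 1) % self.num_replicas;
        chosen
    }
}

fn main() {
    let mut rr = RoundRobin::new(3);
    let picks: Vec<usize> = (0..5).map(|_| rr.pick()).collect();
    assert_eq!(picks, vec![0, 1, 2, 0, 1]); // operations spread evenly across 3 replicas
}
```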
```bash
# Install Rust (if not already installed)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Clone the repository
git clone https://github.com/jhellerstein/kvs_zoo.git
cd kvs_zoo

# Build the library and examples
cargo build --examples

# Run tests (includes causal consistency, sharding, gossip protocol tests)
cargo test
```

Each example demonstrates a different KVS architecture:
```bash
# Local single-node KVS
cargo run --example local

# Replicated KVS with 3-node gossip cluster
cargo run --example replicated                      # Causal consistency (default)
cargo run --example replicated -- --lattice causal  # Explicit causal
cargo run --example replicated -- --lattice lww     # Last-write-wins

# Sharded KVS with 3 independent shards
cargo run --example sharded

# Sharded + Replicated (3 shards × 3 replicas = 9 nodes)
cargo run --example sharded_replicated

# Linearizable KVS with Paxos consensus
cargo run --example linearizable
```

See `examples/README.md` for detailed documentation on each architecture.
Here's what happens when you run the replicated example with causal consistency:
```bash
cargo run --example replicated -- --lattice causal
```

The example:
- Deploys a 3-node replicated cluster with `SimpleGossip`
- Sends operations with causal values (vector clock + value):

  ```text
  PUT "doc" => CausalString { vc: {node1: 1}, val: "v1" }
  PUT "doc" => CausalString { vc: {node2: 1}, val: "v2" }   // Concurrent!
  GET "doc"
  ```

- Gossips updates between replicas periodically
- Merges concurrent writes via set union (both "v1" and "v2" retained)
- Prints responses showing the merged causal value
Output demonstrates:
- Operations routed round-robin across replicas
- Causal ordering preserved by vector clocks
- Concurrent writes converging via lattice merge
- Eventual consistency through gossip propagation
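The concurrent-write behavior above can be reproduced in a few lines: two writes whose vector clocks are incomparable are merged by taking the pairwise max of the clocks and the set union of the values. This is a self-contained illustration of the semantics only; the crate's `CausalWrapper`/`CausalString` are built from `DomPair`, `VCWrapper`, and `SetUnionHashSet` rather than these hand-rolled structs.

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative causal value: a vector clock plus the set of concurrently written values.
#[derive(Debug, Clone)]
struct Causal {
    vc: HashMap<String, u64>,
    vals: HashSet<String>,
}

impl Causal {
    fn write(node: &str, counter: u64, val: &str) -> Self {
        Self {
            vc: HashMap::from([(node.to_string(), counter)]),
            vals: HashSet::from([val.to_string()]),
        }
    }

    /// True if `self`'s clock dominates `other`'s, i.e. `self` causally follows `other`.
    fn dominates(&self, other: &Self) -> bool {
        other.vc.iter().all(|(n, c)| self.vc.get(n).copied().unwrap_or(0) >= *c)
    }

    /// Merge: a causally later write replaces an earlier one; concurrent writes are unioned.
    fn merge(&mut self, other: Causal) {
        if self.dominates(&other) {
            return; // `other` is already reflected in (or dominated by) `self`
        }
        if other.dominates(self) {
            *self = other; // `other` strictly supersedes `self`
            return;
        }
        // Concurrent: take the pairwise max of the clocks and union the value sets.
        for (node, c) in other.vc {
            let entry = self.vc.entry(node).or_insert(0);
            *entry = (*entry).max(c);
        }
        self.vals.extend(other.vals);
    }
}

fn main() {
    let mut doc = Causal::write("node1", 1, "v1");
    doc.merge(Causal::write("node2", 1, "v2")); // concurrent with the first write
    assert_eq!(doc.vals.len(), 2);              // both "v1" and "v2" are retained
}
```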
Servers, routing, replication, and values are independent dimensions:
```rust
// Mix and match components (spec-style)
type MyKVS = KVSCluster<
    ShardCluster,
    Pipeline<ShardedRouter, RoundRobinRouter>, // before_storage
    BroadcastReplication<CausalString>,        // after_storage
    ()                                         // leaf
>;

let routing = Pipeline::new(ShardedRouter::new(3), RoundRobinRouter::new());
let replication = BroadcastReplication::<CausalString>::default();
```

Values implement the `Merge` trait for coordination-free convergence:
```rust
impl<V: Merge<V>> KVSCore<V> {
    // Concurrent writes automatically merge via lattice join
    fn put(&mut self, key: String, value: V) {
        self.store.merge_with(key, value);
    }
}
```

Epidemic gossip separates metadata from values:
- Rumor Store: Tracks which keys have been updated (metadata only)
- Lattice Store: Holds actual merged values
- Optimization: Only gossip keys, fetch values from local store
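A single gossip round under this design might look like the sketch below: the sender ships only its rumored keys together with their current values pulled from the local store, the receiver merges them, and rumors are retired probabilistically so old keys stop circulating. This is a conceptual, std-only sketch of the `SimpleGossip` idea, not the crate's Hydro dataflow implementation; the message shape and the `coin` parameter standing in for a random source are assumptions.

```rust
use std::collections::{HashMap, HashSet};

/// One replica's gossip-relevant state: rumored keys (metadata) and merged values.
struct Replica {
    rumors: HashSet<String>,        // keys with updates worth spreading
    store: HashMap<String, String>, // the actual values
}

impl Replica {
    /// Build one gossip message: rumored keys paired with their current local values.
    fn gossip_payload(&self) -> Vec<(String, String)> {
        self.rumors
            .iter()
            .filter_map(|k| self.store.get(k).map(|v| (k.clone(), v.clone())))
            .collect()
    }

    /// Apply a received payload and pick up the rumors to spread further.
    fn receive(&mut self, payload: Vec<(String, String)>) {
        for (key, value) in payload {
            // For illustration we simply overwrite; the real store performs a lattice merge.
            self.store.insert(key.clone(), value);
            self.rumors.insert(key);
        }
    }

    /// Probabilistic tombstoning: with probability `p`, stop rumoring a key.
    fn retire_rumors(&mut self, p: f64, mut coin: impl FnMut() -> f64) {
        self.rumors.retain(|_| coin() >= p);
    }
}

fn main() {
    let mut a = Replica {
        rumors: HashSet::from(["doc".into()]),
        store: HashMap::from([("doc".into(), "v1".into())]),
    };
    let mut b = Replica { rumors: HashSet::new(), store: HashMap::new() };

    b.receive(a.gossip_payload()); // b now holds "doc" and will gossip it onward
    a.retire_rumors(1.0, || 0.5);  // with p = 1.0 every rumor is retired this round
    assert!(a.rumors.is_empty() && b.store.contains_key("doc"));
}
```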
Compose routing layers for multi-level architectures:
```rust
// First route by shard, then by replica within shard
let pipeline = Pipeline::new(
    ShardedRouter::new(num_shards),
    RoundRobinRouter::new()
);
```

The KVS Zoo demonstrates the spectrum of consistency models:
| Variant | Consistency | Coordination | Latency | Fault Tolerance | Nodes |
|---|---|---|---|---|---|
| Local | Strong | None | Lowest | None | 1 |
| Replicated (LWW) | Eventual | Gossip | Low | High | 3 |
| Replicated (Causal) | Causal | Gossip | Low | High | 3 |
| Sharded | Per-shard strong | Hash routing | Low | Medium | 3 |
| Sharded+Replicated | Per-shard causal | Gossip + Hash | Medium | Very High | 9 |
| Linearizable | Linearizable | Paxos consensus | Higher | High | 9 |
The test suite includes comprehensive validation:
- Protocol tests: Basic Get/Put operations (`tests/protocol_tests.rs`)
- Vector clock tests: Causality tracking, concurrent updates (`tests/vector_clock_tests.rs`)
- Causal consistency tests: Happens-before relationships, convergence (`tests/causal_consistency_tests.rs`)
- Sharding tests: Key distribution, shard independence (`tests/sharding_tests.rs`)
- Replication tests: Gossip protocol, convergence (`tests/replication_strategy_tests.rs`)
- Linearizability tests: Paxos consensus, total order (`tests/linearizability_tests.rs`)
- Composability tests: Server trait implementations (`tests/composable_integration.rs`)
```bash
# Run all tests
cargo test

# Run with nextest for better output
cargo install cargo-nextest
cargo nextest run

# Run specific test suite
cargo test causal_consistency
```

We use the `insta` crate to snapshot the user-facing stdout of the examples in `tests/examples_snapshots.rs`. The test harness filters out unstable, internal process-launch noise; only semantically meaningful lines (banner, shard mapping lines, operation outputs, completion banners) are retained to keep snapshots stable over time.
Common workflow:
```bash
# Run snapshot tests normally
cargo test --test examples_snapshots -- --nocapture

# Review and accept updated snapshots interactively after intentional output changes
cargo insta review

# Force regenerate all snapshots (CI should not do this)
INSTA_UPDATE=always cargo test --test examples_snapshots
```

Commit updated `tests/snapshots/*.snap` files when outputs change intentionally. CI runs these tests to guard the educational surface of the example output.
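For readers unfamiliar with insta, the pattern in `tests/examples_snapshots.rs` boils down to something like the sketch below; the real harness's helpers, filtering rules, and snapshot names differ, and `run_example` plus the retained-line predicate here are illustrative stand-ins.

```rust
// Hypothetical sketch of a filtered snapshot test; the repository's helpers differ.
#[test]
fn local_example_output_is_stable() {
    // Stand-in for however the harness launches the example and captures its stdout.
    let raw_output = run_example("local");

    // Keep only semantically meaningful lines so process-launch noise doesn't churn snapshots.
    let filtered: String = raw_output
        .lines()
        .filter(|line| line.starts_with("PUT") || line.starts_with("GET") || line.contains("==="))
        .collect::<Vec<_>>()
        .join("\n");

    // insta compares against a committed .snap file and fails the test on any diff.
    insta::assert_snapshot!("local_example", filtered);
}

// Illustrative helper only; the real harness runs the example as a subprocess.
fn run_example(_name: &str) -> String {
    String::new()
}
```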
This project is designed for educational purposes as part of a distributed systems book. Contributions that improve clarity, add documentation, or demonstrate additional distributed systems concepts are welcome!
- Hydro Documentation
- Anna: A KVS for Any Scale
- Conflict-free Replicated Data Types (CRDTs)
- Vector Clocks
- Demers Gossip Protocol
This project is designed as educational material for the Hydro distributed programming book.