From 733de26cee457471e4a2d6bb3fbbd130b2faeea8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 15:54:59 -0700 Subject: [PATCH 1/9] fn: add Collect function for iterators In this commit, we introduce a new utility function `Collect` to the fn package. This function drains all elements from an iterator and returns them as a slice. This is particularly useful when transitioning from iterator-based APIs to code that expects slices, allowing for gradual migration to the new iterator patterns. The fn module's go.mod is also updated to require Go 1.23, which is necessary for the built-in iter.Seq type support. --- fn/go.mod | 2 +- fn/iter.go | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 fn/iter.go diff --git a/fn/go.mod b/fn/go.mod index 89dc280df20..abf0cdf4cda 100644 --- a/fn/go.mod +++ b/fn/go.mod @@ -1,6 +1,6 @@ module github.com/lightningnetwork/lnd/fn/v2 -go 1.19 +go 1.23 require ( github.com/stretchr/testify v1.8.1 diff --git a/fn/iter.go b/fn/iter.go new file mode 100644 index 00000000000..52071dc6edb --- /dev/null +++ b/fn/iter.go @@ -0,0 +1,15 @@ +package fn + +import "iter" + +// Collect drains all of the elements from the passed iterator, returning a +// slice of the contents. +func Collect[T any](seq iter.Seq[T]) []T { + + var ret []T + for i := range seq { + ret = append(ret, i) + } + + return ret +} From 8e5c296f4d514cbdbce9a6e61e11d7d8fec19b1c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 16:06:10 -0700 Subject: [PATCH 2/9] build: add replace directive for local fn package In this commit, we add a replace directive to use the local fn package that now includes the new Collect function for iterators. This ensures that the main module can access the iterator utilities we've added. The replace directive will be removed once the fn package changes are merged and a new version is tagged. 
--- go.mod | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 3104c8b5252..07b3b86c0d2 100644 --- a/go.mod +++ b/go.mod @@ -206,6 +206,9 @@ require ( // store have been included in a tagged sqldb version. replace github.com/lightningnetwork/lnd/sqldb => ./sqldb +// Replace fn package to use local version with iterator Collect function. +replace github.com/lightningnetwork/lnd/fn/v2 => ./fn + // This replace is for https://github.com/advisories/GHSA-25xm-hr59-7c27 replace github.com/ulikunitz/xz => github.com/ulikunitz/xz v0.5.11 From 05d607e9757f1ee89acbaaca68335bf846e3b01c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 15:55:19 -0700 Subject: [PATCH 3/9] graph/db: add options infrastructure for iterator configuration In this commit, we introduce a new options pattern for configuring iterator behavior in the graph database. This includes configuration for batch sizes when iterating over channel and node updates, as well as an option to filter for public nodes only. The new functional options pattern allows callers to customize iterator behavior without breaking existing APIs. Default batch sizes are set to 1000 entries for both channel and node updates, which provides a good balance between memory usage and performance. --- graph/db/options.go | 52 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/graph/db/options.go b/graph/db/options.go index 3edda660212..da517df07ae 100644 --- a/graph/db/options.go +++ b/graph/db/options.go @@ -20,6 +20,58 @@ const ( DefaultPreAllocCacheNumNodes = 15000 ) +// Option is a functional option used to change the per-call configuration. +type Option func(*config) + +// config holds the configuration for graph operations. +type config struct { + // chanUpdateIterBatchSize is the batch size to use when reading out + // channel updates to send a peer a backlog. 
+ chanUpdateIterBatchSize int + + // nodeUpdateIterBatchSize is the batch size to use when reading out + // node updates to send a peer a backlog. + nodeUpdateIterBatchSize int + + // iterPublicNodes is used to make an iterator that only iterates over + // public nodes. + iterPublicNodes bool +} + +// defaultConfig returns the default configuration. +func defaultConfig() *config { + return &config{ + chanUpdateIterBatchSize: 1_000, + nodeUpdateIterBatchSize: 1_000, + } +} + +// WithChanUpdateSize sets the batch size for chan upd iterators. +func WithChanUpdateSize(size int) Option { + return func(cfg *config) { + if size > 0 { + cfg.chanUpdateIterBatchSize = size + } + } +} + +// WithNodeUpdateIterBatchSize sets the batch size for node ann iterators. +func WithNodeUpdateIterBatchSize(size int) Option { + return func(cfg *config) { + if size > 0 { + cfg.nodeUpdateIterBatchSize = size + } + } +} + +// WithIterPublicNodesOnly is used to create an iterator that only iterates over +// public nodes. +func WithIterPublicNodesOnly() Option { + return func(cfg *config) { + cfg.iterPublicNodes = true + } +} + // chanGraphOptions holds parameters for tuning and customizing the // ChannelGraph. type chanGraphOptions struct { From f78603497bd7420ab2e310a4ecf74214361ead13 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 17:08:30 -0700 Subject: [PATCH 4/9] graph/db: convert NodeUpdatesInHorizon to use iterators In this commit, we refactor the NodeUpdatesInHorizon method to return an iterator instead of a slice. This change significantly reduces memory usage when dealing with large result sets by allowing callers to process items incrementally rather than loading everything into memory at once. The new implementation uses Go 1.23's iter.Seq type to provide a standard iterator interface. The method now supports configurable batch sizes through functional options, allowing fine-tuned control over memory usage and performance characteristics. 
Rather than reading all the entries from disk into memory (before this commit, we did consult the cache for most entries, skipping the disk hits), we now expose a chunked iterator instead. We also make the process of filtering out public nodes first class. This saves many newly created db transactions later. --- graph/db/interfaces.go | 5 +- graph/db/kv_store.go | 204 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 184 insertions(+), 25 deletions(-) diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index dbcd5d1e64e..0d1032a07ae 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -2,6 +2,7 @@ package graphdb import ( "context" + "iter" "net" "time" @@ -124,8 +125,8 @@ type V1Store interface { //nolint:interfacebloat // an update timestamp within the passed range. This method can be used // by two nodes to quickly determine if they have the same set of up to // date node announcements. - NodeUpdatesInHorizon(startTime, - endTime time.Time) ([]models.LightningNode, error) + NodeUpdatesInHorizon(startTime, endTime time.Time, + opts ...Option) (iter.Seq[models.LightningNode], error) // FetchLightningNode attempts to look up a target node by its identity // public key. If the node isn't found in the database, then diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 7bac0ec134e..51d76fd8701 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "iter" "math" "net" "sort" @@ -2189,57 +2190,170 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime, return edgesInHorizon, nil } -// NodeUpdatesInHorizon returns all the known lightning node which have an -// update timestamp within the passed range. This method can be used by two -// nodes to quickly determine if they have the same set of up to date node -// announcements. 
-func (c *KVStore) NodeUpdatesInHorizon(startTime, - endTime time.Time) ([]models.LightningNode, error) { +// nodeUpdatesIterator maintains state for iterating through node updates. +// Iterator Lifecycle: +// 1. Initialize state with start/end time, batch size, and filtering options +// 2. Fetch batch using pagination cursor (lastSeenKey) +// 3. Filter nodes if publicNodesOnly is set +// 4. Update lastSeenKey to the last processed node's index key +// 5. Repeat until we exceed endTime or no more nodes exist +type nodeUpdatesIterator struct { + // batchSize is the amount of node updates to read at a single time. + batchSize int - var nodesInHorizon []models.LightningNode + // startTime is the start time of the iteration request. + startTime time.Time + + // endTime is the end time of the iteration request. + endTime time.Time + + // lastSeenKey is the last index key seen. This is used to resume + // iteration. + lastSeenKey []byte + + // publicNodesOnly filters to only return public nodes if true. + publicNodesOnly bool + + // total tracks total nodes processed. + total int +} + +// newNodeUpdatesIterator makes a new node updates iterator. +func newNodeUpdatesIterator(batchSize int, startTime, endTime time.Time, + publicNodesOnly bool) *nodeUpdatesIterator { + + return &nodeUpdatesIterator{ + batchSize: batchSize, + startTime: startTime, + endTime: endTime, + lastSeenKey: nil, + publicNodesOnly: publicNodesOnly, + } +} + +// fetchNextNodeBatch fetches the next batch of node announcements using the +// iterator state. 
+func (c *KVStore) fetchNextNodeBatch( + state *nodeUpdatesIterator) ([]models.LightningNode, bool, error) { + + var ( + nodeBatch []models.LightningNode + hasMore bool + ) err := kvdb.View(c.db, func(tx kvdb.RTx) error { nodes := tx.ReadBucket(nodeBucket) if nodes == nil { return ErrGraphNodesNotFound } - + ourPubKey := nodes.Get(sourceKey) + if ourPubKey == nil && state.publicNodesOnly { + // If we're filtering for public nodes only but don't have + // a source node set, we can't determine if nodes are public. + // A node is considered public if it has at least one channel + // with our node (the source node). + return ErrSourceNodeNotSet + } nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket) if nodeUpdateIndex == nil { return ErrGraphNodesNotFound } - // We'll now obtain a cursor to perform a range query within - // the index to find all node announcements within the horizon. + // We'll now obtain a cursor to perform a range query within the + // index to find all node announcements within the horizon. + // The nodeUpdateIndex key format is: [8 bytes timestamp][33 bytes node pubkey] + // This allows efficient range queries by time while maintaining a stable + // sort order for nodes with the same timestamp. updateCursor := nodeUpdateIndex.ReadCursor() var startTimeBytes, endTimeBytes [8 + 33]byte byteOrder.PutUint64( - startTimeBytes[:8], uint64(startTime.Unix()), + startTimeBytes[:8], uint64(state.startTime.Unix()), ) byteOrder.PutUint64( - endTimeBytes[:8], uint64(endTime.Unix()), + endTimeBytes[:8], uint64(state.endTime.Unix()), ) - // With our start and end times constructed, we'll step through - // the index collecting info for each node within the time - // range. - // - //nolint:ll - for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil && - bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() { + // If we have a last seen key (existing iteration), then that'll + // be our starting point. 
Otherwise, we'll seek to the start + // time. + var indexKey []byte + if state.lastSeenKey != nil { + indexKey, _ = updateCursor.Seek(state.lastSeenKey) + + if bytes.Equal(indexKey, state.lastSeenKey) { + indexKey, _ = updateCursor.Next() + } + } else { + indexKey, _ = updateCursor.Seek(startTimeBytes[:]) + } + + // Now we'll read items up to the batch size, exiting early if + // we exceed the ending time. + var lastProcessedKey []byte + for len(nodeBatch) < state.batchSize && indexKey != nil { + // Extract the timestamp from the index key (first 8 bytes). + // Only compare timestamps, not the full key with pubkey. + keyTimestamp := byteOrder.Uint64(indexKey[:8]) + endTimestamp := uint64(state.endTime.Unix()) + if keyTimestamp > endTimestamp { + break + } + nodePub := indexKey[8:] node, err := fetchLightningNode(nodes, nodePub) if err != nil { return err } - nodesInHorizon = append(nodesInHorizon, node) + if state.publicNodesOnly { + nodeIsPublic, err := c.isPublic( + tx, node.PubKeyBytes, ourPubKey, + ) + if err != nil { + return err + } + if !nodeIsPublic { + indexKey, _ = updateCursor.Next() + continue + } + } + + nodeBatch = append(nodeBatch, node) + state.total++ + + // Remember the last key we actually processed. We'll + // use this to update the last seen key below. + if lastProcessedKey == nil { + lastProcessedKey = make([]byte, len(indexKey)) + } + copy(lastProcessedKey, indexKey) + + // Advance the iterator to the next entry. + indexKey, _ = updateCursor.Next() + } + + // If we haven't yet crossed the endTime, then we still + // have more entries to deliver. + if indexKey != nil { + keyTimestamp := byteOrder.Uint64(indexKey[:8]) + endTimestamp := uint64(state.endTime.Unix()) + if keyTimestamp <= endTimestamp { + hasMore = true + } + } + + // Update the cursor to the last key we actually processed. 
+ if lastProcessedKey != nil { + if state.lastSeenKey == nil { + state.lastSeenKey = make([]byte, len(lastProcessedKey)) + } + copy(state.lastSeenKey, lastProcessedKey) } return nil }, func() { - nodesInHorizon = nil + nodeBatch = nil }) switch { case errors.Is(err, ErrGraphNoEdgesFound): @@ -2248,10 +2362,54 @@ func (c *KVStore) NodeUpdatesInHorizon(startTime, break case err != nil: - return nil, err + return nil, false, err + } + + return nodeBatch, hasMore, nil +} + +// NodeUpdatesInHorizon returns all the known lightning node which have an +// update timestamp within the passed range. This method can be used by two +// nodes to quickly determine if they have the same set of up to date node +// announcements. +func (c *KVStore) NodeUpdatesInHorizon(startTime, + endTime time.Time, + opts ...Option) (iter.Seq[models.LightningNode], error) { + + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) } - return nodesInHorizon, nil + return func(yield func(models.LightningNode) bool) { + // Initialize iterator state. + state := newNodeUpdatesIterator( + cfg.nodeUpdateIterBatchSize, + startTime, endTime, + cfg.iterPublicNodes, + ) + + for { + nodeAnns, hasMore, err := c.fetchNextNodeBatch(state) + if err != nil { + log.Errorf("unable to read node updates in "+ + "horizon: %v", err) + return + } + + for _, node := range nodeAnns { + if !yield(node) { + return + } + } + + // If we're done, then we can just break out here + // now. + if !hasMore || len(nodeAnns) == 0 { + break + } + } + }, nil } // FilterKnownChanIDs takes a set of channel IDs and return the subset of chan From dbff8d175aad793af974dee7efb283bb4b72daee Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 17:16:21 -0700 Subject: [PATCH 5/9] graph/db: convert ChanUpdatesInHorizon to use iterators In this commit, we refactor the ChanUpdatesInHorizon method to return an iterator instead of a slice. 
This change significantly reduces memory usage when dealing with large result sets by allowing callers to process items incrementally rather than loading everything into memory at once. --- graph/db/interfaces.go | 4 +- graph/db/kv_store.go | 300 ++++++++++++++++++++++++++++++----------- 2 files changed, 227 insertions(+), 77 deletions(-) diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 0d1032a07ae..f92a56d923b 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -236,8 +236,8 @@ type V1Store interface { //nolint:interfacebloat // ChanUpdatesInHorizon returns all the known channel edges which have // at least one edge that has an update timestamp within the specified // horizon. - ChanUpdatesInHorizon(startTime, endTime time.Time) ([]ChannelEdge, - error) + ChanUpdatesInHorizon(startTime, endTime time.Time, + opts ...Option) (iter.Seq[ChannelEdge], error) // FilterKnownChanIDs takes a set of channel IDs and return the subset // of chan ID's that we don't know and are not known zombies of the diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 51d76fd8701..0114bdec8c2 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -2035,107 +2035,199 @@ type ChannelEdge struct { Node2 *models.LightningNode } -// ChanUpdatesInHorizon returns all the known channel edges which have at least -// one edge that has an update timestamp within the specified horizon. -func (c *KVStore) ChanUpdatesInHorizon(startTime, - endTime time.Time) ([]ChannelEdge, error) { - - // To ensure we don't return duplicate ChannelEdges, we'll use an - // additional map to keep track of the edges already seen to prevent - // re-adding it. - var edgesSeen map[uint64]struct{} - var edgesToCache map[uint64]ChannelEdge - var edgesInHorizon []ChannelEdge +// updateChanCacheBatch updates the channel cache with multiple edges at once. +// This method acquires the cache lock only once for the entire batch. 
+func (c *KVStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) { + if len(edgesToCache) == 0 { + return + } c.cacheMu.Lock() defer c.cacheMu.Unlock() - var hits int + for cid, edge := range edgesToCache { + c.chanCache.insert(cid, edge) + } +} + +// isEmptyGraphError returns true if the error indicates the graph database +// is empty (no edges or nodes exist). These errors are expected when the +// graph is first created or has no data. +func isEmptyGraphError(err error) bool { + return errors.Is(err, ErrGraphNoEdgesFound) || + errors.Is(err, ErrGraphNodesNotFound) +} + +// chanUpdatesIterator holds the state for chunked channel update iteration. +type chanUpdatesIterator struct { + // batchSize is the amount of channel updates to read at a single time. + batchSize int + + // startTime is the start time of the iteration request. + startTime time.Time + + // endTime is the end time of the iteration request. + endTime time.Time + + // edgesSeen is used to dedup edges. + edgesSeen map[uint64]struct{} + + // edgesToCache houses all the edges that we read from the disk which + // aren't yet cached. This is used to update the cache after a batch + // chunk. + edgesToCache map[uint64]ChannelEdge + + // lastSeenKey is the last index key seen. This is used to resume + // iteration. + lastSeenKey []byte + + hits int + total int +} + +// newChanUpdatesIterator makes a new chan updates iterator. +func newChanUpdatesIterator(batchSize int, + startTime, endTime time.Time) *chanUpdatesIterator { + + return &chanUpdatesIterator{ + batchSize: batchSize, + startTime: startTime, + endTime: endTime, + edgesSeen: make(map[uint64]struct{}), + edgesToCache: make(map[uint64]ChannelEdge), + lastSeenKey: nil, + } +} + +// fetchNextChanUpdateBatch retrieves the next batch of channel edges within the +// horizon. Returns the batch, whether there are more edges, and any error. 
+func (c *KVStore) fetchNextChanUpdateBatch( + state *chanUpdatesIterator) ([]ChannelEdge, bool, error) { + + var ( + batch []ChannelEdge + hasMore bool + ) err := kvdb.View(c.db, func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) if edges == nil { return ErrGraphNoEdgesFound } + edgeIndex := edges.NestedReadBucket(edgeIndexBucket) if edgeIndex == nil { return ErrGraphNoEdgesFound } + edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket) if edgeUpdateIndex == nil { return ErrGraphNoEdgesFound } - nodes := tx.ReadBucket(nodeBucket) if nodes == nil { return ErrGraphNodesNotFound } - // We'll now obtain a cursor to perform a range query within - // the index to find all channels within the horizon. + // With all the relevant buckets read, we'll now create a fresh + // read cursor. updateCursor := edgeUpdateIndex.ReadCursor() + // We'll now use the start and end time to create the keys that + // we'll use to seek. var startTimeBytes, endTimeBytes [8 + 8]byte byteOrder.PutUint64( - startTimeBytes[:8], uint64(startTime.Unix()), + startTimeBytes[:8], uint64(state.startTime.Unix()), ) byteOrder.PutUint64( - endTimeBytes[:8], uint64(endTime.Unix()), + endTimeBytes[:8], uint64(state.endTime.Unix()), ) - // With our start and end times constructed, we'll step through - // the index collecting the info and policy of each update of - // each channel that has a last update within the time range. - // - //nolint:ll - for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil && - bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() { - // We have a new eligible entry, so we'll slice of the - // chan ID so we can query it in the DB. - chanID := indexKey[8:] + var indexKey []byte + + // If we left off earlier, then we'll use that key as the + // starting point. + switch { + case state.lastSeenKey != nil: + // Seek to the last seen key, moving to the key right + // after it. 
+ indexKey, _ = updateCursor.Seek(state.lastSeenKey) + + if bytes.Equal(indexKey, state.lastSeenKey) { + indexKey, _ = updateCursor.Next() + } + + // Otherwise, we'll move to the very start of the time range. + default: + // TODO(roasbeef): throwing away the value here? + indexKey, _ = updateCursor.Seek(startTimeBytes[:]) + } + + // TODO(roasbeef): iterate the channel graph cache instead w/ a treap + // ordering? + + // Now we'll read items up to the batch size, exiting early if + // we exceed the ending time. + for len(batch) < state.batchSize && indexKey != nil { + // If we're at the end, then we'll break out now. + if bytes.Compare(indexKey, endTimeBytes[:]) > 0 { + break + } - // If we've already retrieved the info and policies for - // this edge, then we can skip it as we don't need to do - // so again. + chanID := indexKey[8:] chanIDInt := byteOrder.Uint64(chanID) - if _, ok := edgesSeen[chanIDInt]; ok { + + if state.lastSeenKey == nil { + state.lastSeenKey = make([]byte, len(indexKey)) + } + copy(state.lastSeenKey, indexKey) + + // If we've seen this channel ID already, then we'll + // skip it. + if _, ok := state.edgesSeen[chanIDInt]; ok { + indexKey, _ = updateCursor.Next() continue } + // Before we read the edge info, we'll see if this + // element is already in the cache or not. + c.cacheMu.RLock() if channel, ok := c.chanCache.get(chanIDInt); ok { - hits++ - edgesSeen[chanIDInt] = struct{}{} - edgesInHorizon = append(edgesInHorizon, channel) + state.edgesSeen[chanIDInt] = struct{}{} + + batch = append(batch, channel) + + state.hits++ + state.total++ + + indexKey, _ = updateCursor.Next() + + c.cacheMu.RUnlock() continue } + c.cacheMu.RUnlock() - // First, we'll fetch the static edge information. + // The edge wasn't in the cache, so we'll fetch it along w/ the + // edge policies and nodes. 
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID) if err != nil { - chanID := byteOrder.Uint64(chanID) - return fmt.Errorf("unable to fetch info for "+ - "edge with chan_id=%v: %v", chanID, err) + return fmt.Errorf("unable to fetch info "+ + "for edge with chan_id=%v: %v", chanIDInt, err) } - - // With the static information obtained, we'll now - // fetch the dynamic policy info. edge1, edge2, err := fetchChanEdgePolicies( edgeIndex, edges, chanID, ) if err != nil { - chanID := byteOrder.Uint64(chanID) - return fmt.Errorf("unable to fetch policies "+ - "for edge with chan_id=%v: %v", chanID, - err) + return fmt.Errorf("unable to fetch "+ + "policies for edge with chan_id=%v: %v", + chanIDInt, err) } - node1, err := fetchLightningNode( nodes, edgeInfo.NodeKey1Bytes[:], ) if err != nil { return err } - node2, err := fetchLightningNode( nodes, edgeInfo.NodeKey2Bytes[:], ) @@ -2143,9 +2235,8 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime, return err } - // Finally, we'll collate this edge with the rest of - // edges to be returned. - edgesSeen[chanIDInt] = struct{}{} + // Now we have all the information we need to build the + // channel edge. channel := ChannelEdge{ Info: &edgeInfo, Policy1: edge1, @@ -2153,41 +2244,100 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime, Node1: &node1, Node2: &node2, } - edgesInHorizon = append(edgesInHorizon, channel) - edgesToCache[chanIDInt] = channel + + state.edgesSeen[chanIDInt] = struct{}{} + state.edgesToCache[chanIDInt] = channel + + batch = append(batch, channel) + + state.total++ + + // Advance the iterator to the next entry. + indexKey, _ = updateCursor.Next() + } + + // If we haven't yet crossed the endTimeBytes, then we still + // have more entries to deliver. 
+ if indexKey != nil && + bytes.Compare(indexKey, endTimeBytes[:]) <= 0 { + hasMore = true } return nil }, func() { - edgesSeen = make(map[uint64]struct{}) - edgesToCache = make(map[uint64]ChannelEdge) - edgesInHorizon = nil + batch = nil + hasMore = false }) - switch { - case errors.Is(err, ErrGraphNoEdgesFound): - fallthrough - case errors.Is(err, ErrGraphNodesNotFound): - break - - case err != nil: - return nil, err + if err != nil { + return nil, false, err } - // Insert any edges loaded from disk into the cache. - for chanid, channel := range edgesToCache { - c.chanCache.insert(chanid, channel) - } + return batch, hasMore, nil +} - if len(edgesInHorizon) > 0 { - log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)", - float64(hits)*100/float64(len(edgesInHorizon)), hits, - len(edgesInHorizon)) - } else { - log.Debugf("ChanUpdatesInHorizon returned no edges in "+ - "horizon (%s, %s)", startTime, endTime) +// ChanUpdatesInHorizon returns all the known channel edges which have at least +// one edge that has an update timestamp within the specified horizon. +func (c *KVStore) ChanUpdatesInHorizon(startTime, endTime time.Time, + opts ...Option) (iter.Seq[ChannelEdge], error) { + + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) } - return edgesInHorizon, nil + return func(yield func(ChannelEdge) bool) { + + iterState := newChanUpdatesIterator( + cfg.chanUpdateIterBatchSize, startTime, endTime, + ) + + for { + // At the top of the loop, we'll read the next batch + // chunk from disk. We'll also determine if we have any + // more entries after this or not. + batch, hasMore, err := c.fetchNextChanUpdateBatch( + iterState, + ) + // TODO(roasbeef): yield error here? + if err != nil { + // These errors just mean the graph is empty, which is OK. + if !isEmptyGraphError(err) { + log.Errorf("ChanUpdatesInHorizon batch "+ + "error: %v", err) + return + } + // Continue with empty batch + } + + // We'll now yield each edge that we just read. 
If yield + // returns false, then that means that we'll exit early. + for _, edge := range batch { + if !yield(edge) { + return + } + } + + // Update cache after successful batch yield. + c.updateChanCacheBatch(iterState.edgesToCache) + iterState.edgesToCache = make(map[uint64]ChannelEdge) + + // If we're done, then we can just break out here + // now. + if !hasMore || len(batch) == 0 { + break + } + } + + if iterState.total > 0 { + log.Tracef("ChanUpdatesInHorizon hit percentage: "+ + "%.2f (%d/%d)", float64(iterState.hits)*100/ + float64(iterState.total), iterState.hits, + iterState.total) + } else { + log.Tracef("ChanUpdatesInHorizon returned no edges "+ + "in horizon (%s, %s)", startTime, endTime) + } + }, nil } // nodeUpdatesIterator maintains state for iterating through node updates. From 2d1e71b7ff722ce5b7c963c5ab5f39ca0af3fef2 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 17:36:45 -0700 Subject: [PATCH 6/9] sqldb: implement iterator support for NodeUpdatesInHorizon In this commit, we update the SQL store implementation to support the new iterator-based API for NodeUpdatesInHorizon. This includes adding a new SQL query that supports efficient pagination through result sets. The SQL implementation uses cursor-based pagination with configurable batch sizes, allowing efficient iteration over large result sets without loading everything into memory. The query is optimized to use indexes effectively and minimize database round trips. 
The SQL query GetNodesByLastUpdateRange is updated to support: * Cursor-based pagination using (last_update, pub_key) compound cursor * Optional filtering for public nodes only * Configurable batch sizes via MaxResults parameter --- graph/db/sql_store.go | 104 +++++++++++++++++++++++++---------- sqldb/sqlc/graph.sql.go | 47 ++++++++++++++-- sqldb/sqlc/queries/graph.sql | 30 +++++++++- 3 files changed, 148 insertions(+), 33 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f996bfbfa93..8e5645b14a6 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "errors" "fmt" + "iter" "maps" "math" "net" @@ -541,42 +542,89 @@ func (s *SQLStore) SetSourceNode(ctx context.Context, // announcements. // // NOTE: This is part of the V1Store interface. -func (s *SQLStore) NodeUpdatesInHorizon(startTime, - endTime time.Time) ([]models.LightningNode, error) { +func (s *SQLStore) NodeUpdatesInHorizon(startTime, endTime time.Time, + opts ...Option) (iter.Seq[models.LightningNode], error) { - ctx := context.TODO() + // Apply options. + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) + } - var nodes []models.LightningNode - err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - dbNodes, err := db.GetNodesByLastUpdateRange( - ctx, sqlc.GetNodesByLastUpdateRangeParams{ - StartTime: sqldb.SQLInt64(startTime.Unix()), - EndTime: sqldb.SQLInt64(endTime.Unix()), - }, + return func(yield func(models.LightningNode) bool) { + var ( + ctx = context.TODO() + lastUpdateTime sql.NullInt64 + lastPubKey = make([]byte, 33) + hasMore = true ) - if err != nil { - return fmt.Errorf("unable to fetch nodes: %w", err) - } - err = forEachNodeInBatch( - ctx, s.cfg.PaginationCfg, db, dbNodes, - func(_ int64, node *models.LightningNode) error { - nodes = append(nodes, *node) + // Each iteration, we'll read a batch amount of nodes, yield + // them, then decide if we have more or not. 
+ for hasMore { + var batch []models.LightningNode + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + rows, err := db.GetNodesByLastUpdateRange( + ctx, sqlc.GetNodesByLastUpdateRangeParams{ + StartTime: sqldb.SQLInt64(startTime.Unix()), + EndTime: sqldb.SQLInt64(endTime.Unix()), + LastUpdate: lastUpdateTime, + LastPubKey: lastPubKey, + OnlyPublic: sql.NullBool{Bool: cfg.iterPublicNodes, Valid: true}, + MaxResults: sql.NullInt32{ + Int32: int32(cfg.nodeUpdateIterBatchSize), + Valid: true, + }, + }, + ) + if err != nil { + return err + } + + hasMore = len(rows) == cfg.nodeUpdateIterBatchSize + + err = forEachNodeInBatch( + ctx, s.cfg.PaginationCfg, db, rows, + func(_ int64, node *models.LightningNode) error { + batch = append(batch, *node) + + // Update pagination cursors + // based on the last processed + // node. + lastUpdateTime = sql.NullInt64{ + Int64: node.LastUpdate.Unix(), + Valid: true, + } + lastPubKey = node.PubKeyBytes[:] + + return nil + }, + ) + if err != nil { + return fmt.Errorf("unable to build nodes: %w", err) + } return nil - }, - ) - if err != nil { - return fmt.Errorf("unable to build nodes: %w", err) - } + }, sqldb.NoOpReset) - return nil - }, sqldb.NoOpReset) - if err != nil { - return nil, fmt.Errorf("unable to fetch nodes: %w", err) - } + if err != nil { + log.Errorf("NodeUpdatesInHorizon batch error: %v", err) + return + } + + for _, node := range batch { + if !yield(node) { + return + } + } - return nodes, nil + // If the batch didn't yield anything, then we're done. + if len(batch) == 0 { + break + } + } + }, nil } // AddChannelEdge adds a new (undirected, blank) edge to the graph database. 
An diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index fa6d50803f0..2a5508e62fc 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -1733,16 +1733,55 @@ const getNodesByLastUpdateRange = `-- name: GetNodesByLastUpdateRange :many SELECT id, version, pub_key, alias, last_update, color, signature FROM graph_nodes WHERE last_update >= $1 - AND last_update < $2 + AND last_update <= $2 + -- Pagination: We use (last_update, pub_key) as a compound cursor. + -- This ensures stable ordering and allows us to resume from where we left off. + -- We use COALESCE with -1 as sentinel since timestamps are always positive. + AND ( + -- Include rows with last_update greater than cursor (or all rows if cursor is -1) + last_update > COALESCE($3, -1) + OR + -- For rows with same last_update, use pub_key as tiebreaker + (last_update = COALESCE($3, -1) + AND pub_key > $4) + ) + -- Optional filter for public nodes only + AND ( + -- If only_public is false or not provided, include all nodes + COALESCE($5, FALSE) IS FALSE + OR + -- For V1 protocol, a node is public if it has at least one public channel. + -- A public channel has bitcoin_1_signature set (channel announcement received). 
+ EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE($6, 999999999) ` type GetNodesByLastUpdateRangeParams struct { - StartTime sql.NullInt64 - EndTime sql.NullInt64 + StartTime sql.NullInt64 + EndTime sql.NullInt64 + LastUpdate sql.NullInt64 + LastPubKey []byte + OnlyPublic interface{} + MaxResults interface{} } func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, arg.StartTime, arg.EndTime) + rows, err := q.db.QueryContext(ctx, getNodesByLastUpdateRange, + arg.StartTime, + arg.EndTime, + arg.LastUpdate, + arg.LastPubKey, + arg.OnlyPublic, + arg.MaxResults, + ) if err != nil { return nil, err } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index ae497941341..ada8c9f199b 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -156,7 +156,35 @@ ORDER BY node_id, type, position; SELECT * FROM graph_nodes WHERE last_update >= @start_time - AND last_update < @end_time; + AND last_update <= @end_time + -- Pagination: We use (last_update, pub_key) as a compound cursor. + -- This ensures stable ordering and allows us to resume from where we left off. + -- We use COALESCE with -1 as sentinel since timestamps are always positive. 
+ AND ( + -- Include rows with last_update greater than cursor (or all rows if cursor is -1) + last_update > COALESCE(sqlc.narg('last_update'), -1) + OR + -- For rows with same last_update, use pub_key as tiebreaker + (last_update = COALESCE(sqlc.narg('last_update'), -1) + AND pub_key > sqlc.narg('last_pub_key')) + ) + -- Optional filter for public nodes only + AND ( + -- If only_public is false or not provided, include all nodes + COALESCE(sqlc.narg('only_public'), FALSE) IS FALSE + OR + -- For V1 protocol, a node is public if it has at least one public channel. + -- A public channel has bitcoin_1_signature set (channel announcement received). + EXISTS ( + SELECT 1 + FROM graph_channels c + WHERE c.version = 1 + AND c.bitcoin_1_signature IS NOT NULL + AND (c.node_id_1 = graph_nodes.id OR c.node_id_2 = graph_nodes.id) + ) + ) +ORDER BY last_update ASC, pub_key ASC +LIMIT COALESCE(sqlc.narg('max_results'), 999999999); -- name: DeleteNodeAddresses :exec DELETE FROM graph_node_addresses From 5c601c98f92246c19366fe4775b3326d2458c823 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 17:45:19 -0700 Subject: [PATCH 7/9] sqldb: implement iterator support for ChanUpdatesInHorizon In this commit, we update the SQL store implementation to support the new iterator-based API for ChanUpdatesInHorizon. This includes adding SQL query pagination support and helper functions for efficient batch processing. The SQL implementation uses cursor-based pagination with configurable batch sizes, allowing efficient iteration over large result sets without loading everything into memory. The query is optimized to use indexes effectively and minimize database round trips. 
New SQL query GetChannelsByPolicyLastUpdateRange is updated to support: - Cursor-based pagination using (max_update_time, id) compound cursor - Configurable batch sizes via MaxResults parameter - Efficient batch caching with updateChanCacheBatch helper --- graph/db/sql_store.go | 283 +++++++++++++++++++++++------------ sqldb/sqlc/graph.sql.go | 38 ++++- sqldb/sqlc/queries/graph.sql | 20 ++- 3 files changed, 240 insertions(+), 101 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 8e5645b14a6..23c01d0e6a6 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1048,122 +1048,215 @@ func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex, }, reset) } +// extractMaxUpdateTime returns the maximum of the two policy update times. +// This is used for pagination cursor tracking. +func extractMaxUpdateTime(row sqlc.GetChannelsByPolicyLastUpdateRangeRow) int64 { + if row.Policy1LastUpdate.Valid && row.Policy2LastUpdate.Valid { + return max(row.Policy1LastUpdate.Int64, row.Policy2LastUpdate.Int64) + } else if row.Policy1LastUpdate.Valid { + return row.Policy1LastUpdate.Int64 + } else if row.Policy2LastUpdate.Valid { + return row.Policy2LastUpdate.Int64 + } + return 0 +} + +// buildChannelFromRow constructs a ChannelEdge from a database row. +// This includes building the nodes, channel info, and policies. 
+func (s *SQLStore) buildChannelFromRow(ctx context.Context, db SQLQueries, + row sqlc.GetChannelsByPolicyLastUpdateRangeRow) (ChannelEdge, error) { + + node1, node2, err := buildNodes( + ctx, db, row.GraphNode, row.GraphNode_2, + ) + if err != nil { + return ChannelEdge{}, err + } + + channel, err := getAndBuildEdgeInfo( + ctx, db, s.cfg.ChainHash, + row.GraphChannel, node1.PubKeyBytes, + node2.PubKeyBytes, + ) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build "+ + "channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to extract "+ + "channel policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, db, dbPol1, dbPol2, channel.ChannelID, + node1.PubKeyBytes, node2.PubKeyBytes, + ) + if err != nil { + return ChannelEdge{}, fmt.Errorf("unable to build "+ + "channel policies: %w", err) + } + + return ChannelEdge{ + Info: channel, + Policy1: p1, + Policy2: p2, + Node1: node1, + Node2: node2, + }, nil +} + +// updateChanCacheBatch updates the channel cache with multiple edges at once. +// This method acquires the cache lock only once for the entire batch. +func (s *SQLStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) { + if len(edgesToCache) == 0 { + return + } + + s.cacheMu.Lock() + defer s.cacheMu.Unlock() + + for chanID, edge := range edgesToCache { + s.chanCache.insert(chanID, edge) + } +} + // ChanUpdatesInHorizon returns all the known channel edges which have at least // one edge that has an update timestamp within the specified horizon. // +// Iterator Lifecycle: +// 1. Initialize state (edgesSeen map, cache tracking, pagination cursors) +// 2. Query batch of channels with policies in time range +// 3. For each channel: check if seen, check cache, or build from DB +// 4. Yield channels to caller +// 5. Update cache after successful batch +// 6. 
Repeat with updated pagination cursor until no more results +// // NOTE: This is part of the V1Store interface. -func (s *SQLStore) ChanUpdatesInHorizon(startTime, - endTime time.Time) ([]ChannelEdge, error) { +func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, + opts ...Option) (iter.Seq[ChannelEdge], error) { - s.cacheMu.Lock() - defer s.cacheMu.Unlock() + // Apply options. + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) + } - var ( - ctx = context.TODO() - // To ensure we don't return duplicate ChannelEdges, we'll use - // an additional map to keep track of the edges already seen to - // prevent re-adding it. - edgesSeen = make(map[uint64]struct{}) - edgesToCache = make(map[uint64]ChannelEdge) - edges []ChannelEdge - hits int - ) - err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - rows, err := db.GetChannelsByPolicyLastUpdateRange( - ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{ - Version: int16(ProtocolV1), - StartTime: sqldb.SQLInt64(startTime.Unix()), - EndTime: sqldb.SQLInt64(endTime.Unix()), - }, + return func(yield func(ChannelEdge) bool) { + + var ( + ctx = context.TODO() + edgesSeen = make(map[uint64]struct{}) + edgesToCache = make(map[uint64]ChannelEdge) + hits int + total int + lastUpdateTime sql.NullInt64 + lastID sql.NullInt64 + hasMore = true ) - if err != nil { - return err - } - for _, row := range rows { - // If we've already retrieved the info and policies for - // this edge, then we can skip it as we don't need to do - // so again. - chanIDInt := byteOrder.Uint64(row.GraphChannel.Scid) - if _, ok := edgesSeen[chanIDInt]; ok { - continue - } + // Each iteration, we'll read a batch amount of channel updates + // (consulting the cache along the way), yield them, then loop + // back to decide if we have any more updates to read out. 
+ for hasMore { + var batch []ChannelEdge - if channel, ok := s.chanCache.get(chanIDInt); ok { - hits++ - edgesSeen[chanIDInt] = struct{}{} - edges = append(edges, channel) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + rows, err := db.GetChannelsByPolicyLastUpdateRange( + ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{ + Version: int16(ProtocolV1), + StartTime: sqldb.SQLInt64(startTime.Unix()), + EndTime: sqldb.SQLInt64(endTime.Unix()), + LastUpdateTime: lastUpdateTime, + LastID: lastID, + MaxResults: sql.NullInt32{ + Int32: int32(cfg.chanUpdateIterBatchSize), + Valid: true, + }, + }, + ) + if err != nil { + return err + } - continue - } + hasMore = len(rows) == cfg.chanUpdateIterBatchSize + + for _, row := range rows { + // Update pagination cursor. + lastUpdateTime = sql.NullInt64{ + Int64: extractMaxUpdateTime(row), + Valid: true, + } + lastID = sql.NullInt64{ + Int64: row.GraphChannel.ID, + Valid: true, + } + + // Skip if we've already processed this channel. + chanIDInt := byteOrder.Uint64(row.GraphChannel.Scid) + if _, ok := edgesSeen[chanIDInt]; ok { + continue + } + + // Check cache first. 
+ if channel, ok := s.chanCache.get(chanIDInt); ok { + hits++ + total++ + edgesSeen[chanIDInt] = struct{}{} + batch = append(batch, channel) + continue + } + + chanEdge, err := s.buildChannelFromRow(ctx, db, row) + if err != nil { + return err + } + + edgesSeen[chanIDInt] = struct{}{} + edgesToCache[chanIDInt] = chanEdge + + batch = append(batch, chanEdge) + + total++ + } - node1, node2, err := buildNodes( - ctx, db, row.GraphNode, row.GraphNode_2, - ) - if err != nil { - return err - } + return nil + }, func() { + batch = nil + }) - channel, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, - node1.PubKeyBytes, node2.PubKeyBytes, - ) if err != nil { - return fmt.Errorf("unable to build channel "+ - "info: %w", err) + log.Errorf("ChanUpdatesInHorizon batch error: %v", err) + return } - dbPol1, dbPol2, err := extractChannelPolicies(row) - if err != nil { - return fmt.Errorf("unable to extract channel "+ - "policies: %w", err) + for _, edge := range batch { + if !yield(edge) { + return + } } - p1, p2, err := getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, channel.ChannelID, - node1.PubKeyBytes, node2.PubKeyBytes, - ) - if err != nil { - return fmt.Errorf("unable to build channel "+ - "policies: %w", err) - } + // Update cache after successful batch yield, setting + // the cache lock only once for the entire batch. + s.updateChanCacheBatch(edgesToCache) + edgesToCache = make(map[uint64]ChannelEdge) - edgesSeen[chanIDInt] = struct{}{} - chanEdge := ChannelEdge{ - Info: channel, - Policy1: p1, - Policy2: p2, - Node1: node1, - Node2: node2, + // If the batch didn't yield anything, then we're done. 
+ if len(batch) == 0 { + break } - edges = append(edges, chanEdge) - edgesToCache[chanIDInt] = chanEdge } - return nil - }, func() { - edgesSeen = make(map[uint64]struct{}) - edgesToCache = make(map[uint64]ChannelEdge) - edges = nil - }) - if err != nil { - return nil, fmt.Errorf("unable to fetch channels: %w", err) - } - - // Insert any edges loaded from disk into the cache. - for chanid, channel := range edgesToCache { - s.chanCache.insert(chanid, channel) - } - - if len(edges) > 0 { - log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)", - float64(hits)*100/float64(len(edges)), hits, len(edges)) - } else { - log.Debugf("ChanUpdatesInHorizon returned no edges in "+ - "horizon (%s, %s)", startTime, endTime) - } - - return edges, nil + if total > 0 { + log.Debugf("ChanUpdatesInHorizon hit percentage: "+ + "%.2f (%d/%d)", float64(hits)*100/float64(total), + hits, total) + } else { + log.Debugf("ChanUpdatesInHorizon returned no edges "+ + "in horizon (%s, %s)", startTime, endTime) + } + }, nil } // ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 2a5508e62fc..e894a272176 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -993,18 +993,39 @@ WHERE c.version = $1 OR (cp2.last_update >= $2 AND cp2.last_update < $3) ) + -- Pagination using compound cursor (max_update_time, id). + -- We use COALESCE with -1 as sentinel since timestamps are always positive. 
+ AND ( + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END > COALESCE($4, -1)) + OR + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END = COALESCE($4, -1) + AND c.id > COALESCE($5, -1)) + ) ORDER BY CASE WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) THEN COALESCE(cp1.last_update, 0) ELSE COALESCE(cp2.last_update, 0) - END ASC + END ASC, + c.id ASC +LIMIT COALESCE($6, 999999999) ` type GetChannelsByPolicyLastUpdateRangeParams struct { - Version int16 - StartTime sql.NullInt64 - EndTime sql.NullInt64 + Version int16 + StartTime sql.NullInt64 + EndTime sql.NullInt64 + LastUpdateTime sql.NullInt64 + LastID sql.NullInt64 + MaxResults interface{} } type GetChannelsByPolicyLastUpdateRangeRow struct { @@ -1044,7 +1065,14 @@ type GetChannelsByPolicyLastUpdateRangeRow struct { } func (q *Queries) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) { - rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, arg.Version, arg.StartTime, arg.EndTime) + rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, + arg.Version, + arg.StartTime, + arg.EndTime, + arg.LastUpdateTime, + arg.LastID, + arg.MaxResults, + ) if err != nil { return nil, err } diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index ada8c9f199b..1e002c08794 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -411,12 +411,30 @@ WHERE c.version = @version OR (cp2.last_update >= @start_time AND cp2.last_update < @end_time) ) + -- Pagination using compound cursor (max_update_time, id). + -- We use COALESCE with -1 as sentinel since timestamps are always positive. 
+ AND ( + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END > COALESCE(sqlc.narg('last_update_time'), -1)) + OR + (CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END = COALESCE(sqlc.narg('last_update_time'), -1) + AND c.id > COALESCE(sqlc.narg('last_id'), -1)) + ) ORDER BY CASE WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) THEN COALESCE(cp1.last_update, 0) ELSE COALESCE(cp2.last_update, 0) - END ASC; + END ASC, + c.id ASC +LIMIT COALESCE(sqlc.narg('max_results'), 999999999); -- name: GetChannelByOutpointWithPolicies :one SELECT From 528582fedfc3bbc2efbf64875239c1c44275897b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 16:01:50 -0700 Subject: [PATCH 8/9] graph/db: add tests for iterator implementations --- graph/db/graph_test.go | 265 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 256 insertions(+), 9 deletions(-) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 25ce1edf72d..d82c40cb321 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -2009,10 +2009,13 @@ func TestChanUpdatesInHorizon(t *testing.T) { // If we issue an arbitrary query before any channel updates are // inserted in the database, we should get zero results. 
- chanUpdates, err := graph.ChanUpdatesInHorizon( + chanIter, err := graph.ChanUpdatesInHorizon( time.Unix(999, 0), time.Unix(9999, 0), ) require.NoError(t, err, "unable to updates for updates") + + chanUpdates := fn.Collect(chanIter) + if len(chanUpdates) != 0 { t.Fatalf("expected 0 chan updates, instead got %v", len(chanUpdates)) @@ -2125,13 +2128,15 @@ func TestChanUpdatesInHorizon(t *testing.T) { }, } for _, queryCase := range queryCases { - resp, err := graph.ChanUpdatesInHorizon( + respIter, err := graph.ChanUpdatesInHorizon( queryCase.start, queryCase.end, ) if err != nil { t.Fatalf("unable to query for updates: %v", err) } + resp := fn.Collect(respIter) + if len(resp) != len(queryCase.resp) { t.Fatalf("expected %v chans, got %v chans", len(queryCase.resp), len(resp)) @@ -2170,10 +2175,11 @@ func TestNodeUpdatesInHorizon(t *testing.T) { // If we issue an arbitrary query before we insert any nodes into the // database, then we shouldn't get any results back. - nodeUpdates, err := graph.NodeUpdatesInHorizon( + nodeUpdatesIter, err := graph.NodeUpdatesInHorizon( time.Unix(999, 0), time.Unix(9999, 0), ) require.NoError(t, err, "unable to query for node updates") + nodeUpdates := fn.Collect(nodeUpdatesIter) require.Len(t, nodeUpdates, 0) // We'll create 10 node announcements, each with an update timestamp 10 @@ -2234,20 +2240,22 @@ func TestNodeUpdatesInHorizon(t *testing.T) { resp: nodeAnns, }, - // If we reduce the ending time by 10 seconds, then we should - // get all but the last node we inserted. + // If we reduce the ending time by 1 nanosecond before the last + // node's timestamp, then we should get all but the last node. 
{ start: startTime, - end: endTime.Add(-time.Second * 10), + end: endTime.Add(-time.Second*10 - time.Nanosecond), resp: nodeAnns[:9], }, } for _, queryCase := range queryCases { - resp, err := graph.NodeUpdatesInHorizon( + iter, err := graph.NodeUpdatesInHorizon( queryCase.start, queryCase.end, ) require.NoError(t, err) + + resp := fn.Collect(iter) require.Len(t, resp, len(queryCase.resp)) for i := 0; i < len(resp); i++ { @@ -2256,6 +2264,243 @@ func TestNodeUpdatesInHorizon(t *testing.T) { } } +// TestNodeUpdatesInHorizonBoundaryConditions tests the iterator boundary +// conditions, specifically around batch boundaries and edge cases. +func TestNodeUpdatesInHorizonBoundaryConditions(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // Test with various batch sizes to ensure the iterator works correctly + // across batch boundaries. + batchSizes := []int{1, 3, 5, 10, 25, 100} + + for _, batchSize := range batchSizes { + t.Run(fmt.Sprintf("BatchSize%d", batchSize), func(t *testing.T) { + // Create a fresh graph for each test. + testGraph := MakeTestGraph(t) + + // Add 25 nodes with increasing timestamps. 
+ startTime := time.Unix(1234567890, 0) + var nodeAnns []models.LightningNode + + for i := 0; i < 25; i++ { + nodeAnn := createTestVertex(t) + nodeAnn.LastUpdate = startTime.Add(time.Duration(i) * time.Hour) + nodeAnns = append(nodeAnns, *nodeAnn) + require.NoError(t, testGraph.AddLightningNode(ctx, nodeAnn)) + } + + testCases := []struct { + name string + start time.Time + end time.Time + want int + }{ + { + name: "all nodes", + start: startTime, + end: startTime.Add(26 * time.Hour), + want: 25, + }, + { + name: "first batch only", + start: startTime, + end: startTime.Add(time.Duration(min(batchSize, 25)-1) * time.Hour), + want: min(batchSize, 25), + }, + { + name: "cross batch boundary", + start: startTime, + end: startTime.Add(time.Duration(min(batchSize, 24)) * time.Hour), + want: min(batchSize+1, 25), + }, + { + name: "exact boundary", + start: func() time.Time { + // Test querying exactly at a batch boundary. + if batchSize <= 25 { + return startTime.Add(time.Duration(batchSize-1) * time.Hour) + } + + // For batch sizes > 25, test beyond our data range. 
+ return startTime.Add(time.Duration(25) * time.Hour) + }(), + end: func() time.Time { + if batchSize <= 25 { + return startTime.Add(time.Duration(batchSize-1) * time.Hour) + } + return startTime.Add(time.Duration(25) * time.Hour) + }(), + want: func() int { + if batchSize <= 25 { + return 1 + } + + // No nodes exist at hour 25 or beyond + return 0 + }(), + }, + { + name: "empty range before", + start: startTime.Add(-time.Hour), + end: startTime.Add(-time.Minute), + want: 0, + }, + { + name: "empty range after", + start: startTime.Add(30 * time.Hour), + end: startTime.Add(40 * time.Hour), + want: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + iter, err := testGraph.NodeUpdatesInHorizon( + tc.start, tc.end, + WithNodeUpdateIterBatchSize( + batchSize, + ), + ) + require.NoError(t, err) + + nodes := fn.Collect(iter) + require.Len( + t, nodes, tc.want, + "expected %d nodes, got %d", + tc.want, len(nodes), + ) + + // Verify nodes are in the correct time + // order. + for i := 1; i < len(nodes); i++ { + require.True(t, + nodes[i-1].LastUpdate.Before(nodes[i].LastUpdate) || + nodes[i-1].LastUpdate.Equal(nodes[i].LastUpdate), + "nodes should be in chronological order", + ) + } + }) + } + }) + } +} + +// TestNodeUpdatesInHorizonEarlyTermination tests that the iterator properly +// handles early termination when the caller stops iterating. +func TestNodeUpdatesInHorizonEarlyTermination(t *testing.T) { + t.Parallel() + ctx := context.Background() + + graph := MakeTestGraph(t) + + // We'll start by creating 100 nodes, each with an update time spaced + // one hour apart. 
+ startTime := time.Unix(1234567890, 0) + for i := 0; i < 100; i++ { + nodeAnn := createTestVertex(t) + nodeAnn.LastUpdate = startTime.Add(time.Duration(i) * time.Hour) + require.NoError(t, graph.AddLightningNode(ctx, nodeAnn)) + } + + // Test early termination at various points + terminationPoints := []int{0, 1, 5, 10, 23, 50, 99} + + for _, stopAt := range terminationPoints { + t.Run(fmt.Sprintf("StopAt%d", stopAt), func(t *testing.T) { + iter, err := graph.NodeUpdatesInHorizon( + startTime, startTime.Add(200*time.Hour), + WithNodeUpdateIterBatchSize(10), + ) + require.NoError(t, err) + + // Collect only up to stopAt nodes, breaking afterwards. + var collected []models.LightningNode + count := 0 + for node := range iter { + if count >= stopAt { + break + } + collected = append(collected, node) + count++ + } + + require.Len( + t, collected, stopAt, + "should have collected exactly %d nodes", + stopAt, + ) + }) + } +} + +// TestChanUpdatesInHorizonBoundaryConditions tests the channel iterator +// boundary conditions. +func TestChanUpdatesInHorizonBoundaryConditions(t *testing.T) { + t.Parallel() + ctx := context.Background() + + batchSizes := []int{1, 3, 5, 10} + + for _, batchSize := range batchSizes { + t.Run(fmt.Sprintf("BatchSize%d", batchSize), func(t *testing.T) { + // Create a fresh graph for each test, then add two new + // nodes to the graph. + graph := MakeTestGraph(t) + node1 := createTestVertex(t) + node2 := createTestVertex(t) + require.NoError(t, graph.AddLightningNode(ctx, node1)) + require.NoError(t, graph.AddLightningNode(ctx, node2)) + + // Next, we'll create 25 channels between the two nodes, + // each with increasing timestamps. 
+ startTime := time.Unix(1234567890, 0) + const numChans = 25 + + for i := 0; i < numChans; i++ { + updateTime := startTime.Add(time.Duration(i) * time.Hour) + + channel, chanID := createEdge( + uint32(i*10), 0, 0, 0, node1, node2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, &channel)) + + edge1 := newEdgePolicy( + chanID.ToUint64(), updateTime.Unix(), + ) + edge1.ChannelFlags = 0 + edge1.ToNode = node2.PubKeyBytes + edge1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge1)) + + edge2 := newEdgePolicy( + chanID.ToUint64(), updateTime.Unix(), + ) + edge2.ChannelFlags = 1 + edge2.ToNode = node1.PubKeyBytes + edge2.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, edge2)) + } + + // Now we'll run the main query, and verify that we get + // back the expected number of channels. + iter, err := graph.ChanUpdatesInHorizon( + startTime, startTime.Add(26*time.Hour), + WithChanUpdateSize(batchSize), + ) + require.NoError(t, err) + + channels := fn.Collect(iter) + require.Len( + t, channels, numChans, + "expected %d channels, got %d", numChans, + len(channels), + ) + }) + } +} + // TestFilterKnownChanIDsZombieRevival tests that if a ChannelUpdateInfo is // passed to FilterKnownChanIDs that contains a channel that we have marked as // a zombie, then we will mark it as live again if the new ChannelUpdate has @@ -3483,8 +3728,9 @@ func TestNodePruningUpdateIndexDeletion(t *testing.T) { // update time of our test node. startTime := time.Unix(9, 0) endTime := node1.LastUpdate.Add(time.Minute) - nodesInHorizon, err := graph.NodeUpdatesInHorizon(startTime, endTime) + nodesInHorizonIter, err := graph.NodeUpdatesInHorizon(startTime, endTime) require.NoError(t, err, "unable to fetch nodes in horizon") + nodesInHorizon := fn.Collect(nodesInHorizonIter) // We should only have a single node, and that node should exactly // match the node we just inserted. 
@@ -3501,8 +3747,9 @@ func TestNodePruningUpdateIndexDeletion(t *testing.T) { // Now that the node has been deleted, we'll again query the nodes in // the horizon. This time we should have no nodes at all. - nodesInHorizon, err = graph.NodeUpdatesInHorizon(startTime, endTime) + nodesInHorizonIter, err = graph.NodeUpdatesInHorizon(startTime, endTime) require.NoError(t, err, "unable to fetch nodes in horizon") + nodesInHorizon = fn.Collect(nodesInHorizonIter) if len(nodesInHorizon) != 0 { t.Fatalf("should have zero nodes instead have: %v", From 422529f181bb6efdef0e6669d8f9b71869b561c4 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Aug 2025 16:05:57 -0700 Subject: [PATCH 9/9] discovery+graph: update callers to use new iterator APIs In this commit, we update all callers of NodeUpdatesInHorizon and ChanUpdatesInHorizon to use the new iterator-based APIs. The changes use fn.Collect to maintain existing behavior while benefiting from the memory efficiency of iterators when possible. --- discovery/chan_series.go | 24 +++--------------------- graph/builder.go | 2 +- graph/interfaces.go | 5 +++-- 3 files changed, 7 insertions(+), 24 deletions(-) diff --git a/discovery/chan_series.go b/discovery/chan_series.go index a6787edf969..2641141f818 100644 --- a/discovery/chan_series.go +++ b/discovery/chan_series.go @@ -120,7 +120,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash, if err != nil { return nil, err } - for _, channel := range chansInHorizon { + for channel := range chansInHorizon { // If the channel hasn't been fully advertised yet, or is a // private channel, then we'll skip it as we can't construct a // full authentication proof if one is requested. @@ -163,30 +163,12 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash, // within the horizon as well. We send these second to ensure that they // follow any active channels they have. 
nodeAnnsInHorizon, err := c.graph.NodeUpdatesInHorizon( - startTime, endTime, + startTime, endTime, graphdb.WithIterPublicNodesOnly(), ) if err != nil { return nil, err } - for _, nodeAnn := range nodeAnnsInHorizon { - nodeAnn := nodeAnn - - // Ensure we only forward nodes that are publicly advertised to - // prevent leaking information about nodes. - isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes) - if err != nil { - log.Errorf("Unable to determine if node %x is "+ - "advertised: %v", nodeAnn.PubKeyBytes, err) - continue - } - - if !isNodePublic { - log.Tracef("Skipping forwarding announcement for "+ - "node %x due to being unadvertised", - nodeAnn.PubKeyBytes) - continue - } - + for nodeAnn := range nodeAnnsInHorizon { nodeUpdate, err := nodeAnn.NodeAnnouncement(true) if err != nil { return nil, err diff --git a/graph/builder.go b/graph/builder.go index 5cfb79e4c6e..727da3e96dd 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -599,7 +599,7 @@ func (b *Builder) pruneZombieChans() error { "chans: %v", err) } - for _, u := range oldEdges { + for u := range oldEdges { err = filterPruneChans(u.Info, u.Policy1, u.Policy2) if err != nil { return fmt.Errorf("error filtering channels to "+ diff --git a/graph/interfaces.go b/graph/interfaces.go index 5494db61a58..664ccccfffc 100644 --- a/graph/interfaces.go +++ b/graph/interfaces.go @@ -2,6 +2,7 @@ package graph import ( "context" + "iter" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -153,8 +154,8 @@ type DB interface { // ChanUpdatesInHorizon returns all the known channel edges which have // at least one edge that has an update timestamp within the specified // horizon. - ChanUpdatesInHorizon(startTime, endTime time.Time) ( - []graphdb.ChannelEdge, error) + ChanUpdatesInHorizon(startTime, endTime time.Time, + opts ...graphdb.Option) (iter.Seq[graphdb.ChannelEdge], error) // DeleteChannelEdges removes edges with the given channel IDs from the // database and marks them as zombies. 
This ensures that we're unable to