[Access] Asynchronous Collection Indexing Design #8121

@zhangchiqing

1) Purpose & Outcomes

Goal. Index block collections reliably without overloading the node, even when finalized blocks arrive faster than we can index.

Outcomes.

  • Workers (job processors) stay focused on a small, sliding window of heights just above the latest indexed height.
  • Collection retrieval and indexing are decoupled from finalization and from each other.
  • Either of two sources may supply collections: (a) on‑demand collection requests; (b) the optional Execution Data Indexer (EDI). Whichever delivers first completes the job; duplicates are ignored.

2) High‑Level Flow (reader‑first overview)

  1. Finalization happens → a lazy signal wakes the Job Consumer.

  2. Job Consumer consults: (a) Progress Tracker → highest indexed height; (b) Jobs module → latest safe (head) height. It computes a bounded work window [indexed+1 .. min(indexed+K, head)], where K is the window size.

  3. For each height in the window, the consumer spins up (or reuses) a Job Processor.

  4. The Job Processor checks if the block is already indexed. If yes → finish immediately. If not:

    • It enqueues the block’s missing collection IDs into MissingCollectionQueue (MCQ) with a completion callback.
    • It triggers CollectionRequester (unless we are in EDI‑only mode).
  5. As collections arrive (from requester or EDI), the Job Processor forwards them to MCQ. When MCQ detects a block is now complete, the processor passes the collections to BlockCollectionIndexer to store+index, then calls MCQ to mark the job done.

  6. Jobs may complete out of order; the progress tracker advances only once all lower gaps are closed.


3) Core Interfaces

3.1 BlockCollectionIndexer

Stores and indexes collections for a given block height; provides the latest indexed height for windowing and fast no‑op checks.

// Persists and indexes collections; exposes the last fully indexed height.
type BlockCollectionIndexer interface {
    LatestIndexedHeight() uint64

    // Idempotent. If blockHeight <= LatestIndexedHeight(), return quickly.
    // Otherwise: lock, re‑check, then persist+index the collections for that height.
    // Double‑check pattern minimizes lock contention when called by multiple sources.
    OnReceivedCollectionsForBlock(blockHeight uint64, cols []*flow.Collection) error
}
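The double‑check pattern described in the comments above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: `Collection` is a hypothetical stand‑in for `flow.Collection`, `persist` is a hypothetical hook for the real storage and index writes, and heights that finish out of order are kept in an in‑memory set so that `LatestIndexedHeight()` only advances across contiguous heights. It needs Go 1.19+ for `atomic.Uint64`.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// Collection is a hypothetical stand-in for flow.Collection.
type Collection struct{ ID string }

// sketchIndexer illustrates idempotent, double-checked indexing.
type sketchIndexer struct {
	latest  atomic.Uint64       // every height <= latest is indexed
	mu      sync.Mutex          // guards indexed and persist calls
	indexed map[uint64]struct{} // heights above latest finished out of order
	// persist is a hypothetical hook for the real storage/index writes.
	persist func(height uint64, cols []*Collection) error
}

func newSketchIndexer(start uint64, persist func(uint64, []*Collection) error) *sketchIndexer {
	idx := &sketchIndexer{indexed: make(map[uint64]struct{}), persist: persist}
	idx.latest.Store(start)
	return idx
}

func (i *sketchIndexer) LatestIndexedHeight() uint64 { return i.latest.Load() }

func (i *sketchIndexer) OnReceivedCollectionsForBlock(height uint64, cols []*Collection) error {
	// First check, lock-free: heights at or below latest are already done.
	if height <= i.latest.Load() {
		return nil
	}
	i.mu.Lock()
	defer i.mu.Unlock()
	// Second check under the lock: another caller may have won the race.
	if height <= i.latest.Load() {
		return nil
	}
	if _, done := i.indexed[height]; done {
		return nil
	}
	if err := i.persist(height, cols); err != nil {
		return err
	}
	i.indexed[height] = struct{}{}
	// Advance latest across any heights that are now contiguous.
	for {
		next := i.latest.Load() + 1
		if _, ok := i.indexed[next]; !ok {
			break
		}
		delete(i.indexed, next)
		i.latest.Store(next)
	}
	return nil
}
```

Holding the lock during `persist` serializes writes; that keeps the sketch simple and fits the expectation that contention between EDI and the requester is brief.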

3.2 MissingCollectionQueue (MCQ)

An in‑memory coordinator for jobs and their completion callbacks. MCQ does not index; it only tracks missing collections per height and announces when a height becomes complete.

// Tracks missing collections per height and invokes job callbacks when complete.
type MissingCollectionQueue interface {
    EnqueueMissingCollections(blockHeight uint64, ids []flow.Identifier, callback func()) error
    OnIndexedForBlock(blockHeight uint64) // mark done (post‑indexing)

    // On receipt of a collection, MCQ updates internal state and, if a block
    // just became complete, returns (completedHeights, true).
    OnReceivedCollection(collectionID flow.Identifier) (*CompletedBlockHeights, bool)
}
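A minimal in‑memory MCQ could look like this, assuming the data structures listed under Key Data Structures (a per‑height set of missing IDs plus a reverse index from collection ID to height). `Identifier` and the `Heights` field of `CompletedBlockHeights` are hypothetical stand‑ins, and the sketch assumes each collection ID belongs to exactly one height. Callbacks fire outside the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// Identifier is a hypothetical stand-in for flow.Identifier.
type Identifier string

// CompletedBlockHeights is a hypothetical shape for the type named in the interface.
type CompletedBlockHeights struct{ Heights []uint64 }

type jobEntry struct {
	missing  map[Identifier]struct{} // collections still outstanding
	callback func()                  // the job's done callback
}

// sketchMCQ tracks missing collections per height; it never indexes.
type sketchMCQ struct {
	mu       sync.Mutex
	jobs     map[uint64]*jobEntry  // height -> outstanding state
	byCollID map[Identifier]uint64 // reverse index: collection ID -> height
}

func newSketchMCQ() *sketchMCQ {
	return &sketchMCQ{jobs: make(map[uint64]*jobEntry), byCollID: make(map[Identifier]uint64)}
}

func (q *sketchMCQ) EnqueueMissingCollections(height uint64, ids []Identifier, callback func()) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, exists := q.jobs[height]; exists {
		return fmt.Errorf("height %d already enqueued", height)
	}
	e := &jobEntry{missing: make(map[Identifier]struct{}, len(ids)), callback: callback}
	for _, id := range ids {
		e.missing[id] = struct{}{}
		q.byCollID[id] = height
	}
	q.jobs[height] = e
	return nil
}

func (q *sketchMCQ) OnReceivedCollection(id Identifier) (*CompletedBlockHeights, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	height, ok := q.byCollID[id]
	if !ok {
		return nil, false // unknown or duplicate delivery: ignore
	}
	delete(q.byCollID, id)
	e := q.jobs[height]
	delete(e.missing, id)
	if len(e.missing) > 0 {
		return nil, false
	}
	return &CompletedBlockHeights{Heights: []uint64{height}}, true
}

func (q *sketchMCQ) OnIndexedForBlock(height uint64) {
	q.mu.Lock()
	e, ok := q.jobs[height]
	if ok {
		// Drop stale reverse entries in case the height was indexed early.
		for id := range e.missing {
			delete(q.byCollID, id)
		}
	}
	delete(q.jobs, height)
	q.mu.Unlock()
	// Run the callback outside the critical section.
	if ok && e.callback != nil {
		e.callback()
	}
}
```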

3.3 CollectionRequester

Abstracts the engine that requests collections by ID (e.g., from collection nodes).

type CollectionRequester interface {
    RequestCollections(ids []flow.Identifier) error
}

3.4 JobProcessor

Owns the state of ongoing jobs (delegated to MCQ) and orchestrates request → receive → index → complete.

// Implements the job lifecycle for a single block height.
type JobProcessor interface {
    ProcessJob(ctx irrecoverable.SignalerContext, job module.Job, done func()) error
    OnReceivedCollectionsForBlock(blockHeight uint64, cols []*flow.Collection) error // called by EDI or requester
}
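The request side of `ProcessJob` (already‑indexed check, enqueue, optional request) might look roughly like this. The sketch is not the real signature: it narrows the dependencies to hypothetical function fields and small interfaces (`isIndexed`, `collIDs`, `missingQueue`, `requester`), drops the `SignalerContext` and `module.Job` parameters, and omits the receive path. The recording fakes exist only to exercise it.

```go
package main

import "fmt"

// Identifier is a hypothetical stand-in for flow.Identifier.
type Identifier string

// Narrowed, hypothetical dependencies of the processor.
type missingQueue interface {
	EnqueueMissingCollections(height uint64, ids []Identifier, callback func()) error
}

type requester interface {
	RequestCollections(ids []Identifier) error
}

type sketchProcessor struct {
	isIndexed func(height uint64) bool         // hypothetical storage lookup
	collIDs   func(height uint64) []Identifier // hypothetical: collection IDs in the block
	mcq       missingQueue
	req       requester
	ediOnly   bool
}

// processJob follows lifecycle steps 2-4 for a single height.
func (p *sketchProcessor) processJob(height uint64, done func()) error {
	if p.isIndexed(height) {
		done() // already indexed: finish immediately
		return nil
	}
	ids := p.collIDs(height)
	if err := p.mcq.EnqueueMissingCollections(height, ids, done); err != nil {
		return fmt.Errorf("enqueue height %d: %w", height, err)
	}
	if p.ediOnly {
		return nil // rely solely on EDI to deliver collections
	}
	if err := p.req.RequestCollections(ids); err != nil {
		return fmt.Errorf("request collections for height %d: %w", height, err)
	}
	return nil
}

// Recording fakes, used only to exercise the sketch.
type fakeQueue struct{ enqueued map[uint64][]Identifier }

func (f *fakeQueue) EnqueueMissingCollections(h uint64, ids []Identifier, _ func()) error {
	f.enqueued[h] = ids
	return nil
}

type fakeRequester struct{ requested []Identifier }

func (f *fakeRequester) RequestCollections(ids []Identifier) error {
	f.requested = append(f.requested, ids...)
	return nil
}
```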

4) Job Consumer (Windowed Throttling)

Why: Prevent node overload when finalized heights advance rapidly.

How:

  • Reads latestIndexed = BlockCollectionIndexer.LatestIndexedHeight().
  • Reads head = Jobs.Head() (latest height safe to work on).
  • Defines a fixed‑size window of K heights. The work range is R = [latestIndexed+1 .. min(latestIndexed+K, head)].
  • Schedules/assigns one JobProcessor per height in R. Heights beyond R are ignored until capacity frees up.
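The window arithmetic fits in a small pure function. `workRange` is a hypothetical helper name; comparing `head-latestIndexed` against K (rather than computing `latestIndexed+K` first) avoids overflow near the top of the `uint64` range.

```go
package main

// workRange returns the inclusive range [from, to] of heights the consumer
// should target, given the latest indexed height, the head, and window size k.
// ok is false when there is nothing to schedule.
func workRange(latestIndexed, head, k uint64) (from, to uint64, ok bool) {
	if k == 0 || head <= latestIndexed {
		return 0, 0, false
	}
	from = latestIndexed + 1
	if head-latestIndexed < k {
		return from, head, true // capped by head
	}
	return from, latestIndexed + k, true // capped by the window size
}
```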

Lazy notification:

  • Finalization pushes a single, coalescing signal to workSignal (buffer size 1). The consumer wakes, recomputes R, and may ignore new jobs if it’s already at capacity.
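A single‑slot channel with a non‑blocking send is the usual Go idiom for this kind of coalescing signal. The helpers `notify` and `consume` below are hypothetical names used for illustration; dropping a signal is safe because the consumer recomputes R from scratch on every wake‑up.

```go
package main

// newWorkSignal creates a single-slot channel used to coalesce wake-ups.
func newWorkSignal() chan struct{} { return make(chan struct{}, 1) }

// notify performs a non-blocking send: if a signal is already pending,
// this one is dropped, since the pending one triggers the same recompute.
func notify(sig chan struct{}) {
	select {
	case sig <- struct{}{}:
	default:
	}
}

// consume calls recompute once per pending signal until stop is closed.
func consume(sig <-chan struct{}, stop <-chan struct{}, recompute func()) {
	for {
		select {
		case <-stop:
			return
		case <-sig:
			recompute()
		}
	}
}
```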

5) Detailed Job Lifecycle (out‑of‑order safe)

  1. Spawn/Assign. Consumer gives (height, job, doneCb) to a JobProcessor.

  2. Already Indexed? Processor queries storage/indexer. If yes → invoke doneCb and return.

  3. Track Missing. Otherwise call MCQ.EnqueueMissingCollections(height, collectionIDs, doneCb).

  4. Fetch (optional). If not in EDI‑only mode, call CollectionRequester.RequestCollections(ids).

  5. Receive Collections. When any collection arrives (from requester or EDI):

    • Processor calls MCQ.OnReceivedCollection(id).
    • If MCQ reports height complete, the processor calls BlockCollectionIndexer.OnReceivedCollectionsForBlock(height, cols).
    • After indexing, call MCQ.OnIndexedForBlock(height) to fire doneCb.
  6. Progress Advancement. Jobs may complete out of order (e.g., 10 before 9). The progress tracker advances only when all lower gaps are closed.

  7. Crash/Restart. On startup, the consumer rebuilds R. Re‑created jobs short‑circuit if already indexed (idempotent check in the indexer and early “already indexed?” checks in the processor).


6) Execution Data Indexer (EDI) Integration

Modes:

  • EDI‑only: A configuration flag disables active collection requests; the processor relies solely on EDI to supply collections. Enabling this mode while EDI is disabled must cause a panic, because no source would otherwise supply collections.
  • Hybrid (default): Both EDI and requester race to supply collections. The first source to complete a height wins; duplicates are dropped by the indexer’s idempotence and MCQ’s bookkeeping.

Why not let EDI call the indexer directly?

  • The JobProcessor needs to (a) update MCQ to complete the job, (b) suppress now‑redundant fetches, and (c) move the queue forward. Routing via JobProcessor.OnReceivedCollectionsForBlock centralizes those responsibilities.

Contention analysis:

  • Block‑level contention may occur if EDI and the requester try to store the same height near‑simultaneously.

    • If EDI lags behind, there is no contention: it skips already‑indexed heights.
    • If EDI leads, the consumer’s window advances quickly and the requester stops fetching already‑indexed heights.
    • If they’re close, brief contention is acceptable; indexing is typically faster than data ingress.

7) Concurrency & Locking Principles

  • Double‑checked locking in BlockCollectionIndexer.OnReceivedCollectionsForBlock: check the indexed height before taking the lock and again after acquiring it, to minimize contention under concurrent invocations (from the EDI and requester paths).
  • Short‑held locks inside MCQ: per‑height sets/maps of outstanding collection IDs. Callbacks run outside critical sections.
  • Single‑slot workSignal coalesces bursts of finalization events.

8) Failure, Idempotence, & Restart Semantics

  • Idempotent indexing: re‑invoking OnReceivedCollectionsForBlock(height, cols) for an already‑indexed height is a no‑op.
  • At‑least‑once delivery: both EDI and requester may deliver; MCQ/Indexer handle duplicates.
  • Crash safety: on restart, jobs are recreated; already‑indexed heights complete immediately via early checks.

9) Configuration & Tuning

  • K — window size (max concurrently targeted heights).
  • workersPerHeight (optional) — typically 1 per height to simplify ordering; scale by increasing K.
  • ediOnly flag — require EDI presence; panic if EDI is off.
  • Backoff/retry policy for CollectionRequester (already implemented by the requester engine).
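The ediOnly rule can be enforced with a validation check when the component is constructed. `Config`, its field names, and the requirement that K be at least 1 are assumptions made for illustration; only the "panic if ediOnly is set while EDI is off" rule comes from this design.

```go
package main

import "errors"

// Config is a hypothetical grouping of the tuning knobs listed above.
type Config struct {
	WindowSize uint64 // K: max concurrently targeted heights
	EDIOnly    bool   // rely solely on EDI; never request collections
	EDIEnabled bool   // whether the Execution Data Indexer runs on this node
}

// Validate rejects configurations that could never make progress.
func (c Config) Validate() error {
	if c.WindowSize == 0 {
		return errors.New("window size K must be at least 1")
	}
	if c.EDIOnly && !c.EDIEnabled {
		return errors.New("ediOnly requires the Execution Data Indexer to be enabled")
	}
	return nil
}

// mustValidate panics on an invalid config, matching the panic rule above.
func mustValidate(c Config) {
	if err := c.Validate(); err != nil {
		panic(err)
	}
}
```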

10) Observability (MUST‑Haves)

  • Gauges: latestIndexedHeight, headHeight, inFlightHeights, collectionsMissing{count}.
  • Counters: collectionsRequested, collectionsReceived, heightsCompleted, duplicateCollections.
  • Timers: per‑height queue→complete latency, request→receive latency, receive→indexed latency.
  • Logs with height, collection IDs, source (EDI vs requester), and dedupe decisions.

11) Key Data Structures (sketch)

  • MCQ:

    • map[height]struct{ missing set[collectionID]; callback func(); }
    • map[collectionID]height (reverse index for quick lookups)
  • Progress Tracker: monotonic latestIndexedHeight.

  • Job Consumer: sliding window [h+1 .. min(h+K, head)], where h = latestIndexedHeight, and a per‑height worker registry.


12) Sequence (happy path)

  1. Finalization → workSignal <- struct{}{} (coalesced)
  2. Consumer computes the range R and spins up a JobProcessor for each block height in it (already implemented by the JobConsumer).
  3. Processor checks storage; not indexed → MCQ.EnqueueMissingCollections(height, ids, cb).
  4. Processor requests collections (unless ediOnly).
  5. EDI and/or requester deliver collections → Processor.OnReceivedCollectionsForBlock(height, cols) → MCQ.OnReceivedCollection(id) per collection.
  6. MCQ detects completeness → Processor → BlockCollectionIndexer.OnReceivedCollectionsForBlock(height, cols) → MCQ.OnIndexedForBlock(height) → cb().
  7. Consumer sees gaps close; latestIndexedHeight advances and the window slides (already implemented by the JobConsumer).
