Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ func (s *state) saveExecutionResults(
return fmt.Errorf("can not retrieve chunk data packs: %w", err)
}

// Within the following `Store` call, the chunk data packs themselves are going to be persisted into their
// dedicated database. However, we have not yet persisted that this execution node is committing to the
// result represented by the chunk data packs. Populating the index from chunk ID to chunk data pack ID
// in the protocol database (signifying the node's slashable commitment to the respective result) is
done by the functor returned by `Store`. The functor is invoked as part of the atomic batch update
// of the protocol database below.
storeFunc, err := s.chunkDataPacks.Store(chunks)
if err != nil {
return fmt.Errorf("can not store chunk data packs for block ID: %v: %w", blockID, err)
Expand All @@ -427,9 +433,11 @@ func (s *state) saveExecutionResults(
// This design guarantees consistency in two scenarios:
//
// Case 1: If the batch update is interrupted, the mapping has not yet been saved.
// Later, if we attempt to store another execution result that references a
// different chunk data pack but the same chunk ID, there is no conflict,
// because no previous mapping exists.
// Later, if we attempt to store another execution result that references a different
// chunk data pack but the same chunk ID, there is no conflict, because no previous mapping
// exists. By convention, a node should only share information once it has persisted its
// commitment in the database. Therefore, if the database write was interrupted, none of the
// information is stored and no binding commitment to a different result could have been made.
//
// Case 2: If the batch update succeeds, the mapping is saved. Later, if we
// attempt to store another execution result that references a different
Expand Down Expand Up @@ -484,7 +492,6 @@ func (s *state) saveExecutionResults(
return nil
})
})

}

func (s *state) UpdateLastExecutedBlock(ctx context.Context, executedID flow.Identifier) error {
Expand Down
14 changes: 12 additions & 2 deletions model/flow/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (ch *Chunk) ID() Identifier {
return MakeID(ch)
}

// ChunkDataPackHeader is a reduced representation of ChunkDataPack. In a nutshell, we substitute
// the larger [ChunkDataPack.Proof] and [ChunkDataPack.Collection] with their collision-resistant hashes.
// Note, ChunkDataPackHeader.ID() is the same as ChunkDataPack.ID().
//
//structwrite:immutable - mutations allowed only within the constructor
type ChunkDataPackHeader struct {
ChunkID Identifier // ID of the chunk this data pack is for
StartState StateCommitment // commitment for starting state
Expand All @@ -141,6 +146,8 @@ type ChunkDataPackHeader struct {
ExecutionDataRoot BlockExecutionDataRoot
}

// NewChunkDataPackHeader instantiates an "immutable" ChunkDataPackHeader.
// The `CollectionID` field is set to [flow.ZeroID] for system chunks.
func NewChunkDataPackHeader(ChunkID Identifier, StartState StateCommitment, ProofID Identifier, CollectionID Identifier, ExecutionDataRoot BlockExecutionDataRoot) *ChunkDataPackHeader {
return &ChunkDataPackHeader{
ChunkID: ChunkID,
Expand Down Expand Up @@ -197,8 +204,9 @@ type ChunkDataPack struct {
// a trusted ChunkDataPack using NewChunkDataPack constructor.
type UntrustedChunkDataPack ChunkDataPack

// NewChunkDataPack returns an initialized chunk data pack.
// Construction ChunkDataPack allowed only within the constructor.
// FromUntrustedChunkDataPack converts a chunk data pack from an untrusted source
// into its canonical representation. Here, basic structural validation is performed.
// Construction of ChunkDataPacks is ONLY allowed via THIS CONSTRUCTOR.
//
// All errors indicate a valid ChunkDataPack cannot be constructed from the input.
func FromUntrustedChunkDataPack(untrusted UntrustedChunkDataPack) (*ChunkDataPack, error) {
Expand Down Expand Up @@ -231,6 +239,8 @@ func FromUntrustedChunkDataPack(untrusted UntrustedChunkDataPack) (*ChunkDataPac
), nil
}

// NewChunkDataPack instantiates an "immutable" ChunkDataPack.
// The `collection` field is set to `nil` for system chunks.
func NewChunkDataPack(chunkID Identifier, startState StateCommitment, proof StorageProof, collection *Collection, executionDataRoot BlockExecutionDataRoot) *ChunkDataPack {
return &ChunkDataPack{
ChunkID: chunkID,
Expand Down
17 changes: 13 additions & 4 deletions storage/chunk_data_packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ type ChunkDataPacks interface {
// chunk data pack (or it will get slashed). This mapping from chunk ID to the ID of the chunk data pack that the Execution Node
// actually committed to is stored in the protocol database, in the following phase 2.
// - In the second phase, we populate the index mappings from ChunkID to one "distinguished" chunk data pack ID. This mapping
// is stored in the protocol database. Typically, en Execution Node uses this for indexing its own chunk data packs which it
// is stored in the protocol database. Typically, an Execution Node uses this for indexing its own chunk data packs which it
// publicly committed to.
// - This function can approximately be described as an atomic operation. When it completes successfully, either both databases
// have been updated, or neither. However, this is an approximation only, because interim states exist, where the chunk data
// packs already have been stored in the chunk data pack database, but the index mappings do not yet exist.
//
// ATOMICITY:
// [ChunkDataPacks.Store] executes phase 1 immediately, persisting the chunk data packs in their dedicated database. However,
// populating the index mappings in phase 2 is deferred to the caller, who must invoke the returned functor to perform phase 2. This
// approach has the following benefits:
// - Our API reflects that we are writing to two different databases here, with the chunk data pack database containing largely
// specialized data subject to pruning. In contrast, the protocol database persists the commitments a node makes (subject to
// slashing). The caller receives the ability to persist this commitment in the form of the returned functor. The functor
// may be discarded by the caller without corrupting the state (if anything, we have just stored some additional chunk data
// packs).
// - The serialization and storage of the comparatively large chunk data packs is separated from the protocol database writes.
// - The locking duration of the protocol database is reduced.
//
// The Store method returns:
// - func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error: Function for populating the index mapping from chunkID
Expand Down
18 changes: 12 additions & 6 deletions storage/chunk_data_packs_stored.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@ type StoredChunkDataPacks interface {
BatchRemove(chunkDataPackIDs []flow.Identifier, rw ReaderBatchWriter) error
}

// StoredChunkDataPack is an in-storage representation of chunk data pack.
// Its prime difference is instead of an actual collection, it keeps a collection ID hence relying on maintaining
// the collection on a secondary storage.
// StoredChunkDataPack is an in-storage representation of chunk data pack. Its prime difference is instead of an
// actual collection, it keeps a collection ID hence relying on maintaining the collection on a secondary storage.
// Note, StoredChunkDataPack.ID() is the same as ChunkDataPack.ID()
//
//structwrite:immutable - mutations allowed only within the constructor
type StoredChunkDataPack struct {
ChunkID flow.Identifier
StartState flow.StateCommitment
Proof flow.StorageProof
CollectionID flow.Identifier
CollectionID flow.Identifier // flow.ZeroID for system chunks
ExecutionDataRoot flow.BlockExecutionDataRoot
}

// NewStoredChunkDataPack instantiates an "immutable" [StoredChunkDataPack].
// The `collectionID` field is set to [flow.ZeroID] for system chunks.
func NewStoredChunkDataPack(
chunkID flow.Identifier,
startState flow.StateCommitment,
Expand All @@ -58,16 +60,18 @@ func NewStoredChunkDataPack(
}
}

// IsSystemChunk returns true if this chunk data pack is for a system chunk.
func (s *StoredChunkDataPack) IsSystemChunk() bool {
return s.CollectionID == flow.ZeroID
}

// ToStoredChunkDataPack converts the given Chunk Data Pack to its reduced representation.
// (Collections are stored separately and don't need to be included again here).
func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
collectionID := flow.ZeroID
if c.Collection != nil {
collectionID = c.Collection.ID()
}

return NewStoredChunkDataPack(
c.ChunkID,
c.StartState,
Expand All @@ -77,7 +81,9 @@ func ToStoredChunkDataPack(c *flow.ChunkDataPack) *StoredChunkDataPack {
)
}

func ToStoredChunkDataPacks(cs []*flow.ChunkDataPack) []*StoredChunkDataPack { // ToStoredChunkDataPack converts the given ChunkDataPacks to their reduced representation,
// ToStoredChunkDataPacks converts the given Chunk Data Packs to their reduced representation.
// (Collections are stored separately and don't need to be included again here).
func ToStoredChunkDataPacks(cs []*flow.ChunkDataPack) []*StoredChunkDataPack {
scs := make([]*StoredChunkDataPack, 0, len(cs))
for _, c := range cs {
scs = append(scs, ToStoredChunkDataPack(c))
Expand Down
14 changes: 9 additions & 5 deletions storage/operation/chunk_data_packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/onflow/flow-go/storage"
)

// IndexChunkDataPackByChunkID inserts a mapping from chunk ID to stored chunk data pack ID.
// It requires the [storage.LockInsertOwnReceipt] lock to be held by the caller.
// IndexChunkDataPackByChunkID inserts a mapping from chunk ID to stored chunk data pack ID. It requires
// the [storage.LockInsertOwnReceipt] lock to be acquired by the caller and held until the write batch has been committed.
// Returns [storage.ErrDataMismatch] if a different chunk data pack ID already exists for the given chunk ID.
func IndexChunkDataPackByChunkID(lctx lockctx.Proof, rw storage.ReaderBatchWriter, chunkID flow.Identifier, chunkDataPackID flow.Identifier) error {
if !lctx.HoldsLock(storage.LockInsertOwnReceipt) {
Expand All @@ -35,7 +35,7 @@ func IndexChunkDataPackByChunkID(lctx lockctx.Proof, rw storage.ReaderBatchWrite
}

// RetrieveChunkDataPackID retrieves the stored chunk data pack ID for a given chunk ID.
// Returns [storage.ErrNotFound] if no mapping exists for the given chunk ID.
// Returns [storage.ErrNotFound] if no chunk data pack has been indexed as result for the given chunk ID.
func RetrieveChunkDataPackID(r storage.Reader, chunkID flow.Identifier, chunkDataPackID *flow.Identifier) error {
return RetrieveByKey(r, MakePrefix(codeIndexChunkDataPackByChunkID, chunkID), chunkDataPackID)
}
Expand All @@ -47,14 +47,18 @@ func RemoveChunkDataPackID(w storage.Writer, chunkID flow.Identifier) error {
}

// InsertStoredChunkDataPack inserts a [storage.StoredChunkDataPack] into the database, keyed by its own ID.
// The caller must ensure the chunkDataPackID is the same as c.ID().
//
// CAUTION: The caller must ensure `storeChunkDataPackID` is the same as `c.ID()`, ie. a collision-resistant
// hash of the chunk data pack! This method silently overrides existing data, which is safe only if for the
// same key, we always write the same value.
//
// No error returns expected during normal operations.
func InsertStoredChunkDataPack(rw storage.ReaderBatchWriter, storeChunkDataPackID flow.Identifier, c *storage.StoredChunkDataPack) error {
return UpsertByKey(rw.Writer(), MakePrefix(codeChunkDataPack, storeChunkDataPackID), c)
}

// RetrieveStoredChunkDataPack retrieves a chunk data pack by stored chunk data pack ID.
// It returns [storage.ErrNotFound] if the chunk data pack is not found
// It returns [storage.ErrNotFound] if no chunk data pack with the given ID is known.
func RetrieveStoredChunkDataPack(r storage.Reader, storeChunkDataPackID flow.Identifier, c *storage.StoredChunkDataPack) error {
return RetrieveByKey(r, MakePrefix(codeChunkDataPack, storeChunkDataPackID), c)
}
Expand Down
51 changes: 40 additions & 11 deletions storage/store/chunk_data_packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@ import (
"github.com/onflow/flow-go/storage/operation"
)

// ChunkDataPacks manages storage and retrieval of ChunkDataPacks, primarily serving the use case of EXECUTION NODES persisting
// and indexing chunk data packs for their OWN RESULTS. Essentially, the chunk describes a batch of work to be done, and the
// chunk data pack describes the result of that work. The storage of chunk data packs is segregated across different
// storage components for efficiency and modularity reasons:
// 0. Usually (ignoring the system chunk for a moment), the batch of work is given by the collection referenced in the chunk
// data pack. For any chunk data pack being stored, we assume that the executed collection has *previously* been persisted
// in [storage.Collections]. It is useful to persist the collections individually, so we can individually retrieve them.
// 1. The actual chunk data pack itself is stored in a dedicated storage component `cdpStorage`. Note that for this storage
// component, no atomicity is required, as we are storing chunk data packs by their collision-resistant hashes, so
// different chunk data packs will be stored under different keys.
// Theoretically, nodes could persist multiple different (disagreeing) chunk data packs for the same
// chunk in this step. However, for efficiency, Execution Nodes only store their own chunk data packs.
// 2. The index mapping from ChunkID to chunkDataPackID is stored in the protocol database for fast retrieval.
// This index is intended to be populated by execution nodes when they commit to a specific result represented by the chunk
// data pack. Here, we require atomicity, as an execution node should not be changing / overwriting which chunk data pack
// it committed to (during normal operations).
//
// Since the executed collections are stored separately (step 0, above), we can just use the collection ID in context of the
// chunk data pack storage (step 1, above). Therefore, we utilize the reduced representation [storage.StoredChunkDataPack]
// internally. While removing redundant data from storage, it takes 3 look-ups to return chunk data pack by chunk ID:
//
// i. a lookup for chunkID -> chunkDataPackID
// ii. a lookup for chunkDataPackID -> StoredChunkDataPack (only has CollectionID, no collection data)
// iii. a lookup for CollectionID -> Collection, then reconstruct the chunk data pack from the collection and the StoredChunkDataPack
type ChunkDataPacks struct {
// the protocol DB is used for storing index mappings from chunk ID to chunk data pack ID
protocolDB storage.DB
Expand All @@ -27,11 +51,6 @@ type ChunkDataPacks struct {

// cache chunkID -> chunkDataPackID
chunkIDToChunkDataPackIDCache *Cache[flow.Identifier, flow.Identifier]

// it takes 3 look ups to return chunk data pack by chunk ID:
// 1. a cache lookup for chunkID -> chunkDataPackID
// 2. a lookup for chunkDataPackID -> StoredChunkDataPack (only has CollectionID, no collection data)
// 3. a lookup for CollectionID -> Collection, then restore the chunk data pack with the collection and the StoredChunkDataPack
}

var _ storage.ChunkDataPacks = (*ChunkDataPacks)(nil)
Expand Down Expand Up @@ -76,11 +95,20 @@ func NewChunkDataPacks(collector module.CacheMetrics, db storage.DB, stored stor
// chunk data pack (or it will get slashed). This mapping from chunk ID to the ID of the chunk data pack that the Execution Node
// actually committed to is stored in the protocol database, in the following phase 2.
// - In the second phase, we populate the index mappings from ChunkID to one "distinguished" chunk data pack ID. This mapping
// is stored in the protocol database. Typically, en Execution Node uses this for indexing its own chunk data packs which it
// is stored in the protocol database. Typically, an Execution Node uses this for indexing its own chunk data packs which it
// publicly committed to.
// - This function can approximately be described as an atomic operation. When it completes successfully, either both databases
// have been updated, or neither. However, this is an approximation only, because interim states exist, where the chunk data
// packs already have been stored in the chunk data pack database, but the index mappings do not yet exist.
//
// ATOMICITY:
// [ChunkDataPacks.Store] executes phase 1 immediately, persisting the chunk data packs in their dedicated database. However,
// populating the index mappings in phase 2 is deferred to the caller, who must invoke the returned functor to perform phase 2. This
// approach has the following benefits:
// - Our API reflects that we are writing to two different databases here, with the chunk data pack database containing largely
// specialized data subject to pruning. In contrast, the protocol database persists the commitments a node makes (subject to
// slashing). The caller receives the ability to persist this commitment in the form of the returned functor. The functor
// may be discarded by the caller without corrupting the state (if anything, we have just stored some additional chunk data
// packs).
// - The serialization and storage of the comparatively large chunk data packs is separated from the protocol database writes.
// - The locking duration of the protocol database is reduced.
//
// The Store method returns:
// - func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error: Function for populating the index mapping from chunkID
Expand Down Expand Up @@ -133,7 +161,8 @@ func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) (
return nil
}

// Return the function that completes the storage process
// Returned Functor: when invoked, will add the deferred storage operations to the provided ReaderBatchWriter
// NOTE: until this functor is called, only the chunk data packs are stored by their respective IDs.
return storeChunkDataPacksFunc, nil
}

Expand Down Expand Up @@ -242,7 +271,7 @@ func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPac
return nil, fmt.Errorf("cannot retrieve stored chunk data pack %x for chunk %x: %w", chunkDataPackID, chunkID, err)
}

var collection *flow.Collection
var collection *flow.Collection // nil by default, which only represents system chunk
if schdp.CollectionID != flow.ZeroID {
collection, err = ch.collections.ByID(schdp.CollectionID)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions storage/store/chunk_data_packs_stored.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/onflow/flow-go/storage/operation"
)

// StoredChunkDataPacks represents persistent storage for chunk data packs.
// It works with the reduced representation `StoredChunkDataPack` for chunk data packs,
// where instead of the full collection data, only the collection's hash (ID) is contained.
type StoredChunkDataPacks struct {
db storage.DB
byIDCache *Cache[flow.Identifier, *storage.StoredChunkDataPack]
Expand Down
2 changes: 1 addition & 1 deletion storage/store/chunk_data_packs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestChunkDataPacks_Store(t *testing.T) {
require.Equal(t, chunkDataPack, stored, "mismatched chunk data pack at index %d", i)
}

// Store again is idemopotent
// Store again is idempotent
storeFunc, err = chunkDataPackStore.Store(chunkDataPacks)
if err != nil {
return err
Expand Down