
Conversation

@martintomazic
Contributor

@martintomazic martintomazic commented Aug 19, 2025

Motivation:
Whilst working on #6239, it became clear that the runtime state sync could be optimized. More importantly, the worker in its current form is very hard to read, maintain, and reason about.

What was done:
This PR addresses some of the existing issues, such as panicking and unlikely goroutine leaks upon termination/cleanup. More importantly, it sets the stage for further refactors.

Please consider this an incremental improvement. The code after this refactor is still far from optimal in terms of readability and maintainability.

Follow-up:

  1. Make diff sync an independent worker that can be tested and maintained in isolation.
  2. Optimize state sync (if possible and sensible).

Update: I combined the two follow-ups in #6242.

@netlify

netlify bot commented Aug 19, 2025

Deploy Preview for oasisprotocol-oasis-core canceled.

🔨 Latest commit: dca1f4b
🔍 Latest deploy log: https://app.netlify.com/projects/oasisprotocol-oasis-core/deploys/68a5bf210e24a50008ea797c

@martintomazic martintomazic force-pushed the martin/trivial/state-sync-refactor-1 branch from bd53717 to 329dfe3 on August 19, 2025 12:23
Comment on lines 362 to 363
// The request relies on the default timeout of the underlying p2p protocol clients.
//
Contributor Author

How about we move this constant out of the p2p protocol client and make the timeout the responsibility of the client?

Collaborator

Which constant? This comment doesn't belong here, as a method should not rely on the internal implementation of its parameters, only on their interface.

Contributor Author

rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime),

I suggest removing MaxGetDiffResponseTime from the p2p package and making it the client's responsibility to define the context timeout. This is how it is currently done for fetching chunks and is imo also idiomatic/correct.

Then I can remove all the comments about this constant that you correctly pointed out were off?

Collaborator

Yes, I guess you could remove it. But the current solution is also fine in general; the p2p layer could define its own timeout and state sync could lower it with a context if needed.

Talking about the internal implementation of another struct in a comment is probably not the best, so I would remove it. Or instead of the comment, I would use a context with a deadline and a similar timeout.
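For illustration, the caller-owned deadline could look roughly like this (getDiffTimeout and the wrapper are placeholders, not part of the codebase):

// getDiffTimeout is an illustrative value; the point is that the caller, not
// the p2p protocol client, decides how long a single request may take.
const getDiffTimeout = 15 * time.Second

// callWithGetDiffTimeout runs one GetDiff-style request under a bounded context,
// so the p2p package no longer needs to export MaxGetDiffResponseTime.
func callWithGetDiffTimeout(ctx context.Context, do func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, getDiffTimeout)
	defer cancel()
	return do(ctx)
}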

Also rename node to worker, to avoid confusion.

Ideally, the parent package (storage) would have runtime
as a prefix to make it clearer this is a runtime worker.
@martintomazic martintomazic force-pushed the martin/trivial/state-sync-refactor-1 branch from 329dfe3 to 58d1c6a on August 19, 2025 12:34
@codecov

codecov bot commented Aug 19, 2025

Codecov Report

❌ Patch coverage is 78.75000% with 170 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.88%. Comparing base (26b367e) to head (58d1c6a).

Files with missing lines Patch % Lines
go/worker/storage/statesync/state_sync.go 80.55% 45 Missing and 18 partials ⚠️
go/worker/storage/statesync/checkpointer.go 65.89% 34 Missing and 10 partials ⚠️
go/worker/storage/statesync/diff_sync.go 87.79% 25 Missing and 6 partials ⚠️
go/worker/storage/statesync/checkpoint_sync.go 59.52% 12 Missing and 5 partials ⚠️
go/worker/storage/statesync/prune.go 36.36% 13 Missing and 1 partial ⚠️
go/oasis-node/cmd/node/node_control.go 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6299      +/-   ##
==========================================
+ Coverage   64.63%   64.88%   +0.25%     
==========================================
  Files         696      699       +3     
  Lines       67803    67765      -38     
==========================================
+ Hits        43824    43969     +145     
+ Misses      19013    18778     -235     
- Partials     4966     5018      +52     

☔ View full report in Codecov by Sentry.

@martintomazic martintomazic marked this pull request as ready for review August 19, 2025 13:03
The logic was preserved; the only things that changed are that the
context is passed explicitly and the worker for creating checkpoints
was renamed.
In addition, the state sync worker should return an error and it
should be the caller's responsibility to act accordingly. See e.g.
new workers such as the stateless client.

Note that the semantics changed slightly: previously the storage
worker would wait for all state sync workers to finish. Now it
terminates when the first one finishes. Notice that this was not
100% true before either, as a state sync worker could panic (which
would in that case shut down the whole node).
Probably the timeout should be the client's responsibility.
Additionally, observe that the parent (storage worker) is
registered as a background service, thus upon an error inside the
state sync worker there is no need to manually request the node
shutdown.
The code was broken into smaller functions. Also the scope of
variables (including channels) has been reduced.

Semantics as well as performance should stay the same.
The logic was preserved. Ideally, diff sync would only accept a
context, a local storage backend, and a client/interface to fetch
diffs. This would make it testable in isolation.

Finally, the use of the undefined round should be moved out of it.
Previously, if the worker returned an error it would exit the main
for loop and wait for the wait group to be emptied. However, this
is not possible as there is no one reading the fetched diffs.
In case of termination due to an error exiting the main for loop or
a canceled context, there is no point in waiting for the goroutines
to finish fetching/doing the cleanup. As long as we cancel their
context and use it properly in the select statements, this should
be safe and better.
@martintomazic martintomazic force-pushed the martin/trivial/state-sync-refactor-1 branch from 58d1c6a to dca1f4b on August 20, 2025 12:27
// 5. Registering node availability when it has synced sufficiently close to
// the latest known block header.
//
// Suggestion: This worker should not be responsible for creating and advertising p2p related stuff.
Collaborator

I would remove this since it’s already obvious from a "good programming" perspective. If this is a TODO, then the comment should be a bit different.

}

n.logger.Info("starting committee node")
w.logger.Info("starting state sycne worker")
Collaborator

Suggested change
w.logger.Info("starting state sycne worker")
w.logger.Info("starting state sync worker")

I prefer only starting, as the logger should in general contain the name of the worker.
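For context, a sketch of what is meant: if the logger is created with the worker's module name, the message itself can stay short (the module string here is illustrative; logging refers to the common logging package):

// The logger name already identifies the worker, so the log line can be just "starting".
var logger = logging.GetLogger("worker/storage/statesync")

func logStart() {
	logger.Info("starting")
}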

}

// Start storage node for every runtime.
// Start state sync worker for every runtime.
Collaborator

Can you improve this comment, as it doesn't make sense since we only register the runtime.

}
}

// Suggestion: Limit the max time for restoring checkpoint.
Collaborator

Isn't this a TODO and could be done now?

Contributor Author

I thought about this, yes.

Any idea what would be a good context timeout, so that we don't shoot ourselves in the foot if the state becomes bigger and bigger and restoration thus takes longer...

I would definitely prefer not to make this configurable, but we should leave sufficient extra time. I think it currently takes around 10-20 min on my machine to restore from the checkpoint...

Collaborator

I have no idea. You can start with a loose timeout and we can tighten it later.
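As a sketch of the loose-timeout idea (the constant and helper name are illustrative, not agreed values):

// checkpointRestoreTimeout is deliberately generous: restoring currently takes
// on the order of 10-20 minutes, and the state will keep growing.
const checkpointRestoreTimeout = 2 * time.Hour

// restoreWithTimeout bounds checkpoint restoration with a loose deadline that
// can be tightened later.
func restoreWithTimeout(ctx context.Context, restore func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, checkpointRestoreTimeout)
	defer cancel()
	return restore(ctx)
}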

"current_round", blk.Header.Round,
)
panic("can't get block in storage worker")
return fmt.Errorf("getting block for round %d (current round: %d): %w", i, blk.Header.Round, err)
Collaborator

Should this be failed to ...

func (w *Worker) worker() { // nolint: gocyclo
defer close(w.quitCh)
// Run runs state sync worker.
func (w *Worker) Run(ctx context.Context) error { // nolint: gocyclo
Collaborator

Rename to Serve to implement Service interface.

for _, r := range w.runtimes {
<-r.Quit()
}
_ = w.Serve() // error logged as part of Serve already.
Collaborator

You should move everything from Start and Stop to Serve, so that we can later replace the service manager with one that accepts Services.
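To illustrate the direction (the interface shape below is an assumption, not necessarily the exact one the new service manager will use):

// Service sketches the kind of interface a context-aware service manager could accept.
type Service interface {
	// Name returns the service name.
	Name() string
	// Serve runs the service until the context is canceled or a fatal error occurs.
	Serve(ctx context.Context) error
}

Serve would then absorb what Start, Stop and the old worker() did, and its return value becomes the quit/error signal the caller acts on.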

Contributor Author

I see. I focused on making the common committee node stateless with regards to context; I will also do explicit context passing for the storage worker here. +1.

Collaborator
@peternose peternose left a comment

Even though you refactored the state sync worker, I think there are still a lot of things to be done, as the code is still very unclear and functions are way too long. I would try to break the worker into sub-workers, to break the code down into smaller pieces which are easier to understand.

return
}

// Wait for the common node to be initialized.
Collaborator

This can be removed.

summaryCache := make(map[uint64]*blockSummary)
// Create the fetcher pool.
pendingApply := &minRoundQueue{}
pendingFinalize := &minRoundQueue{} // Suggestion: slice would suffice given that application must happen in order.
Collaborator
@peternose peternose Aug 21, 2025

This comment appears to make sense only from the refactorer's perspective, and may be confusing to others. I recommend removing it or addressing the slice change in a separate commit.

}
}
}
heartbeat := heartbeat{}
Collaborator

Suggested change
heartbeat := heartbeat{}
var heartbeat heartbeat

lastDiff.pf.RecordSuccess()
}
}
err := w.apply(ctx, lastDiff)
Collaborator

Move it one line down and into the if.
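That is, scoping the error to the if, roughly:

if err := w.apply(ctx, lastDiff); err != nil {
	// handle the error as before
}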

delete(summaryCache, lastDiff.round-1)
lastFullyAppliedRound = lastDiff.round

// Suggestion: Rename to lastAppliedRoundMetric, as synced is often a synonym for finalized in this code.
Collaborator

These suggestions are the same as TODO. Either you do them or not.

func (w *Worker) fetchDiff(ctx context.Context, round uint64, prevRoot, thisRoot storageApi.Root) {
func (w *Worker) triggerRoundFetches(
ctx context.Context,
wg *sync.WaitGroup,
Collaborator

I avoid passing wait groups and channels around because it makes it unclear which parts of the code are blocking/using them.

I would try to create two fetchers, a rounder and a differ, which would accept tasks and do their job. The rounder would internally use a wait group, while the differ would expose a diff channel.

Contributor Author

I would try to create two fetchers, a rounder and a differ, which would accept tasks and do their job. The rounder would internally use a wait group, while the differ would expose a diff channel.

Can you elaborate a bit more?

The final refactor I had in mind was done here: c7f230b. See 3x g.Go(...). Arguably could be improved further.

If you can write a few sentences about how many workers there would be and the high-level responsibilities of each, that would be awesome! :)

Collaborator

I was thinking something like this, plus maybe some additional structs for Apply and Finalize. Simple structs, with as few fields as possible, easy to test.

Note that this code might have errors.

type Worker struct { 
	nudger *availabilityNudger
	header *headerFetcher
	differ *diffFetcher
	...
}    
// availabilityNudger tracks the progress of last and last synced rounds
// and “nudges” role providers to mark themselves available or unavailable
// based on how closely the node is keeping up with consensus.
type availabilityNudger struct {
	roleProvider    registration.RoleProvider
	rpcRoleProvider registration.RoleProvider
	roleAvailable   bool

	lastRound       uint64
	lastSyncedRound uint64
}

// newAvailabilityNudger creates a new availability nudger.
func newAvailabilityNudger(localProvider, rpcProvider registration.RoleProvider) *availabilityNudger {
	return &availabilityNudger{
		roleProvider:    localProvider,
		rpcRoleProvider: rpcProvider,
		lastRound:       math.MaxUint64,
		lastSyncedRound: math.MaxUint64,
	}
}

// setLastRound updates the last round number.
func (m *availabilityNudger) setLastRound(round uint64) {
	m.lastRound = round
}

// setLastSyncedRound updates the most recently synced round number.
func (m *availabilityNudger) setLastSyncedRound(round uint64) {
	m.lastSyncedRound = round
}

// updateAvailability updates the role's availability based on the gap
// between the last round and the last synced round.
func (m *availabilityNudger) updateAvailability() {
	if m.lastRound == math.MaxUint64 || m.lastSyncedRound == math.MaxUint64 {
		return
	}
	// The last synced round should never be ahead of the last known round;
	// guard against underflow when computing the lag below.
	if m.lastRound < m.lastSyncedRound {
		return
	}

	switch roundLag := m.lastRound - m.lastSyncedRound; {
	case roundLag < maximumRoundDelayForAvailability:
		m.markAvailable()
	case roundLag > minimumRoundDelayForUnavailability:
		m.markUnavailable()
	}
}

// markAvailable sets the role as available if it is not already.
func (m *availabilityNudger) markAvailable() {
	if m.roleAvailable {
		return
	}
	m.roleAvailable = true

	m.roleProvider.SetAvailable(func(*node.Node) error { return nil })
	if m.rpcRoleProvider != nil {
		m.rpcRoleProvider.SetAvailable(func(*node.Node) error { return nil })
	}
}

// markUnavailable sets the role as unavailable if it is currently available.
func (m *availabilityNudger) markUnavailable() {
	if !m.roleAvailable {
		return
	}
	m.roleAvailable = false

	m.roleProvider.SetUnavailable()
	if m.rpcRoleProvider != nil {
		m.rpcRoleProvider.SetUnavailable()
	}
}
// summaryCache is a concurrent-safe cache for block summaries.
type summaryCache struct {
	mu    sync.Mutex
	cache map[uint64]*blockSummary
}

// newSummaryCache creates a new summary cache.
func newSummaryCache() *summaryCache {
	return &summaryCache{
		cache: make(map[uint64]*blockSummary),
	}
}

// set adds the given summary to the cache.
func (s *summaryCache) set(round uint64, summary *blockSummary) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cache[round] = summary
}

// get returns a summary from the cache.
func (s *summaryCache) get(round uint64) (*blockSummary, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	summary, ok := s.cache[round]
	return summary, ok
}

// delete removes a summary from the cache.
func (s *summaryCache) delete(round uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.cache, round)
}
// headerFetcher is responsible for fetching block headers and populating the summary cache.
type headerFetcher struct {
	history   history.History
	summaries *summaryCache
}

func newHeaderFetcher(history history.History, summaries *summaryCache) *headerFetcher {
	return &headerFetcher{
		history:   history,
		summaries: summaries,
	}
}

// fetch fetches the block header for the given round and populates the summary cache.
func (f *headerFetcher) fetch(ctx context.Context, round uint64, blk *block.Block) error {
	if _, ok := f.summaries.get(round); !ok && round == math.MaxUint64 {
		dummy := blockSummary{
			Namespace: blk.Header.Namespace,
			Round:     round + 1,
			Roots: []api.Root{
				{
					Version: round + 1,
					Type:    api.RootTypeIO,
				},
				{
					Version: round + 1,
					Type:    api.RootTypeState,
				},
			},
		}
		dummy.Roots[0].Empty()
		dummy.Roots[1].Empty()
		f.summaries.set(round, &dummy)
	}
	// Determine if we need to fetch any old block summaries. In case the first
	// round is an undefined round, we need to start with the following round
	// since the undefined round may be unsigned -1 and in this case the loop
	// would not do any iterations.
	startSummaryRound := round
	if startSummaryRound == math.MaxUint64 {
		startSummaryRound++
	}
	for i := startSummaryRound; i < blk.Header.Round; i++ {
		if _, ok := f.summaries.get(i); ok {
			continue
		}
		oldBlock, err := f.history.GetCommittedBlock(ctx, i)
		if err != nil {
			return fmt.Errorf("getting block for round %d (current round: %d): %w", i, blk.Header.Round, err)
		}
		summary := summaryFromBlock(oldBlock)
		f.summaries.set(i, summary)
	}
	if _, ok := f.summaries.get(blk.Header.Round); !ok {
		summary := summaryFromBlock(blk)
		f.summaries.set(blk.Header.Round, summary)
	}
	return nil
}
// diffFetcher is responsible for fetching storage diffs.
type diffFetcher struct {
	diffSync          diffsync.Client
	legacyStorageSync synclegacy.Client

	localStorage storageApi.LocalBackend

	syncingRounds map[uint64]*inFlight
	summaries     *summaryCache

	pool *workerpool.Pool

	ch chan *fetchedDiff
}

// fetch fetches the storage diff for the given rounds.
func (f *diffFetcher) fetch(ctx context.Context, start uint64, end uint64) {
	for round := start; round <= end; round++ {
		f.fetchRound(ctx, round)
	}
}

// fetchRound fetches the storage diff for the given round.
func (f *diffFetcher) fetchRound(ctx context.Context, round uint64) {
	syncing, ok := f.syncingRounds[round]
	if !ok {
		if len(f.syncingRounds) >= maxInFlightRounds {
			return
		}

		syncing = &inFlight{
			startedAt:     time.Now(),
			awaitingRetry: outstandingMaskFull,
		}
		f.syncingRounds[round] = syncing
	}
	if syncing.outstanding.hasAll() {
		return
	}

	prev, _ := f.summaries.get(round - 1)
	curr, _ := f.summaries.get(round)

	prevRoots := make([]api.Root, len(prev.Roots))
	copy(prevRoots, prev.Roots)

	for i := range prevRoots {
		if prevRoots[i].Type == api.RootTypeIO {
			// IO roots aren't chained, so clear it (but leave cache intact).
			prevRoots[i] = api.Root{
				Namespace: curr.Namespace,
				Version:   curr.Round,
				Type:      api.RootTypeIO,
			}
			prevRoots[i].Hash.Empty()
			break
		}
	}

	for i := range prevRoots {
		rootType := prevRoots[i].Type
		if syncing.outstanding.contains(rootType) {
			continue
		}
		if !syncing.awaitingRetry.contains(rootType) {
			continue
		}
		syncing.scheduleDiff(rootType)

		f.pool.Submit(func() {
			f.retryDiff(ctx, curr.Round, prevRoots[i], curr.Roots[i])
		})
	}
}

// retryDiff fetches the storage diff for the given round and schedules a retry on failure.
func (f *diffFetcher) retryDiff(ctx context.Context, round uint64, prevRoot, thisRoot api.Root) {
	diff, err := f.getDiff(ctx, round, prevRoot, thisRoot)
	if err != nil {
		f.syncingRounds[round].retry(thisRoot.Type)
		return
	}

	select {
	case f.ch <- diff:
	case <-ctx.Done():
	}
}

// getDiff fetches the storage diff from local storage or remote peers.
func (f *diffFetcher) getDiff(ctx context.Context, round uint64, prevRoot, thisRoot api.Root) (*fetchedDiff, error) {
	result := &fetchedDiff{
		pf:       rpc.NewNopPeerFeedback(),
		round:    round,
		prevRoot: prevRoot,
		thisRoot: thisRoot,
	}

	// If the new root already exists locally, there is nothing to fetch.
	if f.localStorage.NodeDB().HasRoot(thisRoot) {
		return result, nil
	}

	result.fetched = true

	// Even if HasRoot returns false the root can still exist if it is equal
	// to the previous root and the root was emitted by the consensus committee
	// directly (e.g., during an epoch transition).
	if thisRoot.Hash.Equal(&prevRoot.Hash) {
		result.writeLog = api.WriteLog{}
		return result, nil
	}

	wl, pf, err := f.fetchDiff(ctx, prevRoot, thisRoot)
	if err != nil {
		return nil, err
	}

	result.pf = pf
	result.writeLog = wl

	return result, nil
}

// fetchDiff fetches the write log using the diff sync p2p protocol client.
//
// In case of no peers or an error, it falls back to the legacy storage sync protocol.
func (f *diffFetcher) fetchDiff(ctx context.Context, start, end api.Root) (api.WriteLog, rpc.PeerFeedback, error) {
	rsp1, pf, err := f.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{
		StartRoot: start,
		EndRoot:   end,
	})
	if err == nil {
		return rsp1.WriteLog, pf, nil
	}

	rsp2, pf, err := f.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{
		StartRoot: start,
		EndRoot:   end,
	})
	if err != nil {
		return nil, nil, err
	}
	return rsp2.WriteLog, pf, nil
}

Contributor Author

Nice! Indeed this approach is more readable compared to what I tried in 9a14979 (i.e. I tried avoiding a mutex at any cost - bad).

Anyways, I will prepare one PR in front and then co-author you for this part :)

Contributor Author
@martintomazic martintomazic Aug 24, 2025

We should also create the checkpointer struct, which requires its last finalized round to be updated, just like the nudger.

But should the state sync worker really be responsible for this orchestration and know about role providers, consensus checkpoints, etc.?

How about we make it only responsible for the state initialization, checkpoint sync, and diff sync? Moreover, the diff sync is another worker, consisting of a few structs like those you provided above.

The nudger, checkpointer and pruner should instead be moved out (possibly each into an independent package under /storage/*, like is the case with statesync right now). Instead of pushing the updates to them, they can watchFinalizedRounds.

Then inside storage.Worker, where we registerRuntime, we could have a per-runtime orchestration worker (nudger, checkpointer, statesync) that also implements the hook interface + fan-out of new blocks coming from the hook subscription.

Over-engineering? Code-wise it looks even simpler to me; I am just a bit unsure how to organize/name the packages if we go this way...
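A rough sketch of the watchFinalizedRounds / fan-out part, just to make the shape concrete (names and the channel-based form are illustrative, not an agreed design):

// roundFanOut fans finalized rounds out to per-runtime sub-workers
// (nudger, checkpointer, pruner, ...).
type roundFanOut struct {
	mu   sync.Mutex
	subs []chan uint64
}

// watchFinalizedRounds returns a channel on which a sub-worker receives finalized rounds.
func (f *roundFanOut) watchFinalizedRounds() <-chan uint64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan uint64, 1)
	f.subs = append(f.subs, ch)
	return ch
}

// notify distributes a newly finalized round without blocking on slow subscribers;
// a subscriber that has not yet consumed the previous round simply misses this one.
func (f *roundFanOut) notify(round uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs {
		select {
		case ch <- round:
		default:
		}
	}
}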

Collaborator

We should also create the checkpointer struct, which requires its last finalized round to be updated, just like the nudger.

Probably; I just gave you a few examples.

But should the state sync worker really be responsible for this orchestration and know about role providers, consensus checkpoints, etc.?

Maybe not, unless we rename it.

How about we make it only responsible for the state initialization, checkpoint sync, and diff sync? Moreover, the diff sync is another worker, consisting of a few structs like those you provided above.

Can we do this in 2 steps, where the second step would be what you have written above?

//
// Suggestion: Ideally syncDiffs is refactored into independent worker and made only
// responsible for the syncing.
func (w *Worker) syncDiffs(
Collaborator
@peternose peternose Aug 21, 2025

While it is nice to separate diff and state, I don't prefer having worker functions scattered across multiple files. Having sub-workers for this would solve the problem, and the main worker would just delegate tasks.

Contributor Author
@martintomazic martintomazic Aug 21, 2025

Even though you refactored the state sync worker, I think there are still a lot of things to be done, as the code is still very unclear and functions are way too long. I would try to break the worker into sub-workers, to break the code down into smaller pieces which are easier to understand.

I guess it's mostly about getting there incrementally. Anyways, I see your point, so there is a proposal below.

While it is nice to separate diff and state, I don't prefer having worker functions scattered across multiple files.

Not sure I agree with this; otherwise, why do we already have checkpoint_sync.go and checkpointer.go files? I find the code organized this way (even if part of the same worker) much easier to navigate and reason about.

Ideally, we should also factor state initialization out of the main worker into a separate file (or at least a function). Finally, CheckpointSyncRetry could be moved to checkpoint_sync.go. But this would really be out of scope, as I am mostly focusing on the diff sync here.

Having sub-workers for this would solve the problem, and the main worker would just delegate tasks.

I tried to do this in the follow-up, as I think it would be way too much for one PR. See 9a14979, where this becomes an independent worker.

How about I cut this PR into two:

  1. All commits from the start all the way to avoiding panic (inclusive).
  2. Make the storage worker stateless with regards to context / use the adapter you suggested (new).
  3. Add a timeout to checkpoint restoration (new).
  4. Avoid the potential deadlock on the clean-up.
  5. Make checkpoint.Checkpointer not require taking a ctx - go/worker/storage: Refactor state sync worker #6299 (comment)

The follow-up can then be creating the independent diff sync worker, which on a high level consists of:

  1. Refactor - 213b17f.
  2. Move it to a separate file.
  3. Make it an independent worker - 9a14979.

Possibly the refactor - 213b17f - could be done as part of the first PR already, given that it makes the code easier to reason about and is useful in itself.

Maybe you would prefer that?

Collaborator

Not sure I agree with this; otherwise, why do we already have checkpoint_sync.go and checkpointer.go files? I find the code organized this way (even if part of the same worker) much easier to navigate and reason about.

If you want to split a struct in two files, this means that it has at least two responsibilities and could probably be refactored.

Ideally, we should also factor state initialization out of the main worker into a separate file (or at least a function). Finally, CheckpointSyncRetry could be moved to checkpoint_sync.go. But this would really be out of scope, as I am mostly focusing on the diff sync here.

Probably yes.

How about I cut this PR into two:

Yes, that would be nice and we could review and merge quicker.

fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String())
fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount)
defer fetchPool.Stop()
fetchCtx, cancel := context.WithCancel(ctx)
Collaborator

Shouldn't we just do this at the top: ctx, cancel := context.WithCancel(ctx)?

rootType := prevRoots[i].Type
if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) {
syncing.scheduleDiff(rootType)
wg.Add(1)
Collaborator

I don't like leaving goroutines to be killed once the program exits, as they might corrupt something.
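For reference, the usual pattern that avoids this (a sketch, not the PR's code): workers exit on context cancellation and the owner waits for them, so nothing is cut off mid-write on shutdown.

// runFetchers starts a fixed number of fetch workers and blocks until all of
// them have returned, either because the task channel was closed or the
// context was canceled. Nothing is left running when this function returns.
func runFetchers(ctx context.Context, tasks <-chan uint64, fetch func(context.Context, uint64)) {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // worker count is illustrative
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case round, ok := <-tasks:
					if !ok {
						return
					}
					fetch(ctx, round)
				}
			}
		}()
	}
	wg.Wait()
}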



Development

Successfully merging this pull request may close these issues.

Refactor runtime storage committee worker into smaller and independent workers

3 participants