Skip to content

Commit b580890

Browse files
committed
graph/db: migrate channels and polices to SQL
In this commit, the `MigrateGraphToSQL` function is expanded to migrate the channel and channe policy data. Both of these have the special case where the kvdb store records may contain invalid TLV. If we encounter a channel with invalid TLV, we skip it and its policies. If we encounter a policy with invalid TLV, we skip it. The `TestMigrateGraphToSQL` and `TestMigrationWithChannelDB` tests are updated accordingly.
1 parent 61c1d25 commit b580890

File tree

3 files changed

+714
-5
lines changed

3 files changed

+714
-5
lines changed

graph/db/kv_store.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4497,8 +4497,9 @@ func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
44974497
//
44984498
// TODO(halseth): get rid of these invalid policies in a
44994499
// migration.
4500-
// TODO(elle): complete the above TODO in migration from kvdb
4501-
// to SQL.
4500+
//
4501+
// NOTE: the above TODO was completed in the SQL migration and
4502+
// so such edge cases no longer need to be handled there.
45024503
oldEdgePolicy, err := deserializeChanEdgePolicy(
45034504
bytes.NewReader(edgeBytes),
45044505
)

graph/db/sql_migration.go

Lines changed: 282 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
// only for now and will be called from the main lnd binary once the
2424
// migration is fully implemented and tested.
2525
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
26-
sqlDB SQLQueries, _ chainhash.Hash) error {
26+
sqlDB SQLQueries, chain chainhash.Hash) error {
2727

2828
log.Infof("Starting migration of the graph store from KV to SQL")
2929
t0 := time.Now()
@@ -48,6 +48,13 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
4848
return fmt.Errorf("could not migrate source node: %w", err)
4949
}
5050

51+
// 3) Migrate all the channels and channel policies.
52+
err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
53+
if err != nil {
54+
return fmt.Errorf("could not migrate channels and policies: %w",
55+
err)
56+
}
57+
5158
log.Infof("Finished migration of the graph store from KV to SQL in %v",
5259
time.Since(t0))
5360

@@ -261,3 +268,277 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
261268

262269
return nil
263270
}
271+
272+
// migrateChannelsAndPolicies migrates all channels and their policies
273+
// from the KV backend to the SQL database.
274+
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
275+
sqlDB SQLQueries, chain chainhash.Hash) error {
276+
277+
var (
278+
channelCount uint64
279+
skippedChanCount uint64
280+
policyCount uint64
281+
skippedPolicyCount uint64
282+
)
283+
migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
284+
// If the policy is nil, we can skip it.
285+
if policy == nil {
286+
return nil
287+
}
288+
289+
// Unlike the special case of invalid TLV bytes for node and
290+
// channel announcements, we don't need to handle the case for
291+
// channel policies here because it is already handled in the
292+
// `forEachChannel` function. If the policy has invalid TLV
293+
// bytes, then `nil` will be passed to this function.
294+
295+
policyCount++
296+
297+
_, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
298+
if err != nil {
299+
return fmt.Errorf("could not migrate channel "+
300+
"policy %d: %w", policy.ChannelID, err)
301+
}
302+
303+
return nil
304+
}
305+
306+
// Iterate over each channel in the KV store and migrate it and its
307+
// policies to the SQL database.
308+
err := forEachChannel(kvBackend, func(channel *models.ChannelEdgeInfo,
309+
policy1 *models.ChannelEdgePolicy,
310+
policy2 *models.ChannelEdgePolicy) error {
311+
312+
scid := channel.ChannelID
313+
314+
// Here, we do a sanity check to ensure that the chain hash of
315+
// the channel returned by the KV store matches the expected
316+
// chain hash. This is important since in the SQL store, we will
317+
// no longer explicitly store the chain hash in the channel
318+
// info, but rather rely on the chain hash LND is running with.
319+
// So this is our way of ensuring that LND is running on the
320+
// correct network at migration time.
321+
if channel.ChainHash != chain {
322+
return fmt.Errorf("channel %d has chain hash %s, "+
323+
"expected %s", scid, channel.ChainHash, chain)
324+
}
325+
326+
// Sanity check to ensure that the channel has valid extra
327+
// opaque data. If it does not, we'll skip it. We need to do
328+
// this because previously we would just persist any TLV bytes
329+
// that we received without validating them. Now, however, we
330+
// normalise the storage of extra opaque data, so we need to
331+
// ensure that the data is valid. We don't want to abort the
332+
// migration if we encounter a channel with invalid extra opaque
333+
// data, so we'll just skip it and log a warning.
334+
_, err := marshalExtraOpaqueData(channel.ExtraOpaqueData)
335+
if errors.Is(err, ErrParsingExtraTLVBytes) {
336+
log.Warnf("Skipping channel %d with invalid "+
337+
"extra opaque data: %v", scid,
338+
channel.ExtraOpaqueData)
339+
340+
skippedChanCount++
341+
342+
// If we skip a channel, we also skip its policies.
343+
if policy1 != nil {
344+
skippedPolicyCount++
345+
}
346+
if policy2 != nil {
347+
skippedPolicyCount++
348+
}
349+
350+
return nil
351+
} else if err != nil {
352+
return fmt.Errorf("unable to marshal extra opaque "+
353+
"data for channel %d (%v): %w", scid,
354+
channel.ExtraOpaqueData, err)
355+
}
356+
357+
channelCount++
358+
err = migrateSingleChannel(
359+
ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
360+
)
361+
if err != nil {
362+
return fmt.Errorf("could not migrate channel %d: %w",
363+
scid, err)
364+
}
365+
366+
return nil
367+
})
368+
if err != nil {
369+
return fmt.Errorf("could not migrate channels and policies: %w",
370+
err)
371+
}
372+
373+
log.Infof("Migrated %d channels and %d policies from KV to SQL "+
374+
"(skipped %d channels and %d policies due to invalid TLV "+
375+
"streams)", channelCount, policyCount, skippedChanCount,
376+
skippedPolicyCount)
377+
378+
return nil
379+
}
380+
381+
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
382+
channel *models.ChannelEdgeInfo,
383+
policy1, policy2 *models.ChannelEdgePolicy,
384+
migChanPolicy func(*models.ChannelEdgePolicy) error) error {
385+
386+
scid := channel.ChannelID
387+
388+
// First, migrate the channel info along with its policies.
389+
dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
390+
if err != nil {
391+
return fmt.Errorf("could not insert record for channel %d "+
392+
"in SQL store: %w", scid, err)
393+
}
394+
395+
// Now, migrate the two channel policies.
396+
err = migChanPolicy(policy1)
397+
if err != nil {
398+
return fmt.Errorf("could not migrate policy1(%d): %w", scid,
399+
err)
400+
}
401+
err = migChanPolicy(policy2)
402+
if err != nil {
403+
return fmt.Errorf("could not migrate policy2(%d): %w", scid,
404+
err)
405+
}
406+
407+
// Now, fetch the channel and its policies from the SQL DB.
408+
row, err := sqlDB.GetChannelBySCIDWithPolicies(
409+
ctx, sqlc.GetChannelBySCIDWithPoliciesParams{
410+
Scid: channelIDToBytes(scid),
411+
Version: int16(ProtocolV1),
412+
},
413+
)
414+
if err != nil {
415+
return fmt.Errorf("could not get channel by SCID(%d): %w", scid,
416+
err)
417+
}
418+
419+
// Assert that the DB IDs for the channel and nodes are as expected
420+
// given the inserted channel info.
421+
err = sqldb.CompareRecords(
422+
dbChanInfo.channelID, row.Channel.ID, "channel DB ID",
423+
)
424+
if err != nil {
425+
return err
426+
}
427+
err = sqldb.CompareRecords(
428+
dbChanInfo.node1ID, row.Node.ID, "node1 DB ID",
429+
)
430+
if err != nil {
431+
return err
432+
}
433+
err = sqldb.CompareRecords(
434+
dbChanInfo.node2ID, row.Node_2.ID, "node2 DB ID",
435+
)
436+
if err != nil {
437+
return err
438+
}
439+
440+
migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
441+
ctx, sqlDB, row, channel.ChainHash,
442+
)
443+
if err != nil {
444+
return fmt.Errorf("could not build migrated channel and "+
445+
"policies: %w", err)
446+
}
447+
448+
// Finally, compare the original channel info and
449+
// policies with the migrated ones to ensure they match.
450+
if len(channel.ExtraOpaqueData) == 0 {
451+
channel.ExtraOpaqueData = nil
452+
}
453+
if len(migChan.ExtraOpaqueData) == 0 {
454+
migChan.ExtraOpaqueData = nil
455+
}
456+
457+
err = sqldb.CompareRecords(
458+
channel, migChan, fmt.Sprintf("channel %d", scid),
459+
)
460+
if err != nil {
461+
return err
462+
}
463+
464+
checkPolicy := func(expPolicy,
465+
migPolicy *models.ChannelEdgePolicy) error {
466+
467+
switch {
468+
// Both policies are nil, nothing to compare.
469+
case expPolicy == nil && migPolicy == nil:
470+
return nil
471+
472+
// One of the policies is nil, but the other is not.
473+
case expPolicy == nil || migPolicy == nil:
474+
return fmt.Errorf("expected both policies to be "+
475+
"non-nil. Got expPolicy: %v, "+
476+
"migPolicy: %v", expPolicy, migPolicy)
477+
478+
// Both policies are non-nil, we can compare them.
479+
default:
480+
}
481+
482+
if len(expPolicy.ExtraOpaqueData) == 0 {
483+
expPolicy.ExtraOpaqueData = nil
484+
}
485+
if len(migPolicy.ExtraOpaqueData) == 0 {
486+
migPolicy.ExtraOpaqueData = nil
487+
}
488+
489+
return sqldb.CompareRecords(
490+
*expPolicy, *migPolicy, "channel policy",
491+
)
492+
}
493+
494+
err = checkPolicy(policy1, migPol1)
495+
if err != nil {
496+
return fmt.Errorf("policy1 mismatch for channel %d: %w", scid,
497+
err)
498+
}
499+
500+
err = checkPolicy(policy2, migPol2)
501+
if err != nil {
502+
return fmt.Errorf("policy2 mismatch for channel %d: %w", scid,
503+
err)
504+
}
505+
506+
return nil
507+
}
508+
509+
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
510+
row sqlc.GetChannelBySCIDWithPoliciesRow,
511+
chain chainhash.Hash) (*models.ChannelEdgeInfo,
512+
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {
513+
514+
node1, node2, err := buildNodeVertices(
515+
row.Node.PubKey, row.Node_2.PubKey,
516+
)
517+
if err != nil {
518+
return nil, nil, nil, err
519+
}
520+
521+
edge, err := getAndBuildEdgeInfo(
522+
ctx, db, chain, row.Channel.ID, row.Channel, node1, node2,
523+
)
524+
if err != nil {
525+
return nil, nil, nil, fmt.Errorf("unable to build channel "+
526+
"info: %w", err)
527+
}
528+
529+
dbPol1, dbPol2, err := extractChannelPolicies(row)
530+
if err != nil {
531+
return nil, nil, nil, fmt.Errorf("unable to extract channel "+
532+
"policies: %w", err)
533+
}
534+
535+
policy1, policy2, err := getAndBuildChanPolicies(
536+
ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
537+
)
538+
if err != nil {
539+
return nil, nil, nil, fmt.Errorf("unable to build channel "+
540+
"policies: %w", err)
541+
}
542+
543+
return edge, policy1, policy2, nil
544+
}

0 commit comments

Comments
 (0)