Fetch configuration from DB #3407
base: main
if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
	state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
}
cfg = syncStateToConfigProtoInCatalog(ctx, cfg, state.FlowConfigUpdate)
this > 0 behavior was added recently in #3435, this probably regresses
given that we no longer have or rely on state, this should be a non-issue.
flow/workflows/cdc_flow.go
Outdated
ctx workflow.Context,
cfg *protos.FlowConnectionConfigs,
flowJobName string,
// cfg *protos.FlowConnectionConfigs,
this breaks backwards compatibility. Should still take config & pull name from that to maintain function signature
The problem with keeping the protos.FlowConnectionConfig here is that in cases where it is too large, it will still fail. Is the suggestion to pass both and then remove the cfg from here?
no, only pass cfg. new code should pass an empty cfg with only FlowJobName present
OK understood, I'll go in this direction then :)
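The direction agreed on in this thread can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `FlowConnectionConfigs` here is a hypothetical stand-in struct for `protos.FlowConnectionConfigs`, and `minimalConfig` is an invented helper name. The point is only that the workflow keeps its `cfg` parameter while new callers send a config with nothing but `FlowJobName` set.

```go
package main

import "fmt"

// FlowConnectionConfigs is a hypothetical stand-in for
// protos.FlowConnectionConfigs; only two fields are modeled.
type FlowConnectionConfigs struct {
	FlowJobName   string
	TableMappings []string // placeholder for the large part of the real config
}

// minimalConfig (hypothetical helper) returns a config that carries only the
// job name, so the workflow input stays small.
func minimalConfig(full *FlowConnectionConfigs) *FlowConnectionConfigs {
	return &FlowConnectionConfigs{FlowJobName: full.FlowJobName}
}

// CDCFlow keeps the original cfg parameter, so the function signature does
// not change. Loading the full config by name is not shown here.
func CDCFlow(cfg *FlowConnectionConfigs) string {
	return cfg.FlowJobName
}

func main() {
	full := &FlowConnectionConfigs{
		FlowJobName:   "my_mirror",
		TableMappings: []string{"a->b", "c->d"},
	}
	slim := minimalConfig(full)
	fmt.Println(CDCFlow(slim), len(slim.TableMappings))
}
```

Existing callers that still send a full config keep working under this shape, since the parameter type is unchanged.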
}
// because we have renamed the tables.
cfg.TableMappings = state.SyncFlowOptions.TableMappings
return nil, errors.New("cannot start CDCFlow with Resync enabled, please drop the flow and start again")
TODO
This PR stops passing the entire configuration object to the Temporal job, since it can hit the 2MB hard limit imposed by Temporal. Instead, it passes flowJobName and fetches the config from the DB wherever it is needed. Some additional changes were required because some of the jobs are used from multiple contexts, so I adapted some of the information passed to ensure that they continue to work as expected.
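To make the size concern concrete, here is a rough sketch that compares the encoded size of a large config with the size of the job name alone. Everything in it is an assumption for illustration: the `flowConfig` and `tableMapping` types are invented, JSON stands in for the real protobuf encoding, and the 50,000 table mappings are synthetic. Only the 2MB figure comes from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payloadLimitBytes is the 2MB limit mentioned in the PR description.
const payloadLimitBytes = 2 * 1024 * 1024

// tableMapping and flowConfig are hypothetical stand-ins for the real
// protobuf messages.
type tableMapping struct {
	Source      string
	Destination string
}

type flowConfig struct {
	FlowJobName   string
	TableMappings []tableMapping
}

// encodedSize returns the JSON-encoded size of v in bytes. JSON is used only
// to keep the sketch dependency-free.
func encodedSize(v interface{}) int {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return len(b)
}

// buildConfig creates a synthetic config with the given number of mappings.
func buildConfig(name string, tables int) flowConfig {
	cfg := flowConfig{FlowJobName: name}
	for i := 0; i < tables; i++ {
		t := fmt.Sprintf("public.table_%06d", i)
		cfg.TableMappings = append(cfg.TableMappings, tableMapping{Source: t, Destination: t})
	}
	return cfg
}

func main() {
	cfg := buildConfig("my_mirror", 50000)
	fmt.Println("full config over limit:", encodedSize(cfg) > payloadLimitBytes)
	fmt.Println("job name over limit:", encodedSize(cfg.FlowJobName) > payloadLimitBytes)
}
```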
func FetchConfigFromDB(flowName string) (*protos.FlowConnectionConfigs, error) {
	var configBytes sql.RawBytes
	dbCtx := context.Background()
	pool, _ := GetCatalogConnectionPoolFromEnv(dbCtx)
should take context as param
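One possible shape for this suggestion is sketched below, with the caller's context threaded through instead of `context.Background()`. The `configStore` interface, the in-memory `memStore`, and `fetchConfig` are all hypothetical names used so the sketch runs without a database; the real function would query the catalog pool. The sketch also returns lookup errors rather than discarding them.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// configStore is a hypothetical abstraction over the catalog lookup.
type configStore interface {
	LoadConfig(ctx context.Context, flowName string) ([]byte, error)
}

// memStore is an in-memory store used only to make the sketch runnable.
type memStore map[string][]byte

func (m memStore) LoadConfig(ctx context.Context, flowName string) ([]byte, error) {
	// Honor cancellation and deadlines carried by the caller's context.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	b, ok := m[flowName]
	if !ok {
		return nil, fmt.Errorf("no config for flow %q", flowName)
	}
	return b, nil
}

// fetchConfig takes the context as its first parameter and wraps any error
// with the flow name instead of ignoring it.
func fetchConfig(ctx context.Context, store configStore, flowName string) ([]byte, error) {
	b, err := store.LoadConfig(ctx, flowName)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch config for %s: %w", flowName, err)
	}
	return b, nil
}

// demo reports whether a lookup with a live context succeeds and whether a
// canceled context surfaces as context.Canceled.
func demo() (okLive bool, canceledSeen bool) {
	store := memStore{"my_mirror": []byte("cfg")}
	_, err := fetchConfig(context.Background(), store, "my_mirror")
	okLive = err == nil

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err = fetchConfig(ctx, store, "my_mirror")
	canceledSeen = errors.Is(err, context.Canceled)
	return okLive, canceledSeen
}

func main() {
	fmt.Println(demo())
}
```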
) (*protos.SetupFlowOutput, error) {
	s.Info("executing setup flow")
	// gotta fetch the config from the catalog.
	config, err := internal.FetchConfigFromDB(flowJobName)
can't call this in workflows
could maybe run as side effect, but then it ends up in history
Why can this not be called in workflows? 🤔
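For background on the concern raised in this thread: Temporal workflow code is replayed from recorded history and has to be deterministic, so direct database I/O normally lives in activities. A side effect runs once and its result is stored in the workflow history, then read back from history on replay. The toy model below illustrates only that recording behavior. It does not use the Temporal SDK; the `history` type and `sideEffect` method are invented for this sketch.

```go
package main

import "fmt"

// history is a toy stand-in for a workflow event history.
type history struct {
	events [][]byte
	pos    int
	replay bool
}

// sideEffect models the recording behavior: on first execution it runs fn and
// stores the result; on replay it returns the stored value without calling fn.
func (h *history) sideEffect(fn func() []byte) []byte {
	if h.replay {
		v := h.events[h.pos]
		h.pos++
		return v
	}
	v := fn()
	h.events = append(h.events, v)
	return v
}

// size returns the total number of bytes recorded in the history.
func (h *history) size() int {
	n := 0
	for _, e := range h.events {
		n += len(e)
	}
	return n
}

// runOnce executes a fake config fetch as a side effect, then replays it.
// It returns the bytes recorded in history and the number of fetch calls
// made during replay.
func runOnce(configBytes int) (recorded int, callsOnReplay int) {
	calls := 0
	fetch := func() []byte {
		calls++
		return make([]byte, configBytes)
	}

	h := &history{}
	h.sideEffect(fetch) // first execution: fetch runs and is recorded

	h.replay, h.pos = true, 0
	calls = 0
	h.sideEffect(fetch) // replay: value comes from history
	return h.size(), calls
}

func main() {
	recorded, calls := runOnce(1024)
	fmt.Println("bytes in history:", recorded, "fetch calls on replay:", calls)
}
```

In this model the full config bytes end up in the history, which is why a side effect would not avoid storing the large config.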