Skip to content

Adds filter to target specific cluster connections #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions KustoSchemaTools/KustoSchemaHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,26 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
public YamlDatabaseHandlerFactory<T> YamlDatabaseHandlerFactory { get; }
public KustoDatabaseHandlerFactory<T> KustoDatabaseHandlerFactory { get; }

public async Task<(string markDown, bool isValid)> GenerateDiffMarkdown(string path, string databaseName)
public Clusters FilterClusters(Clusters clusters, List<string>? includedConnections)
{
// Connections may be marked inactive, so we need to filter them out
// However, if includedConnections is provided we should only process those regardless of the IsActive flag
var includedConnectionsIsNullOrEmpty = includedConnections == null || includedConnections.Count == 0;
var filteredConnections = includedConnectionsIsNullOrEmpty
? clusters.Connections.Where(cluster => cluster.IsActive).ToList()
: clusters.Connections.Where(cluster => includedConnections != null && includedConnections.Any(name => string.Equals(name, cluster.Name, StringComparison.OrdinalIgnoreCase))).ToList();

return new Clusters
{
Connections = filteredConnections
};
}

public async Task<(string markDown, bool isValid)> GenerateDiffMarkdown(string path, string databaseName, List<string>? includedConnections = null)
{
var clustersFile = File.ReadAllText(Path.Combine(path, "clusters.yml"));
var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFile);
clusters = FilterClusters(clusters, includedConnections);
var sb = new StringBuilder();
bool isValid = true;

Expand Down Expand Up @@ -74,20 +89,18 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
}

var scriptSb = new StringBuilder();
foreach(var script in changes.SelectMany(itm => itm.Scripts).Where(itm => itm.IsValid == true).OrderBy(itm => itm.Order))
foreach (var script in changes.SelectMany(itm => itm.Scripts).Where(itm => itm.IsValid == true).OrderBy(itm => itm.Order))
{
scriptSb.AppendLine(script.Text);
}

Log.LogInformation($"Following scripts will be applied:\n{scriptSb}");
}

foreach(var follower in yamlDb.Followers)
foreach (var follower in yamlDb.Followers)
{

Log.LogInformation($"Generating diff markdown for {Path.Combine(path, databaseName)} => {follower.Key}/{follower.Value.DatabaseName}");


var followerClient = new KustoClient(follower.Key);
var oldModel = FollowerLoader.LoadFollower(follower.Value.DatabaseName, followerClient);

Expand All @@ -100,7 +113,7 @@ public KustoSchemaHandler(ILogger<KustoSchemaHandler<T>> schemaHandlerLogger, Ya
foreach (var change in changes)
{
sb.AppendLine(change.Markdown);
sb.AppendLine();
sb.AppendLine();
}
}
return (sb.ToString(), isValid);
Expand All @@ -110,14 +123,15 @@ public async Task Import(string path, string databaseName, bool includeColumns)
{
var clustersFile = File.ReadAllText(Path.Combine(path, "clusters.yml"));
var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFile);

clusters = FilterClusters(clusters, null);

var escapedDbName = databaseName.BracketIfIdentifier();
var dbHandler = KustoDatabaseHandlerFactory.Create(clusters.Connections[0].Url, escapedDbName);

var db = await dbHandler.LoadAsync();
if (includeColumns == false)
{
foreach(var table in db.Tables.Values)
foreach (var table in db.Tables.Values)
{
table.Columns = new Dictionary<string, string>();
}
Expand All @@ -128,16 +142,16 @@ public async Task Import(string path, string databaseName, bool includeColumns)
}


public async Task<ConcurrentDictionary<string,Exception>> Apply(string path, string databaseName)
public async Task<ConcurrentDictionary<string, Exception>> Apply(string path, string databaseName, List<string>? includedConnections = null)
{
var clustersFile = File.ReadAllText(Path.Combine(path, "clusters.yml"));
var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFile);

clusters = FilterClusters(clusters, includedConnections);
var escapedDbName = databaseName.BracketIfIdentifier();
var yamlHandler = YamlDatabaseHandlerFactory.Create(path, databaseName);
var yamlDb = await yamlHandler.LoadAsync();

var results = new ConcurrentDictionary<string,Exception>();
var results = new ConcurrentDictionary<string, Exception>();

await Parallel.ForEachAsync(clusters.Connections, async (cluster, token) =>
{
Expand Down
1 change: 1 addition & 0 deletions KustoSchemaTools/Model/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class Cluster
public string Name { get; set; }
public string Url { get; set; }
public List<DatabaseScript> Scripts { get; set; } = new List<DatabaseScript>();
public bool IsActive { get; set; } = true;
}

}