Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion FanOutFanInCrawler/Orchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,98 @@ public static async Task<HttpResponseMessage> HttpStart(
return client.CreateCheckStatusResponse(req, instanceId);
}

/// <summary>
/// Durable activity that removes rows from the "Repositories" table whose
/// <c>Timestamp</c> is older than the requested retention period.
/// </summary>
/// <param name="context">
/// Activity context. Its input is read as an <see cref="int"/> number of days to
/// retain; values less than or equal to zero fall back to 90 days.
/// </param>
/// <param name="log">Logger used for progress, warnings and errors.</param>
/// <returns>
/// A task that completes when the cleanup pass is done. Configuration problems
/// (missing/invalid connection string, missing table) and failed delete batches
/// are logged rather than thrown, so one bad batch does not abort the rest.
/// </returns>
[FunctionName(nameof(CleanOldRepositoryData))]
public static async Task CleanOldRepositoryData(
    [ActivityTrigger] IDurableActivityContext context,
    ILogger log)
{
    const int DefaultRetentionDays = 90;
    // Table storage accepts at most 100 operations per batch.
    const int MaxBatchSize = 100;
    const string TableName = "Repositories";

    // 1) Read input and apply default if invalid
    int daysToRetain = context.GetInput<int>();
    if (daysToRetain <= 0)
    {
        daysToRetain = DefaultRetentionDays;
        log.LogWarning("Invalid retention period supplied; defaulting to 90 days.");
    }
    log.LogInformation("Starting cleanup: removing entries older than {DaysToRetain} days.", daysToRetain);

    // 2) Parse storage connection string
    string storageConn = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
    if (string.IsNullOrWhiteSpace(storageConn))
    {
        log.LogError("Environment variable 'AzureWebJobsStorage' is not set.");
        return;
    }

    if (!CloudStorageAccount.TryParse(storageConn, out CloudStorageAccount storageAccount))
    {
        log.LogError("Failed to parse Azure storage connection string.");
        return;
    }

    // 3) Get table reference
    var tableClient = storageAccount.CreateCloudTableClient();
    var table = tableClient.GetTableReference(TableName);

    if (!await table.ExistsAsync())
    {
        log.LogWarning("Table 'Repositories' does not exist—nothing to clean.");
        return;
    }

    // 4) Compute cutoff date
    DateTimeOffset cutoff = DateTimeOffset.UtcNow.AddDays(-daysToRetain);

    // 5) Build a query whose filter is evaluated server-side
    string filter = TableQuery.GenerateFilterConditionForDate(
        "Timestamp",
        QueryComparisons.LessThan,
        cutoff);
    var query = new TableQuery<Repository>().Where(filter);

    // 6) Execute query in segments, following continuation tokens until exhausted
    var toDelete = new List<Repository>();
    TableContinuationToken token = null;
    do
    {
        var segment = await table.ExecuteQuerySegmentedAsync(query, token);
        token = segment.ContinuationToken;
        toDelete.AddRange(segment.Results);
    } while (token != null);

    if (toDelete.Count == 0)
    {
        log.LogInformation("No repository entries older than cutoff date were found.");
        return;
    }

    log.LogInformation("Found {Count} entries to delete.", toDelete.Count);

    // 7) Delete in batches of up to 100.
    //    A table batch may only contain entities that share one partition key,
    //    so the rows are grouped by partition before being sliced into batches.
    int deletedCount = 0;
    int failedCount = 0;
    foreach (var partition in toDelete.GroupBy(row => row.PartitionKey))
    {
        List<Repository> rows = partition.ToList();
        for (int offset = 0; offset < rows.Count; offset += MaxBatchSize)
        {
            int batchSize = Math.Min(MaxBatchSize, rows.Count - offset);

            var batchOp = new TableBatchOperation();
            for (int i = offset; i < offset + batchSize; i++)
            {
                batchOp.Delete(rows[i]);
            }

            try
            {
                await table.ExecuteBatchAsync(batchOp);
                deletedCount += batchSize;
                log.LogInformation("Deleted batch of {BatchSize} entries.", batchSize);
            }
            catch (StorageException ex)
            {
                // Best effort: record the failure (with stack trace) and keep going
                // so the remaining batches still get a chance to run.
                failedCount += batchSize;
                log.LogError(ex, "Error deleting batch: {Message}", ex.Message);
            }
        }
    }

    if (failedCount > 0)
    {
        log.LogWarning("Cleanup could not delete {FailedCount} entries; see earlier errors.", failedCount);
    }

    log.LogInformation("Cleanup finished. Total deleted: {DeletedCount}", deletedCount);
}


[FunctionName("Orchestrator")]
public static async Task<string> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
Expand Down Expand Up @@ -137,4 +229,4 @@ public Repository(long id)
public string RepositoryName { get; set; }
}
}
}
}