diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 7420a4033f..b5ff262baa 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -28,10 +28,12 @@
+
+
diff --git a/src/ProjectReferences.Persisters.Audit.props b/src/ProjectReferences.Persisters.Audit.props
index 0a9c4d0dcb..55f0fb67b2 100644
--- a/src/ProjectReferences.Persisters.Audit.props
+++ b/src/ProjectReferences.Persisters.Audit.props
@@ -3,6 +3,7 @@
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/.editorconfig b/src/ServiceControl.Audit.Persistence.PostgreSQL/.editorconfig
new file mode 100644
index 0000000000..8d96183ebc
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/.editorconfig
@@ -0,0 +1,4 @@
+[*.cs]
+
+# Justification: ServiceControl app has no synchronization context
+dotnet_diagnostic.CA2007.severity = none
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/BodyStorage/PostgreSQLAttachmentsBodyStorage.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/BodyStorage/PostgreSQLAttachmentsBodyStorage.cs
new file mode 100644
index 0000000000..b1bb307b6e
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/BodyStorage/PostgreSQLAttachmentsBodyStorage.cs
@@ -0,0 +1,11 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage;
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using ServiceControl.Audit.Auditing.BodyStorage;
+class PostgreSQLAttachmentsBodyStorage : IBodyStorage
+{
+ public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) => throw new System.NotImplementedException();
+ public Task TryFetch(string bodyId, CancellationToken cancellationToken) => throw new System.NotImplementedException();
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/DatabaseConfiguration.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/DatabaseConfiguration.cs
new file mode 100644
index 0000000000..bfecfe09c5
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/DatabaseConfiguration.cs
@@ -0,0 +1,19 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+
+class DatabaseConfiguration(
+ string databaseName,
+ string adminDatabaseName,
+ int expirationProcessTimerInSeconds,
+ TimeSpan auditRetentionPeriod,
+ int maxBodySizeToStore,
+ string connectionString)
+{
+ public string Name { get; } = databaseName;
+ public string AdminDatabaseName { get; } = adminDatabaseName;
+ public int ExpirationProcessTimerInSeconds { get; } = expirationProcessTimerInSeconds;
+ public TimeSpan AuditRetentionPeriod { get; } = auditRetentionPeriod;
+ public int MaxBodySizeToStore { get; } = maxBodySizeToStore;
+ public string ConnectionString { get; } = connectionString;
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs
new file mode 100644
index 0000000000..4d9ce3074a
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs
@@ -0,0 +1,265 @@
+
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+using System.Collections.Generic;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.IO;
+using Npgsql;
+using NServiceBus;
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Auditing.MessagesView;
+using ServiceControl.Audit.Infrastructure;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.Audit.Persistence;
+using ServiceControl.Audit.Persistence.Infrastructure;
+using ServiceControl.SagaAudit;
+
+
+class PostgreSQLAuditDataStore(PostgreSQLConnectionFactory connectionFactory) : IAuditDataStore
+{
+ static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager();
+
+ public async Task GetMessageBody(string messageId, CancellationToken cancellationToken)
+ {
+ await using var conn = await connectionFactory.OpenConnection(cancellationToken);
+ await using var cmd = new NpgsqlCommand(@"
+ select headers, body from processed_messages
+ where message_id = @message_id
+ LIMIT 1;", conn);
+ cmd.Parameters.AddWithValue("message_id", messageId);
+ await using var reader = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken);
+ if (await reader.ReadAsync(cancellationToken))
+ {
+ var contentType = reader.GetFieldValue>(reader.GetOrdinal("headers")).GetValueOrDefault(Headers.ContentType, "text/xml");
+ using var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
+ var responseStream = manager.GetStream();
+ await stream.CopyToAsync(responseStream, cancellationToken);
+ responseStream.Position = 0;
+ return MessageBodyView.FromStream(responseStream, contentType, (int)stream.Length, string.Empty);
+ }
+ return MessageBodyView.NotFound();
+ }
+
+ public Task>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
+ {
+ var builder = new PostgreSQLMessagesQueryBuilder()
+ .WithSystemMessages(includeSystemMessages)
+ .WithTimeSentRange(timeSentRange)
+ .WithSorting(sortInfo)
+ .WithPaging(pagingInfo);
+ return ExecuteMessagesQuery(builder, cancellationToken);
+ }
+
+ public async Task>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
+ {
+ var startDate = DateTime.UtcNow.AddDays(-30);
+ var endDate = DateTime.UtcNow;
+ await using var connection = await connectionFactory.OpenConnection(cancellationToken);
+ await using var cmd = new NpgsqlCommand(@"
+ SELECT
+ DATE_TRUNC('day', processed_at) AS day,
+ COUNT(*) AS count
+ FROM processed_messages
+ WHERE receiving_endpoint_name = @endpoint_name
+ AND processed_at BETWEEN @start_date AND @end_date
+ GROUP BY day
+ ORDER BY day;", connection);
+ cmd.Parameters.AddWithValue("endpoint_name", endpointName);
+ cmd.Parameters.AddWithValue("start_date", startDate);
+ cmd.Parameters.AddWithValue("end_date", endDate);
+
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
+ var results = new List();
+ while (await reader.ReadAsync(cancellationToken))
+ {
+ results.Add(new AuditCount
+ {
+ UtcDate = reader.GetDateTime(reader.GetOrdinal("day")),
+ Count = reader.GetInt32(reader.GetOrdinal("count"))
+ });
+ }
+
+ return new QueryResult>(results, new QueryStatsInfo(string.Empty, results.Count));
+ }
+
+ public async Task>> QueryKnownEndpoints(CancellationToken cancellationToken)
+ {
+ // We need to return all the data from known_endpoints table in postgress
+ await using var connection = await connectionFactory.OpenConnection(cancellationToken);
+ await using var cmd = new NpgsqlCommand(@"
+ SELECT
+ id,
+ name,
+ host_id,
+ host,
+ last_seen
+ FROM known_endpoints;", connection);
+
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
+ var results = new List();
+ while (await reader.ReadAsync(cancellationToken))
+ {
+ var name = reader.GetString(reader.GetOrdinal("name"));
+ var hostId = reader.GetGuid(reader.GetOrdinal("host_id"));
+ var host = reader.GetString(reader.GetOrdinal("host"));
+ var lastSeen = reader.GetDateTime(reader.GetOrdinal("last_seen"));
+ results.Add(new KnownEndpointsView
+ {
+ Id = DeterministicGuid.MakeId(name, hostId.ToString()),
+ EndpointDetails = new EndpointDetails
+ {
+ Host = host,
+ HostId = hostId,
+ Name = name
+ },
+ HostDisplayName = host
+ });
+ }
+
+ return new QueryResult>(results, new QueryStatsInfo(string.Empty, results.Count));
+ }
+
+ public Task>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ {
+ var builder = new PostgreSQLMessagesQueryBuilder()
+ .WithSearch(searchParam)
+ .WithTimeSentRange(timeSentRange)
+ .WithSorting(sortInfo)
+ .WithPaging(pagingInfo);
+ return ExecuteMessagesQuery(builder, cancellationToken);
+ }
+
+ public Task>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
+ {
+ var builder = new PostgreSQLMessagesQueryBuilder()
+ .WithConversationId(conversationId)
+ .WithSorting(sortInfo)
+ .WithPaging(pagingInfo);
+ return ExecuteMessagesQuery(builder, cancellationToken);
+ }
+
+ public Task>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ {
+ var builder = new PostgreSQLMessagesQueryBuilder()
+ .WithSystemMessages(includeSystemMessages)
+ .WithEndpointName(endpointName)
+ .WithTimeSentRange(timeSentRange)
+ .WithSorting(sortInfo)
+ .WithPaging(pagingInfo);
+ return ExecuteMessagesQuery(builder, cancellationToken);
+ }
+
+ public Task>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange? timeSentRange = null, CancellationToken cancellationToken = default)
+ {
+ var builder = new PostgreSQLMessagesQueryBuilder()
+ .WithSearch(keyword)
+ .WithEndpointName(endpoint)
+ .WithTimeSentRange(timeSentRange)
+ .WithSorting(sortInfo)
+ .WithPaging(pagingInfo);
+ return ExecuteMessagesQuery(builder, cancellationToken);
+ }
+
+ public async Task> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
+ {
+ await using var conn = await connectionFactory.OpenConnection(cancellationToken);
+ await using var cmd = new NpgsqlCommand(@"
+ SELECT
+ id,
+ saga_id,
+ saga_type,
+ changes
+ FROM saga_snapshots
+ WHERE saga_id = @saga_id
+ LIMIT 1", conn);
+
+ cmd.Parameters.AddWithValue("saga_id", input);
+
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
+
+ if (await reader.ReadAsync(cancellationToken))
+ {
+ var changes = GetValue>(reader, "changes") ?? [];
+ var sagaHistory = new SagaHistory
+ {
+ Id = GetValue(reader, "id"),
+ SagaId = GetValue(reader, "saga_id"),
+ SagaType = GetValue(reader, "saga_type"),
+ Changes = changes
+ };
+
+ return new QueryResult(sagaHistory, new QueryStatsInfo(string.Empty, changes.Count));
+ }
+
+ return QueryResult.Empty();
+ }
+
+ async Task>> ExecuteMessagesQuery(
+ PostgreSQLMessagesQueryBuilder builder,
+ CancellationToken cancellationToken)
+ {
+ await using var conn = await connectionFactory.OpenConnection(cancellationToken);
+ var (query, parameters) = builder.Build();
+ await using var cmd = new NpgsqlCommand(query, conn);
+ foreach (var param in parameters)
+ {
+ cmd.Parameters.Add(param);
+ }
+ return await ReturnResults(cmd, cancellationToken);
+ }
+
+ static T? DeserializeOrDefault(Dictionary dict, string key, T? defaultValue = default)
+ {
+ if (dict.TryGetValue(key, out var value) && value is JsonElement element && element.ValueKind != JsonValueKind.Null)
+ {
+ try
+ {
+ return JsonSerializer.Deserialize(element);
+ }
+ catch { }
+ }
+ return defaultValue;
+ }
+
+ static T GetValue(NpgsqlDataReader reader, string column)
+ => reader.GetFieldValue(reader.GetOrdinal(column));
+
+ async Task>> ReturnResults(NpgsqlCommand cmd, CancellationToken cancellationToken = default)
+ {
+ var results = new List();
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
+ while (await reader.ReadAsync(cancellationToken))
+ {
+ var headers = GetValue>(reader, "headers");
+ var messageMetadata = GetValue>(reader, "message_metadata");
+
+ results.Add(new MessagesView
+ {
+ Id = GetValue(reader, "unique_message_id"),
+ MessageId = GetValue(reader, "message_id"),
+ MessageType = GetValue(reader, "message_type"),
+ SendingEndpoint = DeserializeOrDefault(messageMetadata, "SendingEndpoint"),
+ ReceivingEndpoint = DeserializeOrDefault(messageMetadata, "ReceivingEndpoint"),
+ TimeSent = GetValue(reader, "time_sent"),
+ ProcessedAt = GetValue(reader, "processed_at"),
+ CriticalTime = GetValue(reader, "critical_time"),
+ ProcessingTime = GetValue(reader, "processing_time"),
+ DeliveryTime = GetValue(reader, "delivery_time"),
+ IsSystemMessage = GetValue(reader, "is_system_message"),
+ ConversationId = GetValue(reader, "conversation_id"),
+ Headers = [.. headers],
+ Status = (MessageStatus)GetValue(reader, "status"),
+ MessageIntent = (MessageIntent)DeserializeOrDefault(messageMetadata, "MessageIntent", 1),
+ BodyUrl = string.Format(BodyUrlFormatString, GetValue(reader, "message_id")),
+ BodySize = DeserializeOrDefault(messageMetadata, "ContentLength", 0),
+ InvokedSagas = DeserializeOrDefault>(messageMetadata, "InvokedSagas", []),
+ OriginatesFromSaga = DeserializeOrDefault(messageMetadata, "OriginatesFromSaga")
+ });
+ }
+ return new QueryResult>(results, new QueryStatsInfo(string.Empty, results.Count));
+ }
+ public const string BodyUrlFormatString = "/messages/{0}/body";
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs
new file mode 100644
index 0000000000..3b254897b1
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs
@@ -0,0 +1,47 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using Npgsql;
+using System.Threading.Tasks;
+using System.Threading;
+
+class PostgreSQLConnectionFactory
+{
+ readonly NpgsqlDataSource dataSource;
+ readonly NpgsqlDataSource dataSourceAdmin;
+
+ public PostgreSQLConnectionFactory(DatabaseConfiguration databaseConfiguration)
+ {
+ var dataSourceBuilder = new NpgsqlDataSourceBuilder(databaseConfiguration.ConnectionString)
+ {
+ Name = "ServiceControl.Audit"
+ };
+ //dataSourceBuilder.UseLoggerFactory(loggerFactory);
+ dataSourceBuilder.EnableDynamicJson();
+ dataSource = dataSourceBuilder.Build();
+
+ var builder = new NpgsqlConnectionStringBuilder(databaseConfiguration.ConnectionString)
+ {
+ Database = databaseConfiguration.AdminDatabaseName
+ };
+ var dataSourceBuilderAdmin = new NpgsqlDataSourceBuilder(builder.ConnectionString)
+ {
+ Name = "ServiceControl.Audit-admin",
+ };
+ //dataSourceBuilderAdmin.UseLoggerFactory(loggerFactory);
+ dataSourceBuilderAdmin.EnableDynamicJson();
+ dataSourceAdmin = dataSourceBuilderAdmin.Build();
+ }
+
+ public async Task OpenConnection(CancellationToken cancellationToken)
+ {
+ var conn = await dataSource.OpenConnectionAsync(cancellationToken);
+ return conn;
+ }
+
+ public async Task OpenAdminConnection(CancellationToken cancellationToken)
+ {
+ var conn = dataSourceAdmin.CreateConnection();
+ await conn.OpenAsync(cancellationToken);
+ return conn;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs
new file mode 100644
index 0000000000..bacdb4f890
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs
@@ -0,0 +1,13 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Persistence;
+class PostgreSQLFailedAuditStorage : IFailedAuditStorage
+{
+ public Task GetFailedAuditsCount() => Task.FromResult(0);
+ public Task ProcessFailedMessages(Func, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException();
+ public Task SaveFailedAuditImport(FailedAuditImport message) => throw new NotImplementedException();
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLMessagesQueryBuilder.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLMessagesQueryBuilder.cs
new file mode 100644
index 0000000000..4a7106d0d0
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLMessagesQueryBuilder.cs
@@ -0,0 +1,142 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System.Collections.Generic;
+using System.Text;
+using Npgsql;
+using ServiceControl.Audit.Infrastructure;
+
+public class PostgreSQLMessagesQueryBuilder
+{
+ readonly StringBuilder sql = new();
+ readonly List parameters = [];
+
+ public PostgreSQLMessagesQueryBuilder()
+ {
+ sql.Append(@"select unique_message_id,
+ message_metadata,
+ headers,
+ processed_at,
+ message_id,
+ message_type,
+ is_system_message,
+ status,
+ time_sent,
+ receiving_endpoint_name,
+ critical_time,
+ processing_time,
+ delivery_time,
+ conversation_id from processed_messages
+ where 1 = 1");
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithSystemMessages(bool? includeSystemMessages)
+ {
+ if (includeSystemMessages.HasValue)
+ {
+ sql.Append(" and is_system_message = @is_system_message");
+ parameters.Add(new NpgsqlParameter("is_system_message", includeSystemMessages));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithSearch(string? q)
+ {
+ if (!string.IsNullOrWhiteSpace(q))
+ {
+ sql.Append(" and query @@ websearch_to_tsquery('english', @search)");
+ parameters.Add(new NpgsqlParameter("search", q));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithConversationId(string? conversationId)
+ {
+ if (!string.IsNullOrWhiteSpace(conversationId))
+ {
+ sql.Append(" and conversation_id = @conversation_id");
+ parameters.Add(new NpgsqlParameter("conversation_id", conversationId));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithMessageId(string? messageId)
+ {
+ if (!string.IsNullOrWhiteSpace(messageId))
+ {
+ sql.Append(" and message_id = @message_id");
+ parameters.Add(new NpgsqlParameter("message_id", messageId));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithEndpointName(string? endpointName)
+ {
+ if (!string.IsNullOrWhiteSpace(endpointName))
+ {
+ sql.Append(" and receiving_endpoint_name = @endpoint_name");
+ parameters.Add(new NpgsqlParameter("endpoint_name", endpointName));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithTimeSentRange(DateTimeRange? timeSentRange)
+ {
+ if (timeSentRange?.From != null)
+ {
+ sql.Append(" and time_sent >= @time_sent_start");
+ parameters.Add(new NpgsqlParameter("time_sent_start", timeSentRange.From));
+ }
+ if (timeSentRange?.To != null)
+ {
+ sql.Append(" and time_sent <= @time_sent_end");
+ parameters.Add(new NpgsqlParameter("time_sent_end", timeSentRange.To));
+ }
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithSorting(SortInfo sortInfo)
+ {
+ sql.Append(" ORDER BY");
+ switch (sortInfo.Sort)
+ {
+ case "id":
+ case "message_id":
+ sql.Append(" message_id");
+ break;
+ case "message_type":
+ sql.Append(" message_type");
+ break;
+ case "critical_time":
+ sql.Append(" critical_time");
+ break;
+ case "delivery_time":
+ sql.Append(" delivery_time");
+ break;
+ case "processing_time":
+ sql.Append(" processing_time");
+ break;
+ case "processed_at":
+ sql.Append(" processed_at");
+ break;
+ case "status":
+ sql.Append(" status");
+ break;
+ default:
+ sql.Append(" time_sent");
+ break;
+ }
+ sql.Append(sortInfo.Direction == "asc" ? " ASC" : " DESC");
+ return this;
+ }
+
+ public PostgreSQLMessagesQueryBuilder WithPaging(PagingInfo pagingInfo)
+ {
+ sql.Append($" LIMIT {pagingInfo.PageSize} OFFSET {pagingInfo.Offset};");
+ return this;
+ }
+
+ public (string Sql, List Parameters) Build()
+ {
+ return (sql.ToString(), parameters);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistence.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistence.cs
new file mode 100644
index 0000000000..f9ef859af5
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistence.cs
@@ -0,0 +1,30 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using Microsoft.Extensions.DependencyInjection;
+using ServiceControl.Audit.Auditing.BodyStorage;
+using ServiceControl.Audit.Persistence;
+using ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage;
+using ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+
+class PostgreSQLPersistence(DatabaseConfiguration databaseConfiguration) : IPersistence
+{
+ public void AddInstaller(IServiceCollection services)
+ {
+ services.AddSingleton(databaseConfiguration);
+ services.AddSingleton();
+ services.AddHostedService();
+ }
+
+ public void AddPersistence(IServiceCollection services)
+ {
+ services.AddSingleton(databaseConfiguration);
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddHostedService();
+ services.AddHostedService();
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceConfiguration.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceConfiguration.cs
new file mode 100644
index 0000000000..5d28ff08f9
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceConfiguration.cs
@@ -0,0 +1,42 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+using System.Collections.Generic;
+using Npgsql;
+using ServiceControl.Audit.Persistence;
+
+public class PostgreSQLPersistenceConfiguration : IPersistenceConfiguration
+{
+ public string Name => "PostgreSQL";
+
+ public IEnumerable ConfigurationKeys => ["PostgreSql/ConnectionString", "PostgreSql/DatabaseName"];
+
+ const int ExpirationProcessTimerInSecondsDefault = 600;
+
+ public IPersistence Create(PersistenceSettings settings)
+ {
+ if (!settings.PersisterSpecificSettings.TryGetValue("PostgreSql/ConnectionString", out var connectionString))
+ {
+ throw new Exception("PostgreSql/ConnectionString is not configured.");
+ }
+
+ var builder = new NpgsqlConnectionStringBuilder(connectionString);
+
+ if (settings.PersisterSpecificSettings.TryGetValue("PostgreSql/DatabaseName", out var databaseName))
+ {
+ builder.Database = databaseName;
+ }
+
+ settings.PersisterSpecificSettings.TryGetValue("PostgreSql/AdminDatabaseName", out var adminDatabaseName);
+
+ builder.Database ??= "servicecontrol-audit";
+
+ return new PostgreSQLPersistence(new DatabaseConfiguration(
+ builder.Database,
+ adminDatabaseName ?? "postgres",
+ ExpirationProcessTimerInSecondsDefault,
+ settings.AuditRetentionPeriod,
+ settings.MaxBodySizeToStore,
+ connectionString));
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs
new file mode 100644
index 0000000000..2b5bdd360b
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs
@@ -0,0 +1,238 @@
+
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+using Npgsql;
+
+class PostgreSQLPersistenceInstaller(DatabaseConfiguration databaseConfiguration, PostgreSQLConnectionFactory connectionFactory) : IHostedService
+{
+ public async Task StartAsync(CancellationToken cancellationToken)
+ {
+ await using var adminConnection = await connectionFactory.OpenAdminConnection(cancellationToken);
+
+ await using (var cmd = new NpgsqlCommand($"SELECT 1 FROM pg_database WHERE datname = @dbname", adminConnection))
+ {
+ cmd.Parameters.AddWithValue("@dbname", databaseConfiguration.Name);
+ var exists = await cmd.ExecuteScalarAsync(cancellationToken);
+ if (exists == null)
+ {
+ using var createCmd = new NpgsqlCommand($"CREATE DATABASE \"{databaseConfiguration.Name}\"", adminConnection);
+ await createCmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+ }
+
+ await using var connection = await connectionFactory.OpenConnection(cancellationToken);
+ // Create processed_messages table
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE TABLE IF NOT EXISTS processed_messages (
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+ unique_message_id TEXT,
+ message_metadata JSONB,
+ headers JSONB,
+ processed_at TIMESTAMPTZ,
+ body BYTEA,
+ message_id TEXT,
+ message_type TEXT,
+ is_system_message BOOLEAN,
+ status NUMERIC,
+ time_sent TIMESTAMPTZ,
+ receiving_endpoint_name TEXT,
+ critical_time INTERVAL,
+ processing_time INTERVAL,
+ delivery_time INTERVAL,
+ conversation_id TEXT,
+ query tsvector,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
+ )
+ WITH (
+ autovacuum_vacuum_scale_factor = 0.05,
+ autovacuum_analyze_scale_factor = 0.02
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create trigger for full text search
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE OR REPLACE FUNCTION processed_messages_tsvector_update() RETURNS trigger AS $$
+ BEGIN
+ NEW.query :=
+ setweight(to_tsvector('english', coalesce(NEW.headers::text, '')), 'A') ||
+ setweight(to_tsvector('english', coalesce(convert_from(NEW.body, 'UTF8'), '')), 'B');
+ RETURN NEW;
+ END
+ $$ LANGUAGE plpgsql;
+
+ DROP TRIGGER IF EXISTS processed_messages_tsvector_trigger ON processed_messages;
+ CREATE TRIGGER processed_messages_tsvector_trigger
+ BEFORE INSERT OR UPDATE ON processed_messages
+ FOR EACH ROW EXECUTE FUNCTION processed_messages_tsvector_update();", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create index on processed_messages for specified columns
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_receiving_endpoint_name ON processed_messages (
+ receiving_endpoint_name
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_is_system_message ON processed_messages (
+ is_system_message
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_time_sent ON processed_messages (
+ time_sent
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_critical_time ON processed_messages (
+ critical_time
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_processing_time ON processed_messages (
+ processing_time
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_delivery_time ON processed_messages (
+ delivery_time
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_message_id ON processed_messages (
+ message_id
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_created_at ON processed_messages (
+ created_at
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_conversation ON processed_messages (
+ conversation_id,
+ time_sent
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_query ON processed_messages (
+ query
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_processed_messages_audit_counts ON processed_messages (
+ receiving_endpoint_name, processed_at
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create saga_snapshots table
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE TABLE IF NOT EXISTS saga_snapshots (
+ id UUID PRIMARY KEY,
+ saga_id UUID,
+ saga_type TEXT,
+ changes JSONB,
+ updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create index on saga_snapshots for faster saga_id lookups
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE INDEX IF NOT EXISTS idx_saga_snapshots_saga_id ON saga_snapshots (
+ saga_id
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create known_endpoints table
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE TABLE IF NOT EXISTS known_endpoints_insert (
+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
+ endpoint_id TEXT,
+ name TEXT,
+ host_id UUID,
+ host TEXT,
+ last_seen TIMESTAMPTZ
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create known_endpoints table
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE TABLE IF NOT EXISTS known_endpoints (
+ id TEXT PRIMARY KEY,
+ name TEXT,
+ host_id UUID,
+ host TEXT,
+ last_seen TIMESTAMPTZ
+ );", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ // Create trigger to auto-update updated_at for saga_snapshots
+ await using (var cmd = new NpgsqlCommand(@"
+ CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS trigger AS $$
+ BEGIN
+ NEW.updated_at = now();
+ RETURN NEW;
+ END
+ $$ LANGUAGE plpgsql;
+
+ DROP TRIGGER IF EXISTS saga_snapshots_updated_at_trigger ON saga_snapshots;
+ CREATE TRIGGER saga_snapshots_updated_at_trigger
+ BEFORE UPDATE ON saga_snapshots
+ FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();", connection))
+ {
+ await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken)
+ {
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/RetentionCleanupService.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/RetentionCleanupService.cs
new file mode 100644
index 0000000000..5022c8410d
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/RetentionCleanupService.cs
@@ -0,0 +1,91 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Npgsql;
+
+class RetentionCleanupService(
+ ILogger logger,
+ DatabaseConfiguration config,
+ PostgreSQLConnectionFactory connectionFactory) : BackgroundService
+{
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation($"{nameof(RetentionCleanupService)} started.");
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(config.ExpirationProcessTimerInSeconds), stoppingToken);
+
+ await CleanupOldMessages(stoppingToken);
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation($"{nameof(RetentionCleanupService)} stopped.");
+ break;
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error during cleanup task.");
+ }
+ }
+ }
+
+ async Task CleanupOldMessages(CancellationToken cancellationToken)
+ {
+ await using var conn = await connectionFactory.OpenConnection(cancellationToken);
+
+ var cutoffDate = DateTime.UtcNow - config.AuditRetentionPeriod;
+
+ // Cleanup processed messages
+ await CleanupTable("processed_messages", "created_at", cutoffDate, conn, cancellationToken);
+
+ // Cleanup saga snapshots
+ await CleanupTable("saga_snapshots", "updated_at", cutoffDate, conn, cancellationToken);
+
+ // Cleanup known endpoints
+ await CleanupTable("known_endpoints", "last_seen", cutoffDate, conn, cancellationToken);
+ }
+
+ async Task CleanupTable(string tableName, string dateColumn, DateTime cutoffDate, NpgsqlConnection conn, CancellationToken cancellationToken)
+ {
+ const int batchSize = 1000;
+ var totalDeleted = 0;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ // Delete in batches - skip if another process is already cleaning this table
+ var sql = $@"
+ DELETE FROM {tableName}
+ WHERE pg_try_advisory_xact_lock(hashtext('{tableName}'))
+ AND ctid IN (
+ SELECT ctid FROM {tableName}
+ WHERE {dateColumn} < @cutoff
+ LIMIT {batchSize}
+ );";
+
+ await using var cmd = new NpgsqlCommand(sql, conn);
+ cmd.Parameters.AddWithValue("cutoff", cutoffDate);
+
+ var rows = await cmd.ExecuteNonQueryAsync(cancellationToken);
+ totalDeleted += rows;
+
+ if (rows < batchSize)
+ {
+ break; // no more rows to delete in this run
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(20), cancellationToken);
+ }
+
+ if (totalDeleted > 0)
+ {
+ logger.LogInformation("Deleted {Count} old records from {Table} older than {Cutoff}", totalDeleted, tableName, cutoffDate);
+ }
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/ServiceControl.Audit.Persistence.PostgreSQL.csproj b/src/ServiceControl.Audit.Persistence.PostgreSQL/ServiceControl.Audit.Persistence.PostgreSQL.csproj
new file mode 100644
index 0000000000..96ad791bdf
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/ServiceControl.Audit.Persistence.PostgreSQL.csproj
@@ -0,0 +1,30 @@
+
+
+
+ net8.0
+ true
+ true
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs
new file mode 100644
index 0000000000..587395e1ef
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs
@@ -0,0 +1,148 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Npgsql;
+using ServiceControl.Audit.Auditing;
+using ServiceControl.Audit.Monitoring;
+using ServiceControl.Audit.Persistence.Monitoring;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+using ServiceControl.SagaAudit;
+
+class PostgreSQLAuditIngestionUnitOfWork : IAuditIngestionUnitOfWork
+{
+ readonly NpgsqlBatch batch;
+ readonly NpgsqlConnection connection;
+ readonly DatabaseConfiguration databaseConfiguration;
+
+ public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection, DatabaseConfiguration databaseConfiguration)
+ {
+ batch = new NpgsqlBatch(connection);
+ this.connection = connection;
+ this.databaseConfiguration = databaseConfiguration;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ try
+ {
+ await batch.PrepareAsync();
+ await batch.ExecuteNonQueryAsync();
+ }
+ finally
+ {
+ await batch.DisposeAsync();
+ await connection.DisposeAsync();
+ }
+ }
+
+ public Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory body, CancellationToken cancellationToken)
+ {
+ T? GetMetadata(string key)
+ {
+ if (processedMessage.MessageMetadata.TryGetValue(key, out var value))
+ {
+ return (T?)value;
+ }
+ else
+ {
+ return default;
+ }
+ }
+
+ object GetMetadataOrDbNull(string key) => GetMetadata(key) ?? (object)DBNull.Value;
+
+ // Insert ProcessedMessage into processed_messages table
+ var cmd = batch.CreateBatchCommand();
+ cmd.CommandText = @"
+ INSERT INTO processed_messages (
+ unique_message_id, message_metadata, headers, processed_at, body,
+ message_id, message_type, is_system_message, status, time_sent, receiving_endpoint_name,
+ critical_time, processing_time, delivery_time, conversation_id
+ ) VALUES (
+ @unique_message_id, @message_metadata, @headers, @processed_at, @body,
+ @message_id, @message_type, @is_system_message, @status, @time_sent, @receiving_endpoint_name,
+ @critical_time, @processing_time, @delivery_time, @conversation_id
+ );";
+
+ processedMessage.MessageMetadata["ContentLength"] = body.Length;
+ if (!body.IsEmpty && body.Length <= databaseConfiguration.MaxBodySizeToStore)
+ {
+ cmd.Parameters.AddWithValue("body", body);
+ }
+ else
+ {
+ cmd.Parameters.AddWithValue("body", DBNull.Value);
+ }
+ cmd.Parameters.AddWithValue("unique_message_id", processedMessage.UniqueMessageId);
+ cmd.Parameters.AddWithValue("message_metadata", NpgsqlTypes.NpgsqlDbType.Jsonb, processedMessage.MessageMetadata);
+ cmd.Parameters.AddWithValue("headers", NpgsqlTypes.NpgsqlDbType.Jsonb, processedMessage.Headers);
+ cmd.Parameters.AddWithValue("processed_at", processedMessage.ProcessedAt);
+ cmd.Parameters.AddWithValue("message_id", GetMetadataOrDbNull("MessageId"));
+ cmd.Parameters.AddWithValue("message_type", GetMetadataOrDbNull("MessageType"));
+ cmd.Parameters.AddWithValue("is_system_message", GetMetadataOrDbNull("IsSystemMessage"));
+ cmd.Parameters.AddWithValue("time_sent", GetMetadataOrDbNull("TimeSent"));
+ cmd.Parameters.AddWithValue("receiving_endpoint_name", GetMetadata("ReceivingEndpoint")?.Name ?? (object)DBNull.Value);
+ cmd.Parameters.AddWithValue("critical_time", GetMetadataOrDbNull("CriticalTime"));
+ cmd.Parameters.AddWithValue("processing_time", GetMetadataOrDbNull("ProcessingTime"));
+ cmd.Parameters.AddWithValue("delivery_time", GetMetadataOrDbNull("DeliveryTime"));
+ cmd.Parameters.AddWithValue("conversation_id", GetMetadataOrDbNull("ConversationId"));
+ cmd.Parameters.AddWithValue("status", (int)(GetMetadata("IsRetried") ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful));
+
+ batch.BatchCommands.Add(cmd);
+ return Task.CompletedTask;
+ }
+
+ public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
+ {
+ var newChange = new
+ {
+ sagaSnapshot.StartTime,
+ sagaSnapshot.FinishTime,
+ sagaSnapshot.Status,
+ sagaSnapshot.StateAfterChange,
+ sagaSnapshot.InitiatingMessage,
+ sagaSnapshot.OutgoingMessages,
+ sagaSnapshot.Endpoint
+ };
+
+ // Insert or update saga_snapshots table - add new change to the changes array
+ var cmd = batch.CreateBatchCommand();
+ cmd.CommandText = @"
+ INSERT INTO saga_snapshots (id, saga_id, saga_type, changes)
+ VALUES (@saga_id, @saga_id, @saga_type, @new_change)
+ ON CONFLICT (id) DO UPDATE SET
+ changes = COALESCE(saga_snapshots.changes, '[]'::jsonb) || @new_change::jsonb;";
+
+ cmd.Parameters.AddWithValue("saga_id", sagaSnapshot.SagaId);
+ cmd.Parameters.AddWithValue("saga_type", sagaSnapshot.SagaType);
+ cmd.Parameters.AddWithValue("new_change", NpgsqlTypes.NpgsqlDbType.Jsonb, new[] { newChange });
+
+ batch.BatchCommands.Add(cmd);
+
+ return Task.CompletedTask;
+ }
+
+ public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
+ {
+ // Insert KnownEndpoint into known_endpoints table
+ var cmd = batch.CreateBatchCommand();
+ cmd.CommandText = @"
+
+ INSERT INTO known_endpoints_insert (
+ endpoint_id, name, host_id, host, last_seen
+ ) VALUES (
+ @endpoint_id, @name, @host_id, @host, @last_seen
+ );";
+
+ cmd.Parameters.AddWithValue("endpoint_id", knownEndpoint.Id);
+ cmd.Parameters.AddWithValue("name", knownEndpoint.Name);
+ cmd.Parameters.AddWithValue("host_id", knownEndpoint.HostId);
+ cmd.Parameters.AddWithValue("host", knownEndpoint.Host);
+ cmd.Parameters.AddWithValue("last_seen", knownEndpoint.LastSeen);
+ batch.BatchCommands.Add(cmd);
+
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWorkFactory.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWorkFactory.cs
new file mode 100644
index 0000000000..0585cd98ac
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWorkFactory.cs
@@ -0,0 +1,17 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
+
+using System.Threading;
+using System.Threading.Tasks;
+using ServiceControl.Audit.Persistence.UnitOfWork;
+using ServiceControl.Audit.Persistence.PostgreSQL;
+
+class PostgreSQLAuditIngestionUnitOfWorkFactory(PostgreSQLConnectionFactory connectionFactory, DatabaseConfiguration databaseConfiguration) : IAuditIngestionUnitOfWorkFactory
+{
+ public async ValueTask StartNew(int batchSize, CancellationToken cancellationToken)
+ {
+ var connection = await connectionFactory.OpenConnection(cancellationToken);
+ return new PostgreSQLAuditIngestionUnitOfWork(connection, databaseConfiguration);
+ }
+
+ public bool CanIngestMore() => true;
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/UpdateKnownEndpointTable.cs b/src/ServiceControl.Audit.Persistence.PostgreSQL/UpdateKnownEndpointTable.cs
new file mode 100644
index 0000000000..1945e8cff1
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/UpdateKnownEndpointTable.cs
@@ -0,0 +1,61 @@
+namespace ServiceControl.Audit.Persistence.PostgreSQL;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Npgsql;
+
+class UpdateKnownEndpointTable(
+ ILogger logger,
+ PostgreSQLConnectionFactory connectionFactory) : BackgroundService
+{
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ logger.LogInformation($"{nameof(UpdateKnownEndpointTable)} started.");
+
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
+
+ await UpdateTable(stoppingToken);
+ }
+ catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+ {
+ logger.LogInformation($"{nameof(UpdateKnownEndpointTable)} stopped.");
+ break;
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error during update known_endpoints table.");
+ }
+ }
+ }
+
+ async Task UpdateTable(CancellationToken stoppingToken)
+ {
+ await using var conn = await connectionFactory.OpenConnection(stoppingToken);
+
+ var sql = @"
+ DO $$
+ BEGIN
+ IF pg_try_advisory_xact_lock(hashtext('known_endpoints_sync')) THEN
+ INSERT INTO known_endpoints (id, name, host_id, host, last_seen)
+ SELECT DISTINCT ON (endpoint_id) endpoint_id, name, host_id, host, last_seen
+ FROM known_endpoints_insert
+ ORDER BY endpoint_id, last_seen DESC
+ ON CONFLICT (id) DO UPDATE SET
+ last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);
+
+ DELETE FROM known_endpoints_insert;
+ END IF;
+ END $$;
+ ";
+
+ await using var cmd = new NpgsqlCommand(sql, conn);
+ await cmd.ExecuteNonQueryAsync(stoppingToken);
+ }
+}
diff --git a/src/ServiceControl.Audit.Persistence.PostgreSQL/persistence.manifest b/src/ServiceControl.Audit.Persistence.PostgreSQL/persistence.manifest
new file mode 100644
index 0000000000..bfb6a91db5
--- /dev/null
+++ b/src/ServiceControl.Audit.Persistence.PostgreSQL/persistence.manifest
@@ -0,0 +1,8 @@
+{
+ "IsSupported": false,
+ "Name": "PostgreSQL",
+ "DisplayName": "PostgreSQL",
+ "Description": "PostgreSQL ServiceControl Audit persister",
+ "AssemblyName": "ServiceControl.Audit.Persistence.PostgreSQL",
+ "TypeName": "ServiceControl.Audit.Persistence.PostgreSQL.PostgreSQLPersistenceConfiguration, ServiceControl.Audit.Persistence.PostgreSQL"
+}
\ No newline at end of file
diff --git a/src/ServiceControl.Audit.Persistence/DevelopmentPersistenceLocations.cs b/src/ServiceControl.Audit.Persistence/DevelopmentPersistenceLocations.cs
index 08925bde80..4853ae65f3 100644
--- a/src/ServiceControl.Audit.Persistence/DevelopmentPersistenceLocations.cs
+++ b/src/ServiceControl.Audit.Persistence/DevelopmentPersistenceLocations.cs
@@ -19,6 +19,7 @@ static DevelopmentPersistenceLocations()
{
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Audit.Persistence.InMemory"));
ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Audit.Persistence.RavenDB"));
+ ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Audit.Persistence.PostgreSQL"));
}
}
diff --git a/src/ServiceControl.sln b/src/ServiceControl.sln
index fa8d9a30e6..4a3169f3be 100644
--- a/src/ServiceControl.sln
+++ b/src/ServiceControl.sln
@@ -1,3 +1,4 @@
+
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31815.197
@@ -187,6 +188,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Hosting", "S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SetupProcessFake", "SetupProcessFake\SetupProcessFake.csproj", "{5837F789-69B9-44BE-B114-3A2880F06CAB}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Audit.Persistence.PostgreSQL", "ServiceControl.Audit.Persistence.PostgreSQL\ServiceControl.Audit.Persistence.PostgreSQL.csproj", "{921392AB-D2A9-40DB-A5DE-00D642CE541F}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -1025,6 +1028,18 @@ Global
{5837F789-69B9-44BE-B114-3A2880F06CAB}.Release|x64.Build.0 = Release|Any CPU
{5837F789-69B9-44BE-B114-3A2880F06CAB}.Release|x86.ActiveCfg = Release|Any CPU
{5837F789-69B9-44BE-B114-3A2880F06CAB}.Release|x86.Build.0 = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|x64.Build.0 = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Debug|x86.Build.0 = Debug|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|x64.ActiveCfg = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|x64.Build.0 = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|x86.ActiveCfg = Release|Any CPU
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -1110,6 +1125,7 @@ Global
{18DBEEF5-42EE-4C1D-A05B-87B21C067D53} = {E0E45F22-35E3-4AD8-B09E-EFEA5A2F18EE}
{481032A1-1106-4C6C-B75E-512F2FB08882} = {9AF9D3C7-E859-451B-BA4D-B954D289213A}
{5837F789-69B9-44BE-B114-3A2880F06CAB} = {927A078A-E271-4878-A153-86D71AE510E2}
+ {921392AB-D2A9-40DB-A5DE-00D642CE541F} = {BD162BC6-705F-45B4-A6B5-C138DC966C1D}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3B9E5B72-F580-465A-A22C-2D2148AF4EB4}
diff --git a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/PersistenceManifestTests.ApproveAuditInstanceManifests.approved.txt b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/PersistenceManifestTests.ApproveAuditInstanceManifests.approved.txt
index 59a50c03c0..3aa68f701c 100644
--- a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/PersistenceManifestTests.ApproveAuditInstanceManifests.approved.txt
+++ b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/PersistenceManifestTests.ApproveAuditInstanceManifests.approved.txt
@@ -1,4 +1,5 @@
[
+ "PostgreSQL: PostgreSQL",
"RavenDB: RavenDB",
"RavenDB35: RavenDB 3.5 (Legacy)"
]
\ No newline at end of file
diff --git a/src/ServiceControlInstaller.Packaging.UnitTests/AuditDeploymentPackageTests.cs b/src/ServiceControlInstaller.Packaging.UnitTests/AuditDeploymentPackageTests.cs
index d6f319cacc..4d6ba4a1b5 100644
--- a/src/ServiceControlInstaller.Packaging.UnitTests/AuditDeploymentPackageTests.cs
+++ b/src/ServiceControlInstaller.Packaging.UnitTests/AuditDeploymentPackageTests.cs
@@ -16,6 +16,7 @@ public AuditDeploymentPackageTests()
public void Should_package_storages_individually()
{
var expectedPersisters = new[] {
+ "PostgreSQL",
"RavenDB35", // Still must exist, as Raven35 persistence.manifest file must be available for SCMU to understand old versions
"RavenDB"
};