Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions EventSourcing.Core/Aggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public TEvent Apply<TEvent>(TEvent e) where TEvent : Event<TAggregate>
/// </example>
/// <param name="s"><see cref="Snapshot"/> to apply</param>
protected virtual void Apply(Snapshot<TAggregate> s) { }

/// <summary>
/// Project current state of this Aggregate to <typeparamref name="TProjection"/>
/// </summary>
Expand All @@ -183,23 +183,16 @@ protected virtual void Apply(Snapshot<TAggregate> s) { }
/// This Aggregate state projected to <typeparamref name="TProjection"/>
/// </returns>
/// <exception cref="ArgumentException">Thrown when <see cref="ProjectionFactory{TAggregate, TProjection}"/> does not exist.</exception>
public TProjection Project<TProjection>() where TProjection : Projection
{
if (!ProjectionCache.FactoryByAggregateAndProjection.TryGetValue((GetType(), typeof(TProjection)), out var factory))
throw new ArgumentException(
$"Cannot get Projection of type '{typeof(TProjection).Name}'." +
$"No ProjectionFactory of type '{typeof(ProjectionFactory<TAggregate, TProjection>)}' found.", nameof(TProjection));

return (factory.CreateProjection(this) as TProjection)!;
}

public TProjection? Project<TProjection>() where TProjection : Projection =>
EventSourcingCache.GetProjectionFactory<TAggregate, TProjection>()?.CreateProjection(this) as TProjection;

private void ValidateAndApply(Event e)
{
if (e is not Event<TAggregate> @event)
throw new RecordValidationException($"{e} does not derive from {typeof(Event<TAggregate>)}");

RecordValidation.ValidateEventForAggregate(this, e);

Apply(@event);
Version++;
}
Expand Down
1 change: 1 addition & 0 deletions EventSourcing.Core/EventSourcing.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.Common" Version="4.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>
Expand Down
69 changes: 69 additions & 0 deletions EventSourcing.Core/EventSourcingCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace Finaps.EventSourcing.Core;

public static class EventSourcingCache
{
public static List<Type> RecordTypes { get; } = new();

private static readonly Dictionary<Type, List<ISnapshotFactory>> SnapshotFactories = new();
private static readonly Dictionary<Type, Dictionary<Type, IProjectionFactory>> ProjectionFactories = new();
private static readonly Dictionary<string, string> ProjectionFactoryHashes = new();

static EventSourcingCache()
{
var aggregateHashes = new Dictionary<Type, string>();

foreach (var type in AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x
.GetTypes().Where(type => type.IsPublic && type.IsClass && !type.IsAbstract && !type.IsGenericType)))
{
if (typeof(Record).IsAssignableFrom(type))
{
RecordTypes.Add(type);
}
else if (typeof(Aggregate).IsAssignableFrom(type))
{
var aggregate = (Aggregate)Activator.CreateInstance(type)!;
aggregateHashes.Add(aggregate.GetType(), aggregate.ComputeHash());
}
else if (typeof(ISnapshotFactory).IsAssignableFrom(type))
{
var factory = (ISnapshotFactory)Activator.CreateInstance(type)!;
SnapshotFactories.TryAdd(factory.AggregateType, new List<ISnapshotFactory>());
SnapshotFactories[factory.AggregateType].Add(factory);
}
else if (typeof(IProjectionFactory).IsAssignableFrom(type))
{
var factory = (IProjectionFactory)Activator.CreateInstance(type)!;
ProjectionFactories.TryAdd(factory.AggregateType, new Dictionary<Type, IProjectionFactory>());
ProjectionFactories[factory.AggregateType][factory.ProjectionType] = factory;
}
}

foreach (var factory in ProjectionFactories.Values.SelectMany(x => x.Values))
ProjectionFactoryHashes[factory.AggregateType.Name] = IHashable.CombineHashes(
factory.ComputeHash(), aggregateHashes[factory.AggregateType]);
}

public static IEnumerable<ISnapshotFactory> GetSnapshotFactories(Type aggregateType) =>
SnapshotFactories.TryGetValue(aggregateType, out var factories)
? factories
: Array.Empty<ISnapshotFactory>();

public static IProjectionFactory? GetProjectionFactory<TAggregate, TProjection>()
where TAggregate : Aggregate where TProjection : Projection =>
ProjectionFactories.TryGetValue(typeof(TAggregate), out var factories)
? factories.TryGetValue(typeof(TProjection), out var factory) ? factory : null
: null;

public static IEnumerable<IProjectionFactory> GetProjectionFactories(Type aggregateType) =>
ProjectionFactories.TryGetValue(aggregateType, out var factories)
? factories.Values
: Array.Empty<IProjectionFactory>();

public static IEnumerable<IProjectionFactory> GetProjectionFactories() =>
ProjectionFactories.Values.SelectMany(x => x.Values);

public static string? GetProjectionFactoryHash(string projectionFactoryTypeName) =>
ProjectionFactoryHashes.TryGetValue(projectionFactoryTypeName, out var hash)
? hash
: null;
}
3 changes: 1 addition & 2 deletions EventSourcing.Core/Records/Projection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public abstract record Projection : Record
/// to see whether this <see cref="Projection"/> is up to date.
/// </summary>
/// <remarks>To update projections, refer to the <see cref="ProjectionUpdateService"/></remarks>
public bool IsUpToDate =>
ProjectionCache.Hashes.TryGetValue(FactoryType ?? "", out var hash) && Hash == hash;
public bool IsUpToDate => Hash != null && EventSourcingCache.GetProjectionFactoryHash(FactoryType ?? "") == Hash;

/// <inheritdoc />
public override string id => $"{Kind}|{Type}|{AggregateId}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ public IAggregateTransaction Add(Aggregate aggregate)

_recordTransaction.AddEvents(aggregate.UncommittedEvents);

foreach (var snapshot in SnapshotService.CreateSnapshots(aggregate))
foreach (var snapshot in EventSourcingCache
.GetSnapshotFactories(aggregate.GetType())
.Where(factory => factory.IsSnapshotIntervalExceeded(aggregate))
.Select(factory => factory.CreateSnapshot(aggregate)))
_recordTransaction.AddSnapshot(snapshot);

foreach (var projection in ProjectionService.CreateProjections(aggregate))
foreach (var projection in EventSourcingCache
.GetProjectionFactories(aggregate.GetType())
.Select(x => x.CreateProjection(aggregate)))
_recordTransaction.UpsertProjection(projection);

return this;
Expand Down
33 changes: 0 additions & 33 deletions EventSourcing.Core/Services/ProjectionService/ProjectionCache.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Projection CreateProjection(Aggregate aggregate) => CreateProjection((TAg

Timestamp = DateTimeOffset.UtcNow,

Hash = ProjectionCache.Hashes[GetType().Name]
Hash = EventSourcingCache.GetProjectionFactoryHash(GetType().Name)
};

/// <summary>
Expand All @@ -51,8 +51,6 @@ public Projection CreateProjection(Aggregate aggregate) => CreateProjection((TAg
/// <seealso cref="Aggregate{TAggregate}"/>
/// <seealso cref="IHashable"/>
/// <returns>Hash string</returns>
public virtual string ComputeHash() => IHashable.CombineHashes(
IHashable.ComputeMethodHash(
GetType().GetMethod(nameof(CreateProjection), BindingFlags.Instance | BindingFlags.NonPublic)),
ProjectionCache.AggregateHashes[AggregateType]);
public virtual string ComputeHash() => IHashable.ComputeMethodHash(
GetType().GetMethod(nameof(CreateProjection), BindingFlags.Instance | BindingFlags.NonPublic));
}
17 changes: 0 additions & 17 deletions EventSourcing.Core/Services/ProjectionService/ProjectionService.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ public ProjectionUpdateService(IAggregateService service, IRecordStore store)
public async Task UpdateAllProjectionsAsync<TAggregate, TProjection>(CancellationToken cancellationToken = default)
where TAggregate : Aggregate, new() where TProjection : Projection
{
var factory = ProjectionCache.FactoryByAggregateAndProjection[(typeof(TAggregate), typeof(TProjection))];
var hash = ProjectionCache.Hashes[factory.GetType().Name];
var factory = EventSourcingCache.GetProjectionFactory<TAggregate, TProjection>();

if (factory == null)
throw new ArgumentException($"Can't find ProjectionFactory of type {typeof(ProjectionFactory<TAggregate, TProjection>)}");

var hash = EventSourcingCache.GetProjectionFactoryHash(factory.GetType().Name);

// TODO: Make work with large data sets
var items = await _store.GetProjections<TProjection>()
.Where(x =>
x.AggregateType == typeof(TAggregate).Name &&
x.Hash != hash)
.Where(x => x.AggregateType == typeof(TAggregate).Name && x.Hash != hash)
.Select(x => new { x.PartitionId, x.AggregateId })
.AsAsyncEnumerable()
.ToListAsync(cancellationToken);
Expand Down
31 changes: 0 additions & 31 deletions EventSourcing.Core/Services/SnapshotFactory/SnapshotService.cs

This file was deleted.

4 changes: 1 addition & 3 deletions EventSourcing.Cosmos.Tests/RecordConversionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ private record TestEvent : Event
[Fact]
public Task Converter_Throws_On_Missing_And_Null_Properties_On_Read_And_Write()
{
var converter = new RecordConverter<Event>(new RecordConverterOptions
{
RecordTypes = new List<Type> { typeof(TestEvent) },
var converter = new RecordConverter<Event>(new RecordConverterOptions {
ThrowOnMissingNonNullableProperties = true
});

Expand Down
42 changes: 31 additions & 11 deletions EventSourcing.Cosmos/RecordConverter/RecordConverter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;

Expand All @@ -11,7 +12,17 @@ namespace Finaps.EventSourcing.Core;
/// </remarks>
public class RecordConverter<TRecord> : JsonConverter<TRecord> where TRecord : Record
{
private readonly RecordTypeCache _recordTypeCache;
private static readonly Dictionary<Type, string> RecordTypeStrings =
EventSourcingCache.RecordTypes.ToDictionary(
type => type, type => type.GetCustomAttribute<RecordTypeAttribute>()?.Type ?? type.Name);

private static readonly Dictionary<string, Type> RecordStringTypes =
RecordTypeStrings.ToDictionary(x => x.Value, x => x.Key);

private static readonly Dictionary<Type, PropertyInfo[]> NonNullableRecordProperties = EventSourcingCache.RecordTypes
.ToDictionary(type => type, type => type.GetProperties().Where(property =>
property.PropertyType.IsValueType && Nullable.GetUnderlyingType(property.PropertyType) == null).ToArray());

private readonly bool _throwOnMissingNonNullableProperties;

/// <summary>
Expand All @@ -23,7 +34,6 @@ public class RecordConverter<TRecord> : JsonConverter<TRecord> where TRecord : R
/// </param>
public RecordConverter(RecordConverterOptions? options = null)
{
_recordTypeCache = new RecordTypeCache(options?.RecordTypes);
_throwOnMissingNonNullableProperties = options?.ThrowOnMissingNonNullableProperties ?? false;
}

Expand All @@ -36,8 +46,14 @@ public RecordConverter(RecordConverterOptions? options = null)
/// <summary>
/// Serialize Record
/// </summary>
public override void Write(Utf8JsonWriter writer, TRecord value, JsonSerializerOptions options) =>
JsonSerializer.Serialize(writer, value with { Type = RecordTypeCache.GetAssemblyRecordTypeString(value.GetType()) }, value.GetType());
public override void Write(Utf8JsonWriter writer, TRecord value, JsonSerializerOptions options)
{
if (!RecordTypeStrings.TryGetValue(value.GetType(), out var type))
throw new ArgumentException($"Couldn't find Record Type string for {value.GetType()}. " +
"Make sure the type is a public non-abstract record.", nameof(value));

JsonSerializer.Serialize(writer, value with { Type = type }, value.GetType());
}

/// <summary>
/// Deserialize Record
Expand All @@ -49,22 +65,26 @@ public override TRecord Read(ref Utf8JsonReader reader, Type typeToConvert, Json

private Type DeserializeRecordType(Utf8JsonReader reader)
{
// TODO: Optimize this by only querying the first field
var json = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(ref reader);

// Get Record.Type String from Json
if (json == null || !json.TryGetValue("Type", out var typeString) || typeString.ValueKind != JsonValueKind.String)
if (json == null || !json.TryGetValue(nameof(Record.Type), out var typeString) || typeString.ValueKind != JsonValueKind.String)

// Throw Exception when json has no "Type" Property
throw new RecordValidationException(
$"Error converting {typeof(TRecord)}. " +
$"Couldn't parse {typeof(TRecord)}.Type string from Json. " +
$"Does the Json contain a {nameof(Record.Type)} field?");

var type = _recordTypeCache.GetRecordType(typeString.GetString()!);
$"Couldn't parse {typeof(TRecord)}.{nameof(Record.Type)} from Json. " +
$"Does the Json document contain a {nameof(Record.Type)} field?");

if (!_throwOnMissingNonNullableProperties) return type;
if (!RecordStringTypes.TryGetValue(typeString.GetString()!, out var type))
throw new ArgumentException($"Couldn't find Record Type string for {typeString}. " +
"Make sure the type is a public non-abstract record.");

var missing = _recordTypeCache.GetNonNullableRecordProperties(type)
if (!_throwOnMissingNonNullableProperties || !NonNullableRecordProperties.TryGetValue(type, out var properties))
return type;

var missing = properties
.Where(property => !json.TryGetValue(property.Name, out var value) || value.ValueKind == JsonValueKind.Null)
.Select(property => property.Name)
.ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ namespace Finaps.EventSourcing.Core;
/// </summary>
public class RecordConverterOptions
{
/// <summary>
/// <see cref="Record"/> types to use for deserialization.
/// When not specified, <see cref="RecordConverter{TRecord}"/> will use all <see cref="Record"/> types in assembly.
/// </summary>
public List<Type>? RecordTypes { get; set; }

/// <summary>
/// If true, RecordConverter will throw exception when not-nullable properties are not included or null in JSON
/// </summary>
Expand Down
Loading