diff --git a/README.md b/README.md index 6a60fbbc..2c9904f9 100644 --- a/README.md +++ b/README.md @@ -473,7 +473,7 @@ var cachedRemoteProxy = Proxy.Create(id => remoteProxy.Execute(id)) --- ## Patterns Table -PatternKit currently tracks 117 production-readiness patterns. Each catalog pattern is represented in tests, documentation, real-world examples, IoC integration, and the BenchmarkDotNet coverage matrix. +PatternKit currently tracks 118 production-readiness patterns. Each catalog pattern is represented in tests, documentation, real-world examples, IoC integration, and the BenchmarkDotNet coverage matrix. | Category | Count | Patterns | | --- | ---: | --- | @@ -481,7 +481,7 @@ PatternKit currently tracks 117 production-readiness patterns. Each catalog patt | Behavioral | 12 | Chain of Responsibility, Command, Interpreter, Iterator, Mediator, Memento, Null Object, Observer, State, Strategy, Template Method, Visitor | | Cloud Architecture | 20 | Ambassador, Backends for Frontends, Bulkhead, Cache-Aside, Cache Stampede Protection, Circuit Breaker, External Configuration Store, Gateway Aggregation, Gateway Routing, Health Endpoint Monitoring, Leader Election, Priority Queue, Queue-Based Load Leveling, Rate Limiting, Read-Through Cache, Retry, Scheduler Agent Supervisor, Sidecar, Strangler Fig, Write-Through Cache | | Creational | 6 | Abstract Factory, Builder, Factory Method, Object Pool, Prototype, Singleton | -| Enterprise Integration | 41 | Aggregator, Canonical Data Model, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | +| Enterprise Integration | 42 | Aggregator, Canonical Data Model, Change Data Capture, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | | Messaging Reliability | 4 | Backpressure, Idempotent Receiver, Inbox, Outbox | | Structural | 7 | Adapter, Bridge, Composite, Decorator, Facade, Flyweight, Proxy | @@ -535,6 +535,8 @@ BenchmarkDotNet guidance is documented in [docs/guides/benchmarks.md](docs/guide | Cache-Aside | Execution | 216.50 ns | 1,048 B | 208.60 ns | 1,048 B | Same allocation; generated was slightly faster for the miss-then-hit workflow. | | Read-Through / Write-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Read-Through / Write-Through Cache | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Change Data Capture | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Change Data Capture | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Read-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | | Read-Through Cache | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | | Write-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | diff --git a/benchmarks/PatternKit.Benchmarks/Messaging/ChangeDataCaptureBenchmarks.cs b/benchmarks/PatternKit.Benchmarks/Messaging/ChangeDataCaptureBenchmarks.cs new file mode 100644 index 00000000..e7555e0e --- /dev/null +++ b/benchmarks/PatternKit.Benchmarks/Messaging/ChangeDataCaptureBenchmarks.cs @@ -0,0 +1,43 @@ +using BenchmarkDotNet.Attributes; +using PatternKit.Examples.ChangeDataCaptureDemo; +using PatternKit.Messaging.ChangeDataCapture; + +namespace PatternKit.Benchmarks.Messaging; + +[BenchmarkCategory("EnterpriseIntegration", "ChangeDataCapture")] +public class ChangeDataCaptureBenchmarks +{ + private static readonly ProductMutation Mutation = new("sku-1", "Desk", 125m, 1); + + [Benchmark(Baseline = true, Description = "Fluent: create CDC pipeline")] + [BenchmarkCategory("Fluent", "Construction")] + public ChangeDataCapturePipeline Fluent_Create() + => ProductCatalogChangeDataCapturePolicies.CreateFluent( + new InMemoryChangeDataCaptureStore(), + new InMemoryProductChangePublisher()); + + [Benchmark(Description = "Generated: create CDC pipeline")] + [BenchmarkCategory("Generated", "Construction")] + public ChangeDataCapturePipeline Generated_Create() + => GeneratedProductCatalogChangeDataCapture.CreateGenerated( + static (_, _) => default, + new InMemoryChangeDataCaptureStore()); + + [Benchmark(Description = "Fluent: capture and publish mutation")] + [BenchmarkCategory("Fluent", "Execution")] + public async ValueTask Fluent_Capture_And_Publish() + { + var pipeline = Fluent_Create(); + await pipeline.CaptureAsync(Mutation).ConfigureAwait(false); + return await pipeline.PublishPendingAsync().ConfigureAwait(false); + } + + [Benchmark(Description = "Generated: capture and publish mutation")] + [BenchmarkCategory("Generated", "Execution")] + public async ValueTask Generated_Capture_And_Publish() + { + var pipeline = Generated_Create(); + await pipeline.CaptureAsync(Mutation).ConfigureAwait(false); + return await pipeline.PublishPendingAsync().ConfigureAwait(false); + } +} diff --git a/docs/examples/index.md b/docs/examples/index.md index 8cffc98b..f1f37d23 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -60,6 +60,9 @@ Welcome! This section collects small, focused demos that show **how to compose b * **Customer Profile Lazy Load** A Generic Host importable deferred profile lookup with fluent and source-generated routes, `IServiceCollection` registration, TTL caching, and invalidation. See [Customer Profile Lazy Load](customer-profile-lazy-load.md). +* **Product Catalog Change Data Capture** + A Generic Host importable mutation capture workflow with fluent and source-generated routes, ordered pending records, and post-commit publication. See [Product Catalog Change Data Capture](product-catalog-change-data-capture.md). + * **Minimal Web Request Router** A tiny "API gateway" that separates **first-match middleware** (side effects/logging/auth) from **first-match routes** and **content negotiation**. A crisp example of Strategy patterns in an HTTP-ish setting. diff --git a/docs/examples/product-catalog-change-data-capture.md b/docs/examples/product-catalog-change-data-capture.md new file mode 100644 index 00000000..e48ccd89 --- /dev/null +++ b/docs/examples/product-catalog-change-data-capture.md @@ -0,0 +1,18 @@ +# Product Catalog Change Data Capture + +The product catalog CDC example shows how a catalog service can capture product mutations as ordered integration events and publish them after the mutation is stored. + +Import it into a host: + +```csharp +services.AddProductCatalogChangeDataCaptureDemo(); +``` + +The demo registers: + +- `IChangeDataCaptureStore` for pending capture records. +- `IProductChangePublisher` for event handoff. +- `ChangeDataCapturePipeline` as the application-owned CDC pipeline. +- `ProductCatalogChangeDataCaptureService` as the host-facing workflow. + +The source-generated route emits the same pipeline factory while letting the application supply its real publisher and durable store. diff --git a/docs/examples/toc.yml b/docs/examples/toc.yml index bcb1de5e..97d3b866 100644 --- a/docs/examples/toc.yml +++ b/docs/examples/toc.yml @@ -211,6 +211,9 @@ - name: Customer Profile Lazy Load href: customer-profile-lazy-load.md +- name: Product Catalog Change Data Capture + href: product-catalog-change-data-capture.md + - name: Order Transaction Script Pattern href: order-transaction-script-pattern.md diff --git a/docs/generators/change-data-capture.md b/docs/generators/change-data-capture.md new file mode 100644 index 00000000..28a12aeb --- /dev/null +++ b/docs/generators/change-data-capture.md @@ -0,0 +1,33 @@ +# Change Data Capture Generator + +`[GenerateChangeDataCapture]` emits a typed factory for `ChangeDataCapturePipeline` from a mapper method. + +```csharp +[GenerateChangeDataCapture( + typeof(ProductMutation), + typeof(ProductChanged), + FactoryMethodName = "CreatePipeline", + MapperMethodName = "Map", + PipelineName = "product-catalog-cdc")] +public static partial class ProductCatalogCdc +{ + public static ProductChanged Map(ProductMutation mutation, long sequence) + => new(sequence, mutation.Sku, mutation.Name); +} +``` + +## Diagnostics + +| ID | Severity | Message | +| --- | --- | --- | +| `PKCDC001` | Error | The host type must be partial. | +| `PKCDC002` | Error | Factory and mapper method names must be valid C# identifiers. | +| `PKCDC003` | Error | Mutation and event types are required. | + +Register generated pipelines by supplying the application publisher and durable store: + +```csharp +services.AddSingleton(sp => ProductCatalogCdc.CreatePipeline( + (changed, ct) => sp.GetRequiredService().PublishAsync(changed, ct), + sp.GetRequiredService>())); +``` diff --git a/docs/generators/index.md b/docs/generators/index.md index c3c88778..32de7135 100644 --- a/docs/generators/index.md +++ b/docs/generators/index.md @@ -125,6 +125,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato | [**Saga**](messaging.md#generated-saga) | Typed process-manager transition factories | `[GenerateSaga]` | | [**Mailbox**](messaging.md#generated-mailbox) | Serialized in-process inbox factories | `[GenerateMailbox]` | | [**Backpressure**](backpressure.md) | Admission-control policy factories for saturated work boundaries | `[GenerateBackpressurePolicy]` | +| [**Change Data Capture**](change-data-capture.md) | Ordered mutation capture pipeline factories | `[GenerateChangeDataCapture]` | | [**Reliability Pipeline**](messaging.md#generated-reliability-pipeline) | Idempotent receiver, inbox, and outbox factories | `[GenerateReliabilityPipeline]` | | [**Backplane Topology**](messaging.md#generated-backplane-topology) | Request/reply routes and publish/subscribe endpoint topology | `[GenerateBackplaneTopology]` | @@ -290,6 +291,10 @@ public static partial class OrderMailbox { } [GenerateBackpressurePolicy(typeof(CheckoutAdmission), Capacity = 8, Mode = "Wait", WaitTimeoutMilliseconds = 50)] public static partial class CheckoutBackpressurePolicy { } +// Change data capture - generated mutation capture pipeline +[GenerateChangeDataCapture(typeof(ProductMutation), typeof(ProductChanged), MapperMethodName = "Map")] +public static partial class ProductCatalogCdc { } + // Reliability pipeline - generated idempotent receiver, inbox, and outbox factories [GenerateReliabilityPipeline(typeof(AcceptOrder), typeof(string), typeof(OrderAccepted))] public static partial class OrderReliability { } diff --git a/docs/generators/toc.yml b/docs/generators/toc.yml index 95a6f458..f0a2f238 100644 --- a/docs/generators/toc.yml +++ b/docs/generators/toc.yml @@ -31,6 +31,9 @@ - name: Backpressure href: backpressure.md +- name: Change Data Capture + href: change-data-capture.md + - name: Cache-Aside href: cache-aside.md diff --git a/docs/guides/benchmark-results.md b/docs/guides/benchmark-results.md index cf28e0b8..ee5d3aa9 100644 --- a/docs/guides/benchmark-results.md +++ b/docs/guides/benchmark-results.md @@ -59,6 +59,8 @@ The latest measured timings below were captured on Windows 11, Intel Core i9-149 | Cache-Aside | Execution | 216.50 ns | 1,048 B | 208.60 ns | 1,048 B | Same allocation; generated was slightly faster for the miss-then-hit workflow. | | Read-Through / Write-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Read-Through / Write-Through Cache | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Change Data Capture | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Change Data Capture | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Read-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | | Read-Through Cache | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | | Write-Through Cache | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix through the read/write-through cache route. | @@ -256,7 +258,7 @@ The latest measured timings below were captured on Windows 11, Intel Core i9-149 ## Coverage Matrix Summary -The coverage matrix currently publishes 117 catalog patterns and 468 pattern route results. Each pattern has four BenchmarkDotNet routes: fluent construction, fluent execution, source-generated construction, and source-generated execution. The reusable hosting integration matrix publishes 12 reusable hosting integration route results for package-level `IServiceCollection` registrations. +The coverage matrix currently publishes 118 catalog patterns and 472 pattern route results. Each pattern has four BenchmarkDotNet routes: fluent construction, fluent execution, source-generated construction, and source-generated execution. The reusable hosting integration matrix publishes 12 reusable hosting integration route results for package-level `IServiceCollection` registrations. | Category | Patterns | Published route results | | --- | ---: | ---: | @@ -264,11 +266,11 @@ The coverage matrix currently publishes 117 catalog patterns and 468 pattern rou | Behavioral | 12 | 48 | | Cloud Architecture | 20 | 80 | | Creational | 6 | 24 | -| Enterprise Integration | 41 | 164 | +| Enterprise Integration | 42 | 168 | | Messaging Reliability | 4 | 16 | | Structural | 7 | 28 | -The generator matrix currently publishes 112 generator source route results. +The generator matrix currently publishes 113 generator source route results. ## Hosting Integration Matrix Results @@ -358,6 +360,7 @@ The generator matrix currently publishes 112 generator source route results. | Creational | Singleton | Covered | Covered | Covered | Covered | | Enterprise Integration | Aggregator | Covered | Covered | Covered | Covered | | Enterprise Integration | Canonical Data Model | Covered | Covered | Covered | Covered | +| Enterprise Integration | Change Data Capture | Covered | Covered | Covered | Covered | | Enterprise Integration | Channel Adapter | Covered | Covered | Covered | Covered | | Enterprise Integration | Channel Purger | Covered | Covered | Covered | Covered | | Enterprise Integration | Invalid Message Channel | Covered | Covered | Covered | Covered | @@ -466,6 +469,7 @@ The generator matrix currently publishes 112 generator source route results. | NullObjectGenerator | `src/PatternKit.Generators/NullObject/NullObjectGenerator.cs` | Covered | | BackplaneTopologyGenerator | `src/PatternKit.Generators/Messaging/BackplaneTopologyGenerator.cs` | Covered | | ChannelAdapterGenerator | `src/PatternKit.Generators/Messaging/ChannelAdapterGenerator.cs` | Covered | +| ChangeDataCaptureGenerator | `src/PatternKit.Generators/ChangeDataCapture/ChangeDataCaptureGenerator.cs` | Covered | | ChannelPurgerGenerator | `src/PatternKit.Generators/Messaging/ChannelPurgerGenerator.cs` | Covered | | InvalidMessageChannelGenerator | `src/PatternKit.Generators/Messaging/InvalidMessageChannelGenerator.cs` | Covered | | ClaimCheckGenerator | `src/PatternKit.Generators/Messaging/ClaimCheckGenerator.cs` | Covered | diff --git a/docs/index.md b/docs/index.md index 9db05552..2956d2b5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -66,7 +66,7 @@ if (parser.Execute("123", out var value)) ## 📚 Available Patterns -PatternKit covers 117 production-readiness patterns with fluent APIs, source-generated routes where applicable, IoC integration examples, TinyBDD coverage, and BenchmarkDotNet coverage-matrix validation: +PatternKit covers 118 production-readiness patterns with fluent APIs, source-generated routes where applicable, IoC integration examples, TinyBDD coverage, and BenchmarkDotNet coverage-matrix validation: | Category | Count | Patterns | | --- | ---: | --- | @@ -74,7 +74,7 @@ PatternKit covers 117 production-readiness patterns with fluent APIs, source-gen | Behavioral | 12 | Chain of Responsibility, Command, Interpreter, Iterator, Mediator, Memento, Null Object, Observer, State, Strategy, Template Method, Visitor | | Cloud Architecture | 20 | Ambassador, Backends for Frontends, Bulkhead, Cache-Aside, Cache Stampede Protection, Circuit Breaker, External Configuration Store, Gateway Aggregation, Gateway Routing, Health Endpoint Monitoring, Leader Election, Priority Queue, Queue-Based Load Leveling, Rate Limiting, Read-Through Cache, Retry, Scheduler Agent Supervisor, Sidecar, Strangler Fig, Write-Through Cache | | Creational | 6 | Abstract Factory, Builder, Factory Method, Object Pool, Prototype, Singleton | -| Enterprise Integration | 41 | Aggregator, Canonical Data Model, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | +| Enterprise Integration | 42 | Aggregator, Canonical Data Model, Change Data Capture, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | | Messaging Reliability | 4 | Backpressure, Idempotent Receiver, Inbox, Outbox | | Structural | 7 | Adapter, Bridge, Composite, Decorator, Facade, Flyweight, Proxy | diff --git a/docs/patterns/messaging/change-data-capture.md b/docs/patterns/messaging/change-data-capture.md new file mode 100644 index 00000000..ae498215 --- /dev/null +++ b/docs/patterns/messaging/change-data-capture.md @@ -0,0 +1,25 @@ +# Change Data Capture + +Change Data Capture tracks durable data mutations as ordered integration events. Use it when application state changes must be published to downstream consumers without coupling the write path to a broker, webhook, or projection updater. + +## Fluent Path + +```csharp +var pipeline = ChangeDataCapturePipeline + .Create("product-catalog-cdc") + .UseStore(store) + .MapWith((mutation, sequence) => new ProductChanged(sequence, mutation.Sku, mutation.Name)) + .PublishWith((changed, ct) => publisher.PublishAsync(changed, ct)) + .Build(); + +await pipeline.CaptureAsync(new ProductMutation("sku-1", "Desk")); +await pipeline.PublishPendingAsync(); +``` + +`CaptureAsync` appends an ordered CDC entry before publication. `PublishPendingAsync` reads unpublished entries in sequence, publishes each event through the configured publisher, marks successes as published, and leaves failures pending with an incremented attempt count. + +## Production Notes + +`InMemoryChangeDataCaptureStore` is deterministic for tests and examples. Production hosts should implement `IChangeDataCaptureStore` over the same durable store that commits the business mutation so the change record and domain write share a transaction boundary. + +See [Product Catalog Change Data Capture](../../examples/product-catalog-change-data-capture.md). diff --git a/docs/patterns/toc.yml b/docs/patterns/toc.yml index 3f7289e0..3a81ef06 100644 --- a/docs/patterns/toc.yml +++ b/docs/patterns/toc.yml @@ -371,6 +371,8 @@ href: messaging/saga.md - name: Mailbox href: messaging/mailbox.md + - name: Change Data Capture + href: messaging/change-data-capture.md - name: Backpressure href: messaging/backpressure.md - name: Idempotent Receiver, Inbox, and Outbox diff --git a/src/PatternKit.Core/Messaging/ChangeDataCapture/ChangeDataCapturePipeline.cs b/src/PatternKit.Core/Messaging/ChangeDataCapture/ChangeDataCapturePipeline.cs new file mode 100644 index 00000000..e0f2edc5 --- /dev/null +++ b/src/PatternKit.Core/Messaging/ChangeDataCapture/ChangeDataCapturePipeline.cs @@ -0,0 +1,338 @@ +namespace PatternKit.Messaging.ChangeDataCapture; + +public enum ChangeDataCaptureOperation +{ + Insert, + Update, + Delete, + Upsert +} + +public sealed class ChangeDataCaptureMutation +{ + public ChangeDataCaptureMutation(TKey key, ChangeDataCaptureOperation operation, string entityName, TPayload payload, long version, DateTimeOffset occurredAt) + { + Key = key; + Operation = operation; + EntityName = string.IsNullOrWhiteSpace(entityName) ? throw new ArgumentException("Entity name is required.", nameof(entityName)) : entityName; + Payload = payload; + Version = version; + OccurredAt = occurredAt; + } + + public TKey Key { get; } + public ChangeDataCaptureOperation Operation { get; } + public string EntityName { get; } + public TPayload Payload { get; } + public long Version { get; } + public DateTimeOffset OccurredAt { get; } +} + +public sealed class ChangeDataCaptureEntry +{ + public ChangeDataCaptureEntry( + long sequence, + string pipelineName, + TMutation mutation, + TEvent @event, + DateTimeOffset capturedAt, + bool published = false, + DateTimeOffset? publishedAt = null, + int attempts = 0) + { + if (sequence <= 0) + throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "CDC sequence must be positive."); + + Sequence = sequence; + PipelineName = string.IsNullOrWhiteSpace(pipelineName) ? throw new ArgumentException("CDC pipeline name is required.", nameof(pipelineName)) : pipelineName; + Mutation = mutation; + Event = @event; + CapturedAt = capturedAt; + Published = published; + PublishedAt = publishedAt; + Attempts = attempts; + } + + public long Sequence { get; } + public string PipelineName { get; } + public TMutation Mutation { get; } + public TEvent Event { get; } + public DateTimeOffset CapturedAt { get; } + public bool Published { get; } + public DateTimeOffset? PublishedAt { get; } + public int Attempts { get; } + + public ChangeDataCaptureEntry MarkPublished(DateTimeOffset publishedAt) + => new(Sequence, PipelineName, Mutation, Event, CapturedAt, true, publishedAt, Attempts + 1); + + public ChangeDataCaptureEntry MarkAttempt() + => new(Sequence, PipelineName, Mutation, Event, CapturedAt, Published, PublishedAt, Attempts + 1); +} + +public readonly struct ChangeDataCapturePublishSummary : IEquatable +{ + public ChangeDataCapturePublishSummary(int published, int failed) + { + Published = published; + Failed = failed; + } + + public int Published { get; } + public int Failed { get; } + + public bool Equals(ChangeDataCapturePublishSummary other) + => Published == other.Published && Failed == other.Failed; + + public override bool Equals(object? obj) + => obj is ChangeDataCapturePublishSummary other && Equals(other); + + public override int GetHashCode() + => (Published * 397) ^ Failed; + + public static bool operator ==(ChangeDataCapturePublishSummary left, ChangeDataCapturePublishSummary right) + => left.Equals(right); + + public static bool operator !=(ChangeDataCapturePublishSummary left, ChangeDataCapturePublishSummary right) + => !left.Equals(right); +} + +public interface IChangeDataCaptureStore +{ + ValueTask GetNextSequenceAsync(string pipelineName, CancellationToken cancellationToken = default); + + ValueTask> AppendAsync( + string pipelineName, + long sequence, + TMutation mutation, + TEvent @event, + DateTimeOffset capturedAt, + CancellationToken cancellationToken = default); + + ValueTask>> ReadPendingAsync( + string pipelineName, + CancellationToken cancellationToken = default); + + ValueTask MarkPublishedAsync( + long sequence, + DateTimeOffset publishedAt, + CancellationToken cancellationToken = default); + + ValueTask MarkAttemptAsync( + long sequence, + CancellationToken cancellationToken = default); +} + +public sealed class InMemoryChangeDataCaptureStore : IChangeDataCaptureStore +{ + private readonly object _sync = new(); + private readonly List> _entries = []; + private long _sequence; + + public ValueTask GetNextSequenceAsync(string pipelineName, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (string.IsNullOrWhiteSpace(pipelineName)) + throw new ArgumentException("CDC pipeline name is required.", nameof(pipelineName)); + + lock (_sync) + return new(_sequence + 1); + } + + public ValueTask> AppendAsync( + string pipelineName, + long sequence, + TMutation mutation, + TEvent @event, + DateTimeOffset capturedAt, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (string.IsNullOrWhiteSpace(pipelineName)) + throw new ArgumentException("CDC pipeline name is required.", nameof(pipelineName)); + if (sequence <= 0) + throw new ArgumentOutOfRangeException(nameof(sequence), sequence, "CDC sequence must be positive."); + + lock (_sync) + { + if (sequence <= _sequence) + throw new InvalidOperationException($"CDC sequence '{sequence}' has already been used."); + + _sequence = sequence; + var entry = new ChangeDataCaptureEntry(sequence, pipelineName, mutation, @event, capturedAt); + _entries.Add(entry); + return new(entry); + } + } + + public ValueTask>> ReadPendingAsync( + string pipelineName, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (string.IsNullOrWhiteSpace(pipelineName)) + throw new ArgumentException("CDC pipeline name is required.", nameof(pipelineName)); + + ChangeDataCaptureEntry[] pending; + lock (_sync) + { + pending = _entries + .Where(entry => string.Equals(entry.PipelineName, pipelineName, StringComparison.Ordinal) && !entry.Published) + .OrderBy(static entry => entry.Sequence) + .ToArray(); + } + + return new(pending); + } + + public ValueTask MarkPublishedAsync( + long sequence, + DateTimeOffset publishedAt, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + Update(sequence, entry => entry.MarkPublished(publishedAt)); + return default; + } + + public ValueTask MarkAttemptAsync(long sequence, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + Update(sequence, static entry => entry.MarkAttempt()); + return default; + } + + public IReadOnlyList> Snapshot() + { + lock (_sync) + return _entries.ToArray(); + } + + private void Update(long sequence, Func, ChangeDataCaptureEntry> update) + { + lock (_sync) + { + var index = _entries.FindIndex(entry => entry.Sequence == sequence); + if (index < 0) + throw new KeyNotFoundException($"CDC entry '{sequence}' was not found."); + + _entries[index] = update(_entries[index]); + } + } +} + +public sealed class ChangeDataCapturePipeline +{ + private readonly Func _eventFactory; + private readonly Func _publisher; + private readonly IChangeDataCaptureStore _store; + private readonly Func _utcNow; + + private ChangeDataCapturePipeline( + string name, + Func eventFactory, + Func publisher, + IChangeDataCaptureStore store, + Func utcNow) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("CDC pipeline name is required.", nameof(name)); + + Name = name; + _eventFactory = eventFactory ?? throw new ArgumentNullException(nameof(eventFactory)); + _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher)); + _store = store ?? throw new ArgumentNullException(nameof(store)); + _utcNow = utcNow ?? throw new ArgumentNullException(nameof(utcNow)); + } + + public string Name { get; } + + public static Builder Create(string name = "change-data-capture") => new(name); + + public async ValueTask> CaptureAsync( + TMutation mutation, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (mutation is null) + throw new ArgumentNullException(nameof(mutation)); + + var nextSequence = await _store.GetNextSequenceAsync(Name, cancellationToken).ConfigureAwait(false); + var @event = _eventFactory(mutation, nextSequence); + return await _store.AppendAsync(Name, nextSequence, mutation, @event, _utcNow(), cancellationToken).ConfigureAwait(false); + } + + public async ValueTask PublishPendingAsync(CancellationToken cancellationToken = default) + { + var published = 0; + var failed = 0; + + foreach (var entry in await _store.ReadPendingAsync(Name, cancellationToken).ConfigureAwait(false)) + { + try + { + await _publisher(entry.Event, cancellationToken).ConfigureAwait(false); + await _store.MarkPublishedAsync(entry.Sequence, _utcNow(), cancellationToken).ConfigureAwait(false); + published++; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch + { + await _store.MarkAttemptAsync(entry.Sequence, cancellationToken).ConfigureAwait(false); + failed++; + } + } + + return new(published, failed); + } + + public ValueTask>> ReadPendingAsync(CancellationToken cancellationToken = default) + => _store.ReadPendingAsync(Name, cancellationToken); + + public sealed class Builder + { + private readonly string _name; + private Func? _eventFactory; + private Func? _publisher; + private IChangeDataCaptureStore _store = new InMemoryChangeDataCaptureStore(); + private Func _utcNow = () => DateTimeOffset.UtcNow; + + internal Builder(string name) => _name = name; + + public Builder MapWith(Func eventFactory) + { + _eventFactory = eventFactory ?? throw new ArgumentNullException(nameof(eventFactory)); + return this; + } + + public Builder PublishWith(Func publisher) + { + _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher)); + return this; + } + + public Builder UseStore(IChangeDataCaptureStore store) + { + _store = store ?? throw new ArgumentNullException(nameof(store)); + return this; + } + + public Builder WithClock(Func utcNow) + { + _utcNow = utcNow ?? throw new ArgumentNullException(nameof(utcNow)); + return this; + } + + public ChangeDataCapturePipeline Build() + { + if (_eventFactory is null) + throw new InvalidOperationException("CDC pipeline requires an event mapper."); + if (_publisher is null) + throw new InvalidOperationException("CDC pipeline requires a publisher."); + + return new(_name, _eventFactory, _publisher, _store, _utcNow); + } + } +} diff --git a/src/PatternKit.Examples/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemo.cs b/src/PatternKit.Examples/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemo.cs new file mode 100644 index 00000000..a6d8281a --- /dev/null +++ b/src/PatternKit.Examples/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemo.cs @@ -0,0 +1,78 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Generators.ChangeDataCapture; +using PatternKit.Messaging.ChangeDataCapture; + +namespace PatternKit.Examples.ChangeDataCaptureDemo; + +public sealed record ProductMutation(string Sku, string Name, decimal Price, long Version); +public sealed record ProductChanged(long Sequence, string Sku, string Name, decimal Price, long Version); + +public interface IProductChangePublisher +{ + ValueTask PublishAsync(ProductChanged changed, CancellationToken cancellationToken = default); +} + +public sealed class InMemoryProductChangePublisher : IProductChangePublisher +{ + private readonly List _published = []; + + public IReadOnlyList Published => _published; + + public ValueTask PublishAsync(ProductChanged changed, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + _published.Add(changed); + return default; + } +} + +public sealed class ProductCatalogChangeDataCaptureService(ChangeDataCapturePipeline pipeline) +{ + public async ValueTask UpsertAsync(ProductMutation mutation, CancellationToken cancellationToken = default) + { + await pipeline.CaptureAsync(mutation, cancellationToken).ConfigureAwait(false); + return await pipeline.PublishPendingAsync(cancellationToken).ConfigureAwait(false); + } +} + +public static class ProductCatalogChangeDataCapturePolicies +{ + public static ChangeDataCapturePipeline CreateFluent( + IChangeDataCaptureStore store, + IProductChangePublisher publisher) + => ChangeDataCapturePipeline.Create("product-catalog-cdc") + .UseStore(store) + .MapWith(Map) + .PublishWith((changed, ct) => publisher.PublishAsync(changed, ct)) + .Build(); + + public static ProductChanged Map(ProductMutation mutation, long sequence) + => new(sequence, mutation.Sku, mutation.Name, mutation.Price, mutation.Version); +} + +[GenerateChangeDataCapture( + typeof(ProductMutation), + typeof(ProductChanged), + FactoryMethodName = nameof(CreateGenerated), + MapperMethodName = nameof(MapGenerated), + PipelineName = "product-catalog-cdc")] +public static partial class GeneratedProductCatalogChangeDataCapture +{ + public static ProductChanged MapGenerated(ProductMutation mutation, long sequence) + => new(sequence, mutation.Sku, mutation.Name, mutation.Price, mutation.Version); +} + +public static class ProductCatalogChangeDataCaptureServiceCollectionExtensions +{ + public static IServiceCollection AddProductCatalogChangeDataCaptureDemo(this IServiceCollection services) + { + services.AddSingleton, InMemoryChangeDataCaptureStore>(); + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddSingleton(sp => ProductCatalogChangeDataCapturePolicies.CreateFluent( + sp.GetRequiredService>(), + sp.GetRequiredService())); + services.AddSingleton(); + return services; + } +} diff --git a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs index bfe9c610..462d6039 100644 --- a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs +++ b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs @@ -44,6 +44,7 @@ using PatternKit.Examples.CanonicalDataModelDemo; using PatternKit.Examples.Chain; using PatternKit.Examples.Chain.ConfigDriven; +using PatternKit.Examples.ChangeDataCaptureDemo; using PatternKit.Examples.CircuitBreakerDemo; using PatternKit.Examples.ContextMapDemo; using PatternKit.Examples.DataMapperDemo; @@ -229,6 +230,7 @@ public sealed record CheckoutUnitOfWorkPatternExample(CheckoutUnitOfWorkDemoRunn public sealed record OrderDataMapperPatternExample(OrderDataMapperDemoRunner Runner, OrderDataMapperWorkflow Workflow); public sealed record OrderIdentityMapPatternExample(OrderIdentityMapDemoRunner Runner); public sealed record CustomerProfileLazyLoadPatternExample(CustomerProfileLazyLoadService Service); +public sealed record ProductCatalogChangeDataCaptureExample(ProductCatalogChangeDataCaptureService Service); public sealed record OrderTransactionScriptPatternExample(OrderTransactionScriptDemoRunner Runner); public sealed record CustomerServiceLayerPatternExample(CustomerServiceLayerDemoRunner Runner); public sealed record OrderDomainEventPatternExample(OrderDomainEventDemoRunner Runner); @@ -354,6 +356,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s .AddOrderDataMapperPatternExample() .AddOrderIdentityMapPatternExample() .AddCustomerProfileLazyLoadPatternExample() + .AddProductCatalogChangeDataCaptureExample() .AddOrderTransactionScriptPatternExample() .AddCustomerServiceLayerPatternExample() .AddOrderDomainEventPatternExample() @@ -1048,6 +1051,13 @@ public static IServiceCollection AddCustomerProfileLazyLoadPatternExample(this I return services.RegisterExample("Customer Profile Lazy Load", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost); } + public static IServiceCollection AddProductCatalogChangeDataCaptureExample(this IServiceCollection services) + { + services.AddProductCatalogChangeDataCaptureDemo(); + services.AddSingleton(sp => new(sp.GetRequiredService())); + return services.RegisterExample("Product Catalog Change Data Capture", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost); + } + public static IServiceCollection AddOrderTransactionScriptPatternExample(this IServiceCollection services) { services.AddOrderTransactionScriptDemo(); diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs index dfa117d4..f1d1fdd3 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs @@ -600,6 +600,14 @@ public sealed class PatternKitExampleCatalog : IPatternKitExampleCatalog ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost, ["Lazy Load"], ["deferred profile relationship", "source-generated lazy-load factory", "DI composition"]), + Descriptor( + "Product Catalog Change Data Capture", + "src/PatternKit.Examples/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemo.cs", + "test/PatternKit.Examples.Tests/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemoTests.cs", + "docs/examples/product-catalog-change-data-capture.md", + ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost, + ["Change Data Capture"], + ["ordered mutation capture", "source-generated CDC pipeline factory", "DI composition"]), Descriptor( "Order Transaction Script Pattern", "src/PatternKit.Examples/TransactionScriptDemo/OrderTransactionScriptDemo.cs", diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs index 333ff3af..28a43e40 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs @@ -896,6 +896,19 @@ public sealed class PatternKitPatternCatalog : IPatternKitPatternCatalog "test/PatternKit.Examples.Tests/Messaging/MailboxExampleTests.cs", ["fluent serialized inbox", "generated mailbox", "DI-importable bounded worker example"]), + Pattern("Change Data Capture", PatternFamily.EnterpriseIntegration, + "docs/patterns/messaging/change-data-capture.md", + "src/PatternKit.Core/Messaging/ChangeDataCapture/ChangeDataCapturePipeline.cs", + "test/PatternKit.Tests/Messaging/ChangeDataCapture/ChangeDataCapturePipelineTests.cs", + "docs/generators/change-data-capture.md", + "src/PatternKit.Generators/ChangeDataCapture/ChangeDataCaptureGenerator.cs", + "test/PatternKit.Generators.Tests/ChangeDataCaptureGeneratorTests.cs", + null, + "docs/examples/product-catalog-change-data-capture.md", + "src/PatternKit.Examples/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemo.cs", + "test/PatternKit.Examples.Tests/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemoTests.cs", + ["fluent CDC pipeline", "generated CDC factory", "DI-importable product catalog mutation publisher"]), + Pattern("Backpressure", PatternFamily.MessagingReliability, "docs/patterns/messaging/backpressure.md", "src/PatternKit.Core/Messaging/Reliability/Backpressure/BackpressurePolicy.cs", diff --git a/src/PatternKit.Generators.Abstractions/ChangeDataCapture/ChangeDataCaptureAttributes.cs b/src/PatternKit.Generators.Abstractions/ChangeDataCapture/ChangeDataCaptureAttributes.cs new file mode 100644 index 00000000..f82d5cf5 --- /dev/null +++ b/src/PatternKit.Generators.Abstractions/ChangeDataCapture/ChangeDataCaptureAttributes.cs @@ -0,0 +1,11 @@ +namespace PatternKit.Generators.ChangeDataCapture; + +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = false)] +public sealed class GenerateChangeDataCaptureAttribute(Type mutationType, Type eventType) : Attribute +{ + public Type MutationType { get; } = mutationType; + public Type EventType { get; } = eventType; + public string FactoryMethodName { get; set; } = "Create"; + public string MapperMethodName { get; set; } = "Map"; + public string PipelineName { get; set; } = "change-data-capture"; +} diff --git a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md index beb5f72c..22ffb15f 100644 --- a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md +++ b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md @@ -458,6 +458,10 @@ PKBP001 | PatternKit.Generators.Backpressure | Error | Backpressure policy host PKBP002 | PatternKit.Generators.Backpressure | Error | Backpressure policy configuration is invalid. PKBP003 | PatternKit.Generators.Backpressure | Error | Backpressure factory method name is invalid. PKBP004 | PatternKit.Generators.Backpressure | Error | Backpressure mode is invalid. +PKCDC001 | PatternKit.Generators.ChangeDataCapture | Error | Change Data Capture host must be partial. +PKCDC002 | PatternKit.Generators.ChangeDataCapture | Error | Change Data Capture member names must be valid identifiers. +PKCDC003 | PatternKit.Generators.ChangeDataCapture | Error | Change Data Capture mutation and event types are required. +PKCDC004 | PatternKit.Generators.ChangeDataCapture | Error | Change Data Capture containing type must be partial. PKLL001 | PatternKit.Generators.LazyLoading | Error | Lazy load host must be partial. PKLL002 | PatternKit.Generators.LazyLoading | Error | Lazy load configuration is invalid. PKLL003 | PatternKit.Generators.LazyLoading | Error | Lazy load member name is invalid. diff --git a/src/PatternKit.Generators/ChangeDataCapture/ChangeDataCaptureGenerator.cs b/src/PatternKit.Generators/ChangeDataCapture/ChangeDataCaptureGenerator.cs new file mode 100644 index 00000000..d0eb625c --- /dev/null +++ b/src/PatternKit.Generators/ChangeDataCapture/ChangeDataCaptureGenerator.cs @@ -0,0 +1,231 @@ +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using Microsoft.CodeAnalysis.CSharp.Syntax; +using Microsoft.CodeAnalysis.Text; + +namespace PatternKit.Generators.ChangeDataCapture; + +[Generator] +public sealed class ChangeDataCaptureGenerator : IIncrementalGenerator +{ + private const string AttributeName = "PatternKit.Generators.ChangeDataCapture.GenerateChangeDataCaptureAttribute"; + + private static readonly SymbolDisplayFormat TypeFormat = new( + globalNamespaceStyle: SymbolDisplayGlobalNamespaceStyle.Included, + typeQualificationStyle: SymbolDisplayTypeQualificationStyle.NameAndContainingTypesAndNamespaces, + genericsOptions: SymbolDisplayGenericsOptions.IncludeTypeParameters, + miscellaneousOptions: SymbolDisplayMiscellaneousOptions.IncludeNullableReferenceTypeModifier | SymbolDisplayMiscellaneousOptions.UseSpecialTypes); + + private static readonly DiagnosticDescriptor MustBePartial = new( + "PKCDC001", + "Change Data Capture host must be partial", + "Type '{0}' is marked with [GenerateChangeDataCapture] but is not declared as partial", + "PatternKit.Generators.ChangeDataCapture", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidIdentifier = new( + "PKCDC002", + "Change Data Capture member name is invalid", + "Change Data Capture '{0}' has an invalid member name '{1}'", + "PatternKit.Generators.ChangeDataCapture", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor MissingTypes = new( + "PKCDC003", + "Change Data Capture types are required", + "Change Data Capture '{0}' requires mutation and event types", + "PatternKit.Generators.ChangeDataCapture", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor ContainingTypeMustBePartial = new( + "PKCDC004", + "Change Data Capture containing type must be partial", + "Type '{0}' contains a [GenerateChangeDataCapture] host but is not declared as partial", + "PatternKit.Generators.ChangeDataCapture", + DiagnosticSeverity.Error, + true); + + public void Initialize(IncrementalGeneratorInitializationContext context) + { + var candidates = context.SyntaxProvider.ForAttributeWithMetadataName( + AttributeName, + static (node, _) => node is TypeDeclarationSyntax, + static (ctx, _) => (Type: (INamedTypeSymbol)ctx.TargetSymbol, Node: (TypeDeclarationSyntax)ctx.TargetNode, Attributes: ctx.Attributes)); + + context.RegisterSourceOutput(candidates, static (spc, candidate) => + { + var attr = candidate.Attributes.FirstOrDefault(static attribute => + attribute.AttributeClass?.ToDisplayString() == AttributeName); + if (attr is not null) + Generate(spc, candidate.Type, candidate.Node, attr); + }); + } + + private static void Generate(SourceProductionContext context, INamedTypeSymbol type, TypeDeclarationSyntax node, AttributeData attribute) + { + if (!node.Modifiers.Any(static modifier => modifier.Text == "partial")) + { + context.ReportDiagnostic(Diagnostic.Create(MustBePartial, node.Identifier.GetLocation(), type.Name)); + return; + } + + foreach (var containingType in GetContainingTypes(type)) + { + if (!IsPartial(containingType)) + { + context.ReportDiagnostic(Diagnostic.Create( + ContainingTypeMustBePartial, + containingType.Locations.FirstOrDefault() ?? node.Identifier.GetLocation(), + containingType.Name)); + return; + } + } + + var mutationType = attribute.ConstructorArguments.Length >= 1 + ? attribute.ConstructorArguments[0].Value as INamedTypeSymbol + : null; + var eventType = attribute.ConstructorArguments.Length >= 2 + ? attribute.ConstructorArguments[1].Value as INamedTypeSymbol + : null; + if (mutationType is null || eventType is null) + { + context.ReportDiagnostic(Diagnostic.Create(MissingTypes, node.Identifier.GetLocation(), type.Name)); + return; + } + + var factoryName = GetNamedString(attribute, "FactoryMethodName") ?? "Create"; + if (!IsIdentifier(factoryName)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidIdentifier, node.Identifier.GetLocation(), type.Name, factoryName)); + return; + } + + var mapperName = GetNamedString(attribute, "MapperMethodName") ?? "Map"; + if (!IsIdentifier(mapperName)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidIdentifier, node.Identifier.GetLocation(), type.Name, mapperName)); + return; + } + + var pipelineName = GetNamedString(attribute, "PipelineName") ?? "change-data-capture"; + context.AddSource( + $"{type.Name}.ChangeDataCapture.g.cs", + SourceText.From(GenerateSource(type, mutationType, eventType, factoryName, mapperName, pipelineName), Encoding.UTF8)); + } + + private static string GenerateSource( + INamedTypeSymbol type, + INamedTypeSymbol mutationType, + INamedTypeSymbol eventType, + string factoryName, + string mapperName, + string pipelineName) + { + var ns = type.ContainingNamespace.IsGlobalNamespace ? null : type.ContainingNamespace.ToDisplayString(); + var mutationTypeName = mutationType.ToDisplayString(TypeFormat); + var eventTypeName = eventType.ToDisplayString(TypeFormat); + var sb = new StringBuilder(); + sb.AppendLine("// "); + sb.AppendLine("#nullable enable"); + sb.AppendLine(); + if (ns is not null) + { + sb.Append("namespace ").Append(ns).AppendLine(";"); + sb.AppendLine(); + } + + var containingTypes = GetContainingTypes(type); + var indentLevel = 0; + foreach (var containingType in containingTypes) + { + AppendTypeDeclaration(sb, containingType, indentLevel); + sb.AppendLine(); + sb.AppendLine(new string(' ', indentLevel * 4) + "{"); + indentLevel++; + } + + AppendTypeDeclaration(sb, type, indentLevel); + sb.AppendLine(); + var indent = new string(' ', indentLevel * 4); + var memberIndent = indent + " "; + var bodyIndent = memberIndent + " "; + sb.AppendLine(indent + "{"); + sb.Append(memberIndent).Append("public static global::PatternKit.Messaging.ChangeDataCapture.ChangeDataCapturePipeline<") + .Append(mutationTypeName).Append(", ").Append(eventTypeName).Append("> ").Append(factoryName).AppendLine("("); + sb.Append(bodyIndent).Append("global::System.Func<").Append(eventTypeName) + .Append(", global::System.Threading.CancellationToken, global::System.Threading.Tasks.ValueTask> publisher,"); + sb.AppendLine(); + sb.Append(bodyIndent).Append("global::PatternKit.Messaging.ChangeDataCapture.IChangeDataCaptureStore<") + .Append(mutationTypeName).Append(", ").Append(eventTypeName).Append(">? store = null)"); + sb.AppendLine(); + sb.AppendLine(memberIndent + "{"); + sb.Append(bodyIndent).Append("var builder = global::PatternKit.Messaging.ChangeDataCapture.ChangeDataCapturePipeline<") + .Append(mutationTypeName).Append(", ").Append(eventTypeName).Append(">.Create(\"") + .Append(Escape(pipelineName)).AppendLine("\")"); + sb.Append(bodyIndent).Append(" .MapWith(").Append(mapperName).AppendLine(")"); + sb.AppendLine(bodyIndent + " .PublishWith(publisher);"); + sb.AppendLine(bodyIndent + "if (store is not null)"); + sb.AppendLine(bodyIndent + " builder.UseStore(store);"); + sb.AppendLine(bodyIndent + "return builder.Build();"); + sb.AppendLine(memberIndent + "}"); + sb.AppendLine(indent + "}"); + for (var i = containingTypes.Length - 1; i >= 0; i--) + sb.AppendLine(new string(' ', i * 4) + "}"); + + return sb.ToString(); + } + + private static INamedTypeSymbol[] GetContainingTypes(INamedTypeSymbol type) + { + var containingTypes = new Stack(); + for (var current = type.ContainingType; current is not null; current = current.ContainingType) + containingTypes.Push(current); + + return containingTypes.ToArray(); + } + + private static bool IsPartial(INamedTypeSymbol type) + => type.DeclaringSyntaxReferences + .Select(static reference => reference.GetSyntax()) + .OfType() + .Any(static declaration => declaration.Modifiers.Any(static modifier => modifier.Text == "partial")); + + private static void AppendTypeDeclaration(StringBuilder sb, INamedTypeSymbol type, int indentLevel) + { + sb.Append(new string(' ', indentLevel * 4)); + sb.Append(GetAccessibility(type.DeclaredAccessibility)).Append(' '); + if (type.IsStatic) + sb.Append("static "); + else if (type.IsAbstract && type.TypeKind == TypeKind.Class) + sb.Append("abstract "); + else if (type.IsSealed && type.TypeKind == TypeKind.Class) + sb.Append("sealed "); + sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ').Append(type.Name); + } + + private static bool IsIdentifier(string value) + => SyntaxFacts.IsValidIdentifier(value) && SyntaxFacts.GetKeywordKind(value) == SyntaxKind.None; + + private static string Escape(string value) => value.Replace("\\", "\\\\").Replace("\"", "\\\""); + + private static string GetAccessibility(Accessibility accessibility) + => accessibility switch + { + Accessibility.Public => "public", + Accessibility.Internal => "internal", + Accessibility.Private => "private", + Accessibility.Protected => "protected", + Accessibility.ProtectedAndInternal => "private protected", + Accessibility.ProtectedOrInternal => "protected internal", + _ => "internal" + }; + + private static string? GetNamedString(AttributeData attribute, string name) + => attribute.NamedArguments.FirstOrDefault(kv => kv.Key == name).Value.Value as string; +} diff --git a/test/PatternKit.Examples.Tests/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemoTests.cs b/test/PatternKit.Examples.Tests/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemoTests.cs new file mode 100644 index 00000000..f6e14108 --- /dev/null +++ b/test/PatternKit.Examples.Tests/ChangeDataCaptureDemo/ProductCatalogChangeDataCaptureDemoTests.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Examples.ChangeDataCaptureDemo; +using PatternKit.Messaging.ChangeDataCapture; +using TinyBDD; + +namespace PatternKit.Examples.Tests.ChangeDataCaptureDemo; + +public sealed class ProductCatalogChangeDataCaptureDemoTests +{ + [Scenario("Product catalog change data capture works through fluent and generated policies")] + [Fact] + public async Task Product_Catalog_Change_Data_Capture_Works_Through_Fluent_And_Generated_Policies() + { + var fluentPublisher = new InMemoryProductChangePublisher(); + var generatedPublisher = new InMemoryProductChangePublisher(); + var fluent = ProductCatalogChangeDataCapturePolicies.CreateFluent( + new InMemoryChangeDataCaptureStore(), + fluentPublisher); + var generated = GeneratedProductCatalogChangeDataCapture.CreateGenerated( + (changed, ct) => generatedPublisher.PublishAsync(changed, ct), + new InMemoryChangeDataCaptureStore()); + + await fluent.CaptureAsync(new("sku-1", "Desk", 125m, 1)); + await generated.CaptureAsync(new("sku-2", "Lamp", 40m, 1)); + var fluentSummary = await fluent.PublishPendingAsync(); + var generatedSummary = await generated.PublishPendingAsync(); + + ScenarioExpect.Equal(new ChangeDataCapturePublishSummary(1, 0), fluentSummary); + ScenarioExpect.Equal(new ChangeDataCapturePublishSummary(1, 0), generatedSummary); + ScenarioExpect.Equal("Desk", ScenarioExpect.Single(fluentPublisher.Published).Name); + ScenarioExpect.Equal("Lamp", ScenarioExpect.Single(generatedPublisher.Published).Name); + } + + [Scenario("Product catalog change data capture is importable through IServiceCollection")] + [Fact] + public async Task Product_Catalog_Change_Data_Capture_Is_Importable_Through_ServiceCollection() + { + using var provider = new ServiceCollection() + .AddProductCatalogChangeDataCaptureDemo() + .BuildServiceProvider(validateScopes: true); + + var service = provider.GetRequiredService(); + var publisher = provider.GetRequiredService(); + + var summary = await service.UpsertAsync(new("sku-1", "Desk", 125m, 1)); + + ScenarioExpect.Equal(new ChangeDataCapturePublishSummary(1, 0), summary); + var changed = ScenarioExpect.Single(publisher.Published); + ScenarioExpect.Equal("sku-1", changed.Sku); + ScenarioExpect.Equal(1, changed.Sequence); + } +} diff --git a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs index 3521ba0e..05819705 100644 --- a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs +++ b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs @@ -107,7 +107,7 @@ public Task Published_Benchmark_Results_Include_Every_Catalog_Pattern() .Then("every catalog pattern appears in the benchmark results matrix", ctx => ScenarioExpect.Empty(ctx.MissingPatterns)) .And("the guide publishes the route result total", ctx => - ScenarioExpect.Contains("468 pattern route results", ctx.ResultsGuide)) + ScenarioExpect.Contains("472 pattern route results", ctx.ResultsGuide)) .AssertPassed(); [Scenario("Published benchmark results include reusable hosting integrations")] diff --git a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs index e5561d69..8a8e67dd 100644 --- a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs +++ b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs @@ -78,6 +78,7 @@ public sealed class PatternKitPatternCatalogTests(ITestOutputHelper output) : Ti "Saga / Process Manager", "Mailbox", "Backpressure", + "Change Data Capture", "Idempotent Receiver", "Inbox", "Outbox", @@ -172,7 +173,7 @@ public Task Catalog_Includes_Enterprise_Integration_And_Architecture_Patterns() ScenarioExpect.Equal(EnterprisePatternAdditions.OrderBy(static x => x), patterns.Select(static p => p.Name).OrderBy(static x => x))) .And("enterprise entries are grouped by integration reliability and architecture families", patterns => { - ScenarioExpect.Equal(41, patterns.Count(static p => p.Family == PatternFamily.EnterpriseIntegration)); + ScenarioExpect.Equal(42, patterns.Count(static p => p.Family == PatternFamily.EnterpriseIntegration)); ScenarioExpect.Equal(4, patterns.Count(static p => p.Family == PatternFamily.MessagingReliability)); ScenarioExpect.Equal(20, patterns.Count(static p => p.Family == PatternFamily.CloudArchitecture)); ScenarioExpect.Equal(27, patterns.Count(static p => p.Family == PatternFamily.ApplicationArchitecture)); diff --git a/test/PatternKit.Generators.Tests/ChangeDataCaptureGeneratorTests.cs b/test/PatternKit.Generators.Tests/ChangeDataCaptureGeneratorTests.cs new file mode 100644 index 00000000..8d2e56ba --- /dev/null +++ b/test/PatternKit.Generators.Tests/ChangeDataCaptureGeneratorTests.cs @@ -0,0 +1,164 @@ +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using PatternKit.Generators.ChangeDataCapture; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Generators.Tests; + +[Feature("Change Data Capture generator")] +public sealed partial class ChangeDataCaptureGeneratorTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + [Scenario("Generates change data capture factory")] + [Fact] + public Task Generates_Change_Data_Capture_Factory() + => Given("a configured CDC declaration", () => Compile(""" + using PatternKit.Generators.ChangeDataCapture; + using System.Threading; + using System.Threading.Tasks; + namespace Demo; + + public sealed record Mutation(string Sku, int Quantity); + public sealed record Changed(long Sequence, string Sku, int Quantity); + + [GenerateChangeDataCapture(typeof(Mutation), typeof(Changed), FactoryMethodName = "Build", MapperMethodName = "MapMutation", PipelineName = "inventory-cdc")] + public static partial class InventoryCdc + { + public static Changed MapMutation(Mutation mutation, long sequence) => new(sequence, mutation.Sku, mutation.Quantity); + } + """)) + .Then("generated source creates the configured pipeline", result => + { + ScenarioExpect.Empty(result.Diagnostics); + var source = ScenarioExpect.Single(result.GeneratedSources); + ScenarioExpect.Equal("InventoryCdc.ChangeDataCapture.g.cs", source.HintName); + ScenarioExpect.Contains("Build(", source.Source); + ScenarioExpect.Contains("Create(\"inventory-cdc\")", source.Source); + ScenarioExpect.Contains(".MapWith(MapMutation)", source.Source); + ScenarioExpect.True(result.EmitSuccess, string.Join(Environment.NewLine, result.EmitDiagnostics)); + }) + .AssertPassed(); + + [Scenario("Generates change data capture factory for nested hosts with defaults")] + [Fact] + public Task Generates_Change_Data_Capture_Factory_For_Nested_Hosts_With_Defaults() + => Given("a nested CDC declaration with default configuration", () => Compile(""" + using PatternKit.Generators.ChangeDataCapture; + + public sealed record Mutation(string Sku); + public sealed record Changed(string Sku); + + public abstract partial class Outer + { + [GenerateChangeDataCapture(typeof(Mutation), typeof(Changed))] + internal sealed partial class CdcHost + { + public static Changed Map(Mutation mutation, long sequence) => new(mutation.Sku); + } + } + """)) + .Then("generated source preserves the nested partial shape and default names", result => + { + ScenarioExpect.Empty(result.Diagnostics); + var source = ScenarioExpect.Single(result.GeneratedSources); + ScenarioExpect.Contains("public abstract partial class Outer", source.Source); + ScenarioExpect.Contains("internal sealed partial class CdcHost", source.Source); + ScenarioExpect.Contains("Create(\"change-data-capture\")", source.Source); + ScenarioExpect.Contains("Create(", source.Source); + ScenarioExpect.True(result.EmitSuccess, string.Join(Environment.NewLine, result.EmitDiagnostics)); + }) + .AssertPassed(); + + [Scenario("Reports diagnostics for invalid change data capture declarations")] + [Theory] + [InlineData("public static class CdcHost { }", "PKCDC001")] + [InlineData("public static partial class CdcHost { }", "PKCDC002", "FactoryMethodName = \"class\"")] + [InlineData("public static partial class CdcHost { }", "PKCDC002", "MapperMethodName = \"1bad\"")] + public Task Reports_Diagnostics_For_Invalid_Change_Data_Capture_Declarations(string declaration, string diagnosticId, string configuration = "") + => Given("an invalid CDC declaration", () => Compile($$""" + using PatternKit.Generators.ChangeDataCapture; + public sealed record Mutation(string Sku); + public sealed record Changed(string Sku); + [GenerateChangeDataCapture(typeof(Mutation), typeof(Changed){{(string.IsNullOrWhiteSpace(configuration) ? "" : ", " + configuration)}})] + {{declaration}} + """)) + .Then("the expected diagnostic is reported", result => + ScenarioExpect.Contains(result.Diagnostics, diagnostic => diagnostic.Id == diagnosticId)) + .AssertPassed(); + + [Scenario("Reports diagnostic when change data capture host has a non-partial containing type")] + [Fact] + public Task Reports_Diagnostic_When_Change_Data_Capture_Host_Has_A_Non_Partial_Containing_Type() + => Given("a nested CDC host inside a non-partial container", () => Compile(""" + using PatternKit.Generators.ChangeDataCapture; + public sealed record Mutation(string Sku); + public sealed record Changed(string Sku); + + public static class Outer + { + [GenerateChangeDataCapture(typeof(Mutation), typeof(Changed))] + public static partial class CdcHost + { + public static Changed Map(Mutation mutation, long sequence) => new(mutation.Sku); + } + } + """)) + .Then("the containing type diagnostic is reported", result => + ScenarioExpect.Contains(result.Diagnostics, diagnostic => diagnostic.Id == "PKCDC004")) + .AssertPassed(); + + [Scenario("Change data capture attribute exposes generator configuration")] + [Fact] + public void Change_Data_Capture_Attribute_Exposes_Generator_Configuration() + { + var attribute = new GenerateChangeDataCaptureAttribute(typeof(string), typeof(int)) + { + FactoryMethodName = "CreateCdc", + MapperMethodName = "MapCdc", + PipelineName = "cdc" + }; + + ScenarioExpect.Equal(typeof(string), attribute.MutationType); + ScenarioExpect.Equal(typeof(int), attribute.EventType); + ScenarioExpect.Equal("CreateCdc", attribute.FactoryMethodName); + ScenarioExpect.Equal("MapCdc", attribute.MapperMethodName); + ScenarioExpect.Equal("cdc", attribute.PipelineName); + } + + private static GeneratorResult Compile(string source) + { + var compilation = CreateCompilation(source, "ChangeDataCaptureGeneratorTests"); + _ = RoslynTestHelpers.Run(compilation, new ChangeDataCaptureGenerator(), out var run, out var updated); + var result = run.Results.Single(); + var emit = updated.Emit(Stream.Null); + return new( + result.Diagnostics.ToArray(), + result.GeneratedSources.Select(static source => new GeneratedSource(source.HintName, source.SourceText.ToString())).ToArray(), + emit.Success, + emit.Diagnostics.Select(static diagnostic => diagnostic.ToString()).ToArray()); + } + + private static CSharpCompilation CreateCompilation(string source, string assemblyName) + => RoslynTestHelpers.CreateCompilation( + source, + assemblyName, + extra: + [ + MetadataReference.CreateFromFile(GetAbstractionsAssemblyPath()), + MetadataReference.CreateFromFile(typeof(PatternKit.Messaging.ChangeDataCapture.ChangeDataCapturePipeline<,>).Assembly.Location) + ]); + + private static string GetAbstractionsAssemblyPath() + => Path.Combine( + Path.GetDirectoryName(typeof(ChangeDataCaptureGenerator).Assembly.Location)!, + "PatternKit.Generators.Abstractions.dll"); + + private sealed record GeneratorResult( + IReadOnlyList Diagnostics, + IReadOnlyList GeneratedSources, + bool EmitSuccess, + IReadOnlyList EmitDiagnostics); + + private sealed record GeneratedSource(string HintName, string Source); +} diff --git a/test/PatternKit.Tests/Messaging/ChangeDataCapture/ChangeDataCapturePipelineTests.cs b/test/PatternKit.Tests/Messaging/ChangeDataCapture/ChangeDataCapturePipelineTests.cs new file mode 100644 index 00000000..3d0a3dc5 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/ChangeDataCapture/ChangeDataCapturePipelineTests.cs @@ -0,0 +1,177 @@ +using PatternKit.Messaging.ChangeDataCapture; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.ChangeDataCapture; + +public sealed class ChangeDataCapturePipelineTests +{ + [Scenario("Change Data Capture exposes mutation entry and summary value contracts")] + [Fact] + public void Change_Data_Capture_Exposes_Mutation_Entry_And_Summary_Value_Contracts() + { + var occurredAt = new DateTimeOffset(2026, 1, 2, 3, 4, 5, TimeSpan.Zero); + var mutation = new ChangeDataCaptureMutation( + "sku-1", + ChangeDataCaptureOperation.Upsert, + "InventoryItem", + new("sku-1", 5), + 42, + occurredAt); + var entry = new ChangeDataCaptureEntry, InventoryChanged>( + 7, + "inventory-cdc", + mutation, + new(7, "sku-1", 5), + occurredAt); + var published = entry.MarkPublished(occurredAt.AddMinutes(1)); + var attempted = entry.MarkAttempt(); + var summary = new ChangeDataCapturePublishSummary(1, 0); + + ScenarioExpect.Equal("sku-1", mutation.Key); + ScenarioExpect.Equal(ChangeDataCaptureOperation.Upsert, mutation.Operation); + ScenarioExpect.Equal("InventoryItem", mutation.EntityName); + ScenarioExpect.Equal(42, mutation.Version); + ScenarioExpect.Equal(occurredAt, mutation.OccurredAt); + ScenarioExpect.Equal("sku-1", mutation.Payload.Sku); + ScenarioExpect.Equal(7, entry.Sequence); + ScenarioExpect.Equal("inventory-cdc", entry.PipelineName); + ScenarioExpect.Same(mutation, entry.Mutation); + ScenarioExpect.Equal("sku-1", entry.Event.Sku); + ScenarioExpect.Equal(occurredAt, entry.CapturedAt); + ScenarioExpect.True(published.Published); + ScenarioExpect.Equal(occurredAt.AddMinutes(1), published.PublishedAt); + ScenarioExpect.Equal(1, published.Attempts); + ScenarioExpect.Equal(1, attempted.Attempts); + ScenarioExpect.Equal(1, summary.Published); + ScenarioExpect.Equal(0, summary.Failed); + ScenarioExpect.True(summary.Equals(new ChangeDataCapturePublishSummary(1, 0))); + ScenarioExpect.True(summary.Equals((object)new ChangeDataCapturePublishSummary(1, 0))); + ScenarioExpect.True(summary == new ChangeDataCapturePublishSummary(1, 0)); + ScenarioExpect.True(summary != new ChangeDataCapturePublishSummary(0, 1)); + ScenarioExpect.NotEqual(0, summary.GetHashCode()); + ScenarioExpect.Throws(() => new ChangeDataCaptureMutation("sku", ChangeDataCaptureOperation.Insert, " ", new("sku", 1), 1, occurredAt)); + ScenarioExpect.Throws(() => new ChangeDataCaptureEntry(0, "inventory", new("sku", 1), new(1, "sku", 1), occurredAt)); + ScenarioExpect.Throws(() => new ChangeDataCaptureEntry(1, "", new("sku", 1), new(1, "sku", 1), occurredAt)); + } + + [Scenario("Change Data Capture captures and publishes mutations in order")] + [Fact] + public async Task Change_Data_Capture_Captures_And_Publishes_Mutations_In_Order() + { + var published = new List(); + var store = new InMemoryChangeDataCaptureStore(); + var pipeline = ChangeDataCapturePipeline.Create("inventory-cdc") + .UseStore(store) + .MapWith(static (mutation, sequence) => new InventoryChanged(sequence, mutation.Sku, mutation.Quantity)) + .PublishWith((@event, _) => + { + published.Add(@event); + return default; + }) + .Build(); + + var first = await pipeline.CaptureAsync(new("sku-1", 5)); + var second = await pipeline.CaptureAsync(new("sku-2", 9)); + var summary = await pipeline.PublishPendingAsync(); + + ScenarioExpect.Equal(1, first.Sequence); + ScenarioExpect.Equal(2, second.Sequence); + ScenarioExpect.Equal([1L, 2L], published.Select(static e => e.Sequence).ToArray()); + ScenarioExpect.Equal(new ChangeDataCapturePublishSummary(2, 0), summary); + ScenarioExpect.True(store.Snapshot().All(static entry => entry.Published)); + } + + [Scenario("Change Data Capture leaves failed publications pending")] + [Fact] + public async Task Change_Data_Capture_Leaves_Failed_Publications_Pending() + { + var pipeline = ChangeDataCapturePipeline.Create("inventory-cdc") + .MapWith(static (mutation, sequence) => new InventoryChanged(sequence, mutation.Sku, mutation.Quantity)) + .PublishWith((_, _) => throw new InvalidOperationException("broker unavailable")) + .Build(); + + await pipeline.CaptureAsync(new("sku-1", 5)); + var summary = await pipeline.PublishPendingAsync(); + var pending = await pipeline.ReadPendingAsync(); + + ScenarioExpect.Equal(new ChangeDataCapturePublishSummary(0, 1), summary); + var entry = ScenarioExpect.Single(pending); + ScenarioExpect.False(entry.Published); + ScenarioExpect.Equal(1, entry.Attempts); + } + + [Scenario("Change Data Capture validates configuration and store operations")] + [Fact] + public async Task Change_Data_Capture_Validates_Configuration_And_Store_Operations() + { + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create("").MapWith(Map).PublishWith(Publish).Build()); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().MapWith(null!)); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().PublishWith(null!)); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().UseStore(null!)); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().WithClock(null!)); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().PublishWith(Publish).Build()); + ScenarioExpect.Throws(() => ChangeDataCapturePipeline.Create().MapWith(Map).Build()); + + var store = new InMemoryChangeDataCaptureStore(); + await ScenarioExpect.ThrowsAsync(() => store.GetNextSequenceAsync("").AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.GetNextSequenceAsync("inventory", new CancellationToken(true)).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.AppendAsync("", 1, new("sku", 1), new(1, "sku", 1), DateTimeOffset.UtcNow).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.ReadPendingAsync(" ").AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.ReadPendingAsync("inventory", new CancellationToken(true)).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.AppendAsync("inventory", 0, new("sku", 1), new(1, "sku", 1), DateTimeOffset.UtcNow).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.AppendAsync("inventory", 1, new("sku", 1), new(1, "sku", 1), DateTimeOffset.UtcNow, new CancellationToken(true)).AsTask()); + _ = await store.AppendAsync("inventory", 1, new("sku", 1), new(1, "sku", 1), DateTimeOffset.UtcNow); + await ScenarioExpect.ThrowsAsync(() => store.AppendAsync("inventory", 1, new("sku", 1), new(1, "sku", 1), DateTimeOffset.UtcNow).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.MarkPublishedAsync(1, DateTimeOffset.UtcNow, new CancellationToken(true)).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.MarkAttemptAsync(1, new CancellationToken(true)).AsTask()); + await ScenarioExpect.ThrowsAsync(() => store.MarkAttemptAsync(404).AsTask()); + await store.MarkPublishedAsync(1, DateTimeOffset.UtcNow); + + static InventoryChanged Map(InventoryMutation mutation, long sequence) => new(sequence, mutation.Sku, mutation.Quantity); + static ValueTask Publish(InventoryChanged _, CancellationToken __) => default; + } + + [Scenario("Change Data Capture propagates publisher cancellation")] + [Fact] + public async Task Change_Data_Capture_Propagates_Publisher_Cancellation() + { + using var cancellation = new CancellationTokenSource(); + var pipeline = ChangeDataCapturePipeline.Create("inventory-cdc") + .MapWith(static (mutation, sequence) => new InventoryChanged(sequence, mutation.Sku, mutation.Quantity)) + .PublishWith((_, token) => + { + cancellation.Cancel(); + throw new OperationCanceledException(token); + }) + .Build(); + + await pipeline.CaptureAsync(new("sku-1", 5)); + + await ScenarioExpect.ThrowsAsync(() => pipeline.PublishPendingAsync(cancellation.Token).AsTask()); + var pending = await pipeline.ReadPendingAsync(); + var entry = ScenarioExpect.Single(pending); + ScenarioExpect.Equal(0, entry.Attempts); + } + + [Scenario("Change Data Capture validates capture arguments and custom clock")] + [Fact] + public async Task Change_Data_Capture_Validates_Capture_Arguments_And_Custom_Clock() + { + var capturedAt = new DateTimeOffset(2026, 2, 3, 4, 5, 6, TimeSpan.Zero); + var pipeline = ChangeDataCapturePipeline.Create("inventory-cdc") + .MapWith(static (mutation, sequence) => new InventoryChanged(sequence, mutation.Sku, mutation.Quantity)) + .PublishWith(static (_, _) => default) + .WithClock(() => capturedAt) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => pipeline.CaptureAsync(null!).AsTask()); + await ScenarioExpect.ThrowsAsync(() => pipeline.CaptureAsync(new("sku-1", 5), new CancellationToken(true)).AsTask()); + var entry = await pipeline.CaptureAsync(new("sku-1", 5)); + + ScenarioExpect.Equal("inventory-cdc", pipeline.Name); + ScenarioExpect.Equal(capturedAt, entry.CapturedAt); + } + + private sealed record InventoryMutation(string Sku, int Quantity); + private sealed record InventoryChanged(long Sequence, string Sku, int Quantity); +}