#nullable enable using System.Data; using System.Text.Json; using Insight.Database; using PlanTempus.Core.Database; namespace PlanTempus.Core.Outbox; public class OutboxService(IDatabaseOperations databaseOperations) : IOutboxService { public async Task EnqueueAsync(string type, object payload, IDbConnection? connection = null, IDbTransaction? transaction = null) { var sql = @" INSERT INTO system.outbox (type, payload) VALUES (@Type, @Payload::jsonb)"; var parameters = new { Type = type, Payload = JsonSerializer.Serialize(payload) }; if (connection != null) { await connection.ExecuteSqlAsync(sql, parameters); } else { using var db = databaseOperations.CreateScope(nameof(OutboxService)); await db.Connection.ExecuteSqlAsync(sql, parameters); } } public async Task> GetPendingAsync(int batchSize = 10) { using var db = databaseOperations.CreateScope(nameof(OutboxService)); var sql = @" UPDATE system.outbox SET status = 'processing' WHERE id IN ( SELECT id FROM system.outbox WHERE status = 'pending' ORDER BY created_at LIMIT @BatchSize FOR UPDATE SKIP LOCKED ) RETURNING id, type, payload, status, created_at, processed_at, retry_count, error_message"; var results = await db.Connection.QuerySqlAsync(sql, new { BatchSize = batchSize }); return results.Select(r => new OutboxMessage { Id = r.Id, Type = r.Type, Payload = JsonSerializer.Deserialize(r.Payload), Status = r.Status, CreatedAt = r.CreatedAt, ProcessedAt = r.ProcessedAt, RetryCount = r.RetryCount, ErrorMessage = r.ErrorMessage }).ToList(); } public async Task MarkAsSentAsync(Guid id) { using var db = databaseOperations.CreateScope(nameof(OutboxService)); var sql = @" UPDATE system.outbox SET status = 'sent', processed_at = NOW() WHERE id = @Id"; await db.Connection.ExecuteSqlAsync(sql, new { Id = id }); } public async Task MarkAsFailedAsync(Guid id, string errorMessage) { using var db = databaseOperations.CreateScope(nameof(OutboxService)); var sql = @" UPDATE system.outbox SET status = 'failed', processed_at = NOW(), retry_count = retry_count + 1, error_message = @ErrorMessage WHERE id = @Id"; await db.Connection.ExecuteSqlAsync(sql, new { Id = id, ErrorMessage = errorMessage }); } private class OutboxMessageDto { public Guid Id { get; set; } public string Type { get; set; } = ""; public string Payload { get; set; } = ""; public string Status { get; set; } = ""; public DateTime CreatedAt { get; set; } public DateTime? ProcessedAt { get; set; } public int RetryCount { get; set; } public string? ErrorMessage { get; set; } } }