PlanTempusApp/Core/Outbox/OutboxService.cs

105 lines
2.7 KiB
C#
Raw Normal View History

#nullable enable
using System.Data;
using System.Text.Json;
using Insight.Database;
using PlanTempus.Core.Database;
namespace PlanTempus.Core.Outbox;
public class OutboxService(IDatabaseOperations databaseOperations) : IOutboxService
{
public async Task EnqueueAsync(string type, object payload, IDbConnection? connection = null, IDbTransaction? transaction = null)
{
var sql = @"
INSERT INTO system.outbox (type, payload)
VALUES (@Type, @Payload::jsonb)";
var parameters = new
{
Type = type,
Payload = JsonSerializer.Serialize(payload)
};
if (connection != null)
{
await connection.ExecuteSqlAsync(sql, parameters);
}
else
{
using var db = databaseOperations.CreateScope(nameof(OutboxService));
await db.Connection.ExecuteSqlAsync(sql, parameters);
}
}
public async Task<List<OutboxMessage>> GetPendingAsync(int batchSize = 10)
{
using var db = databaseOperations.CreateScope(nameof(OutboxService));
var sql = @"
UPDATE system.outbox
SET status = 'processing'
WHERE id IN (
SELECT id FROM system.outbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT @BatchSize
FOR UPDATE SKIP LOCKED
)
RETURNING id, type, payload, status, created_at, processed_at, retry_count, error_message";
var results = await db.Connection.QuerySqlAsync<OutboxMessageDto>(sql, new { BatchSize = batchSize });
return results.Select(r => new OutboxMessage
{
Id = r.Id,
Type = r.Type,
Payload = JsonSerializer.Deserialize<JsonElement>(r.Payload),
Status = r.Status,
CreatedAt = r.CreatedAt,
ProcessedAt = r.ProcessedAt,
RetryCount = r.RetryCount,
ErrorMessage = r.ErrorMessage
}).ToList();
}
public async Task MarkAsSentAsync(Guid id)
{
using var db = databaseOperations.CreateScope(nameof(OutboxService));
var sql = @"
UPDATE system.outbox
SET status = 'sent', processed_at = NOW()
WHERE id = @Id";
await db.Connection.ExecuteSqlAsync(sql, new { Id = id });
}
public async Task MarkAsFailedAsync(Guid id, string errorMessage)
{
using var db = databaseOperations.CreateScope(nameof(OutboxService));
var sql = @"
UPDATE system.outbox
SET status = 'failed',
processed_at = NOW(),
retry_count = retry_count + 1,
error_message = @ErrorMessage
WHERE id = @Id";
await db.Connection.ExecuteSqlAsync(sql, new { Id = id, ErrorMessage = errorMessage });
}
private class OutboxMessageDto
{
public Guid Id { get; set; }
public string Type { get; set; } = "";
public string Payload { get; set; } = "";
public string Status { get; set; } = "";
public DateTime CreatedAt { get; set; }
public DateTime? ProcessedAt { get; set; }
public int RetryCount { get; set; }
public string? ErrorMessage { get; set; }
}
}