WIP
This commit is contained in:
parent
54b057886c
commit
7fc1ae0650
204 changed files with 4345 additions and 134 deletions
|
|
@ -1,115 +0,0 @@
|
|||
using Insight.Database;
|
||||
using System.Data;
|
||||
using PlanTempus.Core.Database.ConnectionFactory;
|
||||
|
||||
namespace PlanTempus.Database.Core.DDL;
|
||||
|
||||
/// <summary>
|
||||
/// Sets up the outbox table for reliable message delivery (transactional outbox pattern).
|
||||
/// Messages are inserted in the same transaction as the business operation,
|
||||
/// then processed asynchronously by a background worker.
|
||||
/// </summary>
|
||||
public class SetupOutbox(IDbConnectionFactory connectionFactory) : IDbConfigure<SetupOutbox.Command>
|
||||
{
|
||||
public class Command
|
||||
{
|
||||
public required string Schema { get; init; }
|
||||
}
|
||||
|
||||
private Command _command;
|
||||
|
||||
public void With(Command command, ConnectionStringParameters parameters = null)
|
||||
{
|
||||
_command = command;
|
||||
|
||||
using var conn = parameters is null ? connectionFactory.Create() : connectionFactory.Create(parameters);
|
||||
using var transaction = conn.OpenWithTransaction();
|
||||
|
||||
try
|
||||
{
|
||||
CreateOutboxTable(conn);
|
||||
CreateOutboxIndexes(conn);
|
||||
CreateNotifyTrigger(conn);
|
||||
|
||||
transaction.Commit();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
transaction.Rollback();
|
||||
throw new InvalidOperationException("Failed to SetupOutbox. Transaction is rolled back", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates the outbox table for storing pending messages
|
||||
/// </summary>
|
||||
void CreateOutboxTable(IDbConnection db)
|
||||
{
|
||||
var sql = @$"
|
||||
CREATE TABLE IF NOT EXISTS {_command.Schema}.outbox (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
type VARCHAR(50) NOT NULL,
|
||||
payload JSONB NOT NULL,
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
processed_at TIMESTAMPTZ NULL,
|
||||
retry_count INT NOT NULL DEFAULT 0,
|
||||
error_message TEXT NULL,
|
||||
CONSTRAINT chk_outbox_status CHECK (status IN ('pending', 'processing', 'sent', 'failed'))
|
||||
);
|
||||
|
||||
COMMENT ON TABLE {_command.Schema}.outbox IS 'Transactional outbox for reliable message delivery';
|
||||
COMMENT ON COLUMN {_command.Schema}.outbox.type IS 'Message type (e.g. verification_email, welcome_email)';
|
||||
COMMENT ON COLUMN {_command.Schema}.outbox.payload IS 'JSON payload with message-specific data';
|
||||
COMMENT ON COLUMN {_command.Schema}.outbox.status IS 'pending -> processing -> sent/failed';
|
||||
";
|
||||
|
||||
db.ExecuteSql(sql);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates indexes for efficient polling of pending messages
|
||||
/// </summary>
|
||||
void CreateOutboxIndexes(IDbConnection db)
|
||||
{
|
||||
var sql = @$"
|
||||
CREATE INDEX IF NOT EXISTS idx_outbox_pending
|
||||
ON {_command.Schema}.outbox(created_at)
|
||||
WHERE status = 'pending';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_outbox_failed_retry
|
||||
ON {_command.Schema}.outbox(created_at)
|
||||
WHERE status = 'failed' AND retry_count < 5;
|
||||
";
|
||||
|
||||
db.ExecuteSql(sql);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a trigger that sends a NOTIFY when new messages are inserted
|
||||
/// </summary>
|
||||
void CreateNotifyTrigger(IDbConnection db)
|
||||
{
|
||||
var sql = @$"
|
||||
CREATE OR REPLACE FUNCTION {_command.Schema}.notify_outbox_insert()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('outbox_messages', json_build_object(
|
||||
'id', NEW.id,
|
||||
'type', NEW.type
|
||||
)::text);
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
DROP TRIGGER IF EXISTS trg_outbox_notify ON {_command.Schema}.outbox;
|
||||
|
||||
CREATE TRIGGER trg_outbox_notify
|
||||
AFTER INSERT ON {_command.Schema}.outbox
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION {_command.Schema}.notify_outbox_insert();
|
||||
";
|
||||
|
||||
db.ExecuteSql(sql);
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue