using Insight.Database; using System.Data; using PlanTempus.Core.Database.ConnectionFactory; namespace PlanTempus.Database.Core.DDL; /// /// Sets up the outbox table for reliable message delivery (transactional outbox pattern). /// Messages are inserted in the same transaction as the business operation, /// then processed asynchronously by a background worker. /// public class SetupOutbox(IDbConnectionFactory connectionFactory) : IDbConfigure { public class Command { public required string Schema { get; init; } } private Command _command; public void With(Command command, ConnectionStringParameters parameters = null) { _command = command; using var conn = parameters is null ? connectionFactory.Create() : connectionFactory.Create(parameters); using var transaction = conn.OpenWithTransaction(); try { CreateOutboxTable(conn); CreateOutboxIndexes(conn); CreateNotifyTrigger(conn); transaction.Commit(); } catch (Exception ex) { transaction.Rollback(); throw new InvalidOperationException("Failed to SetupOutbox. Transaction is rolled back", ex); } } /// /// Creates the outbox table for storing pending messages /// void CreateOutboxTable(IDbConnection db) { var sql = @$" CREATE TABLE IF NOT EXISTS {_command.Schema}.outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), type VARCHAR(50) NOT NULL, payload JSONB NOT NULL, status VARCHAR(20) NOT NULL DEFAULT 'pending', created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, processed_at TIMESTAMPTZ NULL, retry_count INT NOT NULL DEFAULT 0, error_message TEXT NULL, CONSTRAINT chk_outbox_status CHECK (status IN ('pending', 'processing', 'sent', 'failed')) ); COMMENT ON TABLE {_command.Schema}.outbox IS 'Transactional outbox for reliable message delivery'; COMMENT ON COLUMN {_command.Schema}.outbox.type IS 'Message type (e.g. verification_email, welcome_email)'; COMMENT ON COLUMN {_command.Schema}.outbox.payload IS 'JSON payload with message-specific data'; COMMENT ON COLUMN {_command.Schema}.outbox.status IS 'pending -> processing -> sent/failed'; "; db.ExecuteSql(sql); } /// /// Creates indexes for efficient polling of pending messages /// void CreateOutboxIndexes(IDbConnection db) { var sql = @$" CREATE INDEX IF NOT EXISTS idx_outbox_pending ON {_command.Schema}.outbox(created_at) WHERE status = 'pending'; CREATE INDEX IF NOT EXISTS idx_outbox_failed_retry ON {_command.Schema}.outbox(created_at) WHERE status = 'failed' AND retry_count < 5; "; db.ExecuteSql(sql); } /// /// Creates a trigger that sends a NOTIFY when new messages are inserted /// void CreateNotifyTrigger(IDbConnection db) { var sql = @$" CREATE OR REPLACE FUNCTION {_command.Schema}.notify_outbox_insert() RETURNS TRIGGER AS $$ BEGIN PERFORM pg_notify('outbox_messages', json_build_object( 'id', NEW.id, 'type', NEW.type )::text); RETURN NEW; END; $$ LANGUAGE plpgsql; DROP TRIGGER IF EXISTS trg_outbox_notify ON {_command.Schema}.outbox; CREATE TRIGGER trg_outbox_notify AFTER INSERT ON {_command.Schema}.outbox FOR EACH ROW EXECUTE FUNCTION {_command.Schema}.notify_outbox_insert(); "; db.ExecuteSql(sql); } }