MessageChannel work for Seq Logging

This commit is contained in:
Janus Knudsen 2025-02-14 17:45:49 +01:00
parent e777135d62
commit bf50563ab7
6 changed files with 119 additions and 84 deletions

View file

@ -21,6 +21,7 @@
<ItemGroup> <ItemGroup>
<Folder Include="Configurations\AzureAppConfigurationProvider\" /> <Folder Include="Configurations\AzureAppConfigurationProvider\" />
<Folder Include="Configurations\PostgresqlConfigurationBuilder\" /> <Folder Include="Configurations\PostgresqlConfigurationBuilder\" />
<Folder Include="Logging\" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View file

@ -9,7 +9,7 @@ namespace Core.ModuleRegistry
{ {
builder.RegisterType<MessageChannel>() builder.RegisterType<MessageChannel>()
.As<IMessageChannel>() .As<IMessageChannel<Microsoft.ApplicationInsights.Channel.ITelemetry>>()
.SingleInstance(); .SingleInstance();
builder.RegisterType<SeqBackgroundService>() builder.RegisterType<SeqBackgroundService>()

View file

@ -1,9 +1,9 @@
using System.Threading.Channels; using System.Threading.Channels;
namespace Core.Telemetry namespace Core.Telemetry
{ {
public interface IMessageChannel : IDisposable public interface IMessageChannel<T> : IDisposable
{ {
ChannelWriter<HttpRequestMessage> Writer { get; } ChannelWriter<T> Writer { get; }
ChannelReader<HttpRequestMessage> Reader { get; } ChannelReader<T> Reader { get; }
} }
} }

View file

@ -1,22 +1,23 @@
using System.Threading.Channels; using Microsoft.ApplicationInsights.Channel;
using System.Threading.Channels;
namespace Core.Telemetry namespace Core.Telemetry
{ {
public class MessageChannel : IMessageChannel public class MessageChannel : IMessageChannel<ITelemetry>
{ {
private readonly Channel<HttpRequestMessage> _channel; private readonly Channel<ITelemetry> _channel;
public MessageChannel() public MessageChannel()
{ {
_channel = Channel.CreateUnbounded<HttpRequestMessage>(); _channel = Channel.CreateUnbounded<ITelemetry>();
} }
public ChannelWriter<HttpRequestMessage> Writer => _channel.Writer; public ChannelWriter<ITelemetry> Writer => _channel.Writer;
public ChannelReader<HttpRequestMessage> Reader => _channel.Reader; public ChannelReader<ITelemetry> Reader => _channel.Reader;
public void Dispose() public void Dispose()
{ {
_channel.Writer.Complete(); _channel.Writer.Complete();
} }
} }
} }

View file

@ -1,64 +1,101 @@
using Microsoft.ApplicationInsights; using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using System.Net.Http.Headers;
using System.Text;
namespace Core.Telemetry namespace Core.Telemetry
{ {
public class SeqBackgroundService : BackgroundService public class SeqBackgroundService : BackgroundService
{ {
private readonly IMessageChannel _messageChannel; private readonly IMessageChannel<ITelemetry> _messageChannel;
private readonly TelemetryClient _telemetryClient; private readonly TelemetryClient _telemetryClient;
private readonly HttpClient _httpClient; private readonly HttpClient _httpClient;
public SeqBackgroundService( public SeqBackgroundService(TelemetryClient telemetryClient,
TelemetryClient telemetryClient, IMessageChannel<ITelemetry> messageChannel,
IMessageChannel messageChannel, HttpClient httpClient)
HttpClient httpClient) {
{ _telemetryClient = telemetryClient;
_telemetryClient = telemetryClient; _messageChannel = messageChannel;
_messageChannel = messageChannel; _httpClient = httpClient;
_httpClient = httpClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken) _httpClient = new HttpClient()
{ {
try BaseAddress = new Uri("http://localhost:5341"),
{ Timeout = TimeSpan.FromSeconds(30)
while (!stoppingToken.IsCancellationRequested) };
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
try _httpClient.DefaultRequestHeaders.Accept.Clear();
{ _httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
//using var response = await _httpClient.SendAsync(message, stoppingToken);
//if (!response.IsSuccessStatusCode)
//{
// _telemetryClient.TrackTrace($"HTTP kald fejlede med status {response.StatusCode}", Microsoft.ApplicationInsights.DataContracts.SeverityLevel.Warning);
// continue;
//}
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
}
}
}
catch (Exception ex)
{
if (ex is not OperationCanceledException)
{
_telemetryClient.TrackException(ex);
throw;
}
_telemetryClient.TrackTrace("Service shutdown påbegyndt");
}
}
public override async Task StopAsync(CancellationToken cancellationToken) }
{
_messageChannel.Dispose(); protected override async Task ExecuteAsync(CancellationToken stoppingToken)
await base.StopAsync(cancellationToken); {
} try
} {
while (!stoppingToken.IsCancellationRequested)
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
try
{
var eventTelemetry = message as Microsoft.ApplicationInsights.DataContracts.EventTelemetry;
var level = "Information";
var seqEvent = new Dictionary<string, object>
{
{ "@t", DateTime.UtcNow.ToString("o") },
{ "@mt", eventTelemetry.Name },
{ "@l", level } // "Information", "Warning", "Error", etc.
};
foreach (var prop in eventTelemetry.Context.GlobalProperties)
{
seqEvent.Add(prop.Key, prop.Value);
}
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(seqEvent), Encoding.UTF8, "application/vnd.serilog.clef");
var key = "4XhWFtY4jJ0NBgohBAFF"; ;
//Gt8hS9ClGNfOCAdswDlW
var requestMessage = new HttpRequestMessage(HttpMethod.Post, $"/ingest/clef?apiKey={key}");
requestMessage.Content = content;
var response = await _httpClient.SendAsync(requestMessage, stoppingToken);
response.EnsureSuccessStatusCode();
//if (!response.IsSuccessStatusCode)
//{
// _telemetryClient.TrackTrace($"HTTP kald fejlede med status {response.StatusCode}", Microsoft.ApplicationInsights.DataContracts.SeverityLevel.Warning);
// continue;
//}
}
catch (Exception ex)
{
//_telemetryClient.TrackException(ex); this is disabled for now, we need to think about the channel structure first
}
}
}
catch (Exception ex)
{
if (ex is not OperationCanceledException)
{
_telemetryClient.TrackException(ex);
throw;
}
_telemetryClient.TrackTrace("Service shutdown started");
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_messageChannel.Dispose();
await base.StopAsync(cancellationToken);
}
}
} }

View file

@ -5,13 +5,15 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Core.Telemetry; using Core.Telemetry;
using Microsoft.ApplicationInsights; using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.DataContracts;
namespace Tests namespace Tests
{ {
[TestClass] [TestClass]
public class MessageChannelIntegrationTests : TestFixture public class MessageChannelIntegrationTests : TestFixture
{ {
private IMessageChannel _messageChannel; private IMessageChannel<ITelemetry> _messageChannel;
private SeqBackgroundService _service; private SeqBackgroundService _service;
private CancellationTokenSource _cts; private CancellationTokenSource _cts;
@ -23,36 +25,30 @@ namespace Tests
var httpClient = new HttpClient(new TestMessageHandler()); var httpClient = new HttpClient(new TestMessageHandler());
_service = new SeqBackgroundService(telemetryClient, _messageChannel, httpClient); _service = new SeqBackgroundService(telemetryClient, _messageChannel, httpClient);
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
} }
[TestMethod] [TestMethod]
public async Task Messages_ShouldBeProcessedFromQueue() public async Task Messages_ShouldBeProcessedFromQueue()
{ {
// Arrange
var processedMessages = new List<HttpRequestMessage>(); var processedMessages = new List<HttpRequestMessage>();
// Start service
var serviceTask = _service.StartAsync(_cts.Token); var serviceTask = _service.StartAsync(_cts.Token);
// Act
// Send nogle beskeder til køen
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
var message = new HttpRequestMessage(HttpMethod.Post, $"http://test.com/{i}"); var eventTelemetry = new EventTelemetry("SomeEvent");
await _messageChannel.Writer.WriteAsync(message); await _messageChannel.Writer.WriteAsync(eventTelemetry);
} }
// Vent lidt for at sikre processing // wait for processing
await Task.Delay(5000); await Task.Delay(5000);
// Stop servicen
_cts.Cancel(); _cts.Cancel();
await _service.StopAsync(CancellationToken.None); await _service.StopAsync(CancellationToken.None);
// Assert
// Check at køen er tom
bool hasMoreMessages = await _messageChannel.Reader.WaitToReadAsync(); bool hasMoreMessages = await _messageChannel.Reader.WaitToReadAsync();
Assert.IsFalse(hasMoreMessages, "Køen burde være tom"); Assert.IsFalse(hasMoreMessages, "Queue should be empty after 5 seconds");
} }
private class TestMessageHandler : HttpMessageHandler private class TestMessageHandler : HttpMessageHandler