DEV Community

Vijay Karajgikar

🚀 Event-Driven Excellence: Integrating Kafka with Avro in a .NET Hosted Service

Event-driven architecture is the future of resilient and scalable systems, and at its core, you'll often find Apache Kafka. But transmitting plain JSON or raw bytes over Kafka is a recipe for maintenance headaches. You need a contract. You need Apache Avro.

Avro provides a schema-based serialization format that's compact, fast, and, most importantly, handles schema evolution gracefully. This is critical for microservices that must communicate reliably across different deployment cycles.

This guide will show you how to build a robust, production-ready Kafka Producer and Consumer using a .NET Hosted Service, leveraging Avro schemas and the modern conventions of simple interface patterns and primary constructors. We'll also cover how to make everything easily configurable and, crucially, how to unit test the core logic without needing a running Kafka cluster.


💡 Why Kafka + Avro is Your Event Stream Dream Team

Choosing a messaging system is only half the battle; how you format the data is equally important. Pairing Kafka with Avro (managed by the Schema Registry) solves critical distributed system challenges.

1. Data Contract Enforcement (Schema Evolution)

  • The Problem with JSON: JSON payload producers and consumers often implicitly agree on a structure. If a producer adds a field, older consumers might crash or ignore it. If a producer removes a field, consumers relying on it break.

  • The Avro Solution: Avro schemas are registered with the Schema Registry, which checks each new schema version against the configured compatibility mode (e.g., Backward, Forward, or Full) and rejects versions that violate it. This keeps producers from deploying breaking changes that would harm consumers, so new producers and old consumers (and vice versa) can coexist without runtime deserialization errors.
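
To make the Backward mode concrete, here is a minimal sketch of the core rule: a new (reader) schema can decode old data only if every field it expects was either written by the old schema or carries a default. The `FieldSketch` and `CompatibilitySketch` types are hypothetical names invented for this illustration, not part of the Schema Registry client, and the check is deliberately stricter than real Avro, which also permits certain type promotions such as int to long.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified model of one record field (illustration only).
public sealed record FieldSketch(string Name, string Type, bool HasDefault = false);

public static class CompatibilitySketch
{
    // Lists the reasons a reader schema could not decode data written with the writer schema.
    // An empty list means the change is backward compatible under this simplified rule.
    public static IReadOnlyList<string> FindBackwardIssues(
        IReadOnlyList<FieldSketch> writer, IReadOnlyList<FieldSketch> reader)
    {
        var writerFields = writer.ToDictionary(f => f.Name);
        var issues = new List<string>();

        foreach (var field in reader)
        {
            if (!writerFields.TryGetValue(field.Name, out var written))
            {
                // A field missing from old data is fine only if the reader can fall back to a default.
                if (!field.HasDefault)
                    issues.Add($"'{field.Name}' is new and has no default value");
            }
            else if (written.Type != field.Type)
            {
                // Assumption: any type change is treated as breaking (real Avro allows some promotions).
                issues.Add($"'{field.Name}' changed type from {written.Type} to {field.Type}");
            }
        }

        // Fields present only in the writer schema are simply skipped by the reader.
        return issues;
    }
}
```

For example, adding a hypothetical `Locale` field with a default to the event produces no issues, while adding it without a default produces one, which is the kind of change the registry would be expected to reject in Backward mode.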

2. Efficiency and Performance

  • Compactness: Avro serializes data into a binary format. Unlike verbose JSON, it doesn't transmit field names with every message. This dramatically reduces message size, leading to lower bandwidth consumption and higher throughput in Kafka.

  • Speed: Binary serialization and deserialization are typically much faster than text-based formats like JSON or XML, directly improving the latency of your event pipeline.
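
The size difference is easy to see with a small sketch of Avro's binary encoding rules: a long is written as a zigzag variable-length integer, a string as its UTF-8 length followed by the bytes, and a record as its fields back to back in schema order. `AvroSizeSketch` is a hypothetical helper written only for this comparison; real code would rely on the Apache.Avro library through the Confluent serializer.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper for illustration; not part of Apache.Avro or Confluent.Kafka.
public static class AvroSizeSketch
{
    // Avro writes a long as a zigzag-mapped, variable-length integer (7 bits per byte).
    public static void WriteLong(List<byte> buffer, long value)
    {
        ulong zigzag = unchecked((ulong)((value << 1) ^ (value >> 63)));
        while ((zigzag & ~0x7FUL) != 0)
        {
            buffer.Add((byte)((zigzag & 0x7F) | 0x80)); // high bit set: more bytes follow
            zigzag >>= 7;
        }
        buffer.Add((byte)zigzag);
    }

    // Avro writes a string as its UTF-8 byte count (a long) followed by the bytes.
    public static void WriteString(List<byte> buffer, string value)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(value);
        WriteLong(buffer, utf8.Length);
        buffer.AddRange(utf8);
    }

    // A record is just its field values in schema order; no field names are written.
    public static byte[] EncodeUserCreated(string userId, string userName, string userEmail, long createdAt)
    {
        var buffer = new List<byte>();
        WriteString(buffer, userId);
        WriteString(buffer, userName);
        WriteString(buffer, userEmail);
        WriteLong(buffer, createdAt);
        return buffer.ToArray();
    }

    // The same values as UTF-8 JSON, where every field name travels with every message.
    public static byte[] EncodeAsJson(string userId, string userName, string userEmail, long createdAt) =>
        JsonSerializer.SerializeToUtf8Bytes(new
        {
            UserId = userId,
            UserName = userName,
            UserEmail = userEmail,
            CreatedAt = createdAt
        });
}
```

Comparing `EncodeUserCreated(...).Length` with `EncodeAsJson(...).Length` for the same values shows the saving directly. On the wire, the Confluent serializer also prepends a 5-byte header (a magic byte plus the 4-byte schema ID) so consumers can look up the writer schema.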

3. Language Agnosticism

The Avro schema is a language-independent definition of your data. A .NET producer using the C# UserCreatedEvent class can seamlessly communicate with a Java, Python, or Go consumer, provided they all resolve the message against the same registered Avro schema. This makes multi-language microservice environments significantly easier to manage.


🛠️ Project Setup, Configuration, and Dependencies

Start with a new Worker Service project in .NET (e.g., .NET 8) and add the necessary packages.

1. Install NuGet Packages

We'll use Confluent's official .NET client, which integrates seamlessly with the Confluent Schema Registry.

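    dotnet add package Confluent.Kafka
    dotnet add package Confluent.SchemaRegistry
    dotnet add package Confluent.SchemaRegistry.Serdes.Avro
    dotnet add package Microsoft.Extensions.Hosting       # For the Hosted Service pattern
    dotnet add package Serilog.Extensions.Hosting         # For UseSerilog() in Program.cs
    dotnet add package Serilog.Settings.Configuration     # For ReadFrom.Configuration()
    dotnet add package Serilog.Sinks.Console              # For WriteTo.Console()

    # In the test project:
    dotnet add package Microsoft.NET.Test.Sdk             # For Unit Testing
    dotnet add package Moq                                # For Mocking dependencies
    dotnet add package xunit                              # Unit Testing framework used by the tests below
    dotnet add package xunit.runner.visualstudio          # Test runner integration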

2. Configuration: The Options Pattern

For a production application, all connection and topic details must be externalized. We'll use the .NET Options Pattern to bind configurations from appsettings.json to POCO classes.

    // appsettings.json
    {
      "KafkaConfig": {
        "BootstrapServers": "localhost:9092",
        "SchemaRegistryUrl": "http://localhost:8081",
        "Topic": "user-events-topic",
        "ConsumerGroupId": "user-event-consumer-group-v1"
      }
    }

Now, define the corresponding configuration classes in C#:

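    namespace App.Configuration;

    // Root options type bound from the configuration root in Program.cs.
    public class UserEventsConfig
    {
        public KafkaConfig KafkaConfig { get; set; } = new();
    }

    public class KafkaConfig
    {
        public string BootstrapServers { get; set; } = "localhost:9092";
        public string SchemaRegistryUrl { get; set; } = "http://localhost:8081";
        public string Topic { get; set; } = "user-events";
        public string ConsumerGroupId { get; set; } = string.Empty;
    }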
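
Because a missing value such as ConsumerGroupId only surfaces once the consumer connects, it can help to validate the bound options up front. The following is a minimal sketch using the data annotation attributes from the base class library; `ValidatedKafkaConfig` and `ConfigValidationSketch` are hypothetical names introduced for illustration and are not part of the article's repository.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;

// Hypothetical variant of KafkaConfig with validation attributes added (illustration only).
public class ValidatedKafkaConfig
{
    [Required]
    public string BootstrapServers { get; set; } = "localhost:9092";

    [Required, Url]
    public string SchemaRegistryUrl { get; set; } = "http://localhost:8081";

    [Required]
    public string Topic { get; set; } = "user-events";

    // No sensible default exists, so an empty value should fail validation.
    [Required]
    public string ConsumerGroupId { get; set; } = "";
}

public static class ConfigValidationSketch
{
    // Returns one message per failed rule; an empty list means the configuration is usable.
    public static IReadOnlyList<string> Validate(ValidatedKafkaConfig config)
    {
        var results = new List<ValidationResult>();
        Validator.TryValidateObject(
            config, new ValidationContext(config), results, validateAllProperties: true);
        return results.Select(r => r.ErrorMessage ?? "invalid value").ToList();
    }
}
```

In a hosted application the same attributes can typically be enforced through the options builder, for example with ValidateDataAnnotations() and ValidateOnStart(), so a misconfigured service fails at startup rather than on the first message.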

3. Avro Schema and Data Model

Define your contract in an Avro schema file (UserCreated.avsc). This is the language-agnostic truth.

    // UserCreated.avsc
    {
      "type": "record",
      "name": "UserCreatedEvent",
      "namespace": "App.Events",
      "fields": [
        { "name": "UserId", "type": "string", "doc": "The unique identifier for the user" },
        { "name": "UserName", "type": "string", "doc": "The display name of the user, chosen by the user" },
        { "name": "UserEmail", "type": "string", "doc": "The email address of the user" },
        { "name": "CreatedAt", "type": "long", "doc": "Unix timestamp in ms of creation" }
      ]
    }

The corresponding C# class implements ISpecificRecord:

    using Avro;
    using Avro.Specific;

    namespace App.Events;

    public class UserCreatedEvent : ISpecificRecord
    {
        // Must match UserCreated.avsc, including the namespace and field order.
        private static readonly Schema _schema = Schema.Parse(@"
        {
            ""type"": ""record"",
            ""name"": ""UserCreatedEvent"",
            ""namespace"": ""App.Events"",
            ""fields"": [
                {""name"": ""UserId"", ""type"": ""string""},
                {""name"": ""UserName"", ""type"": ""string""},
                {""name"": ""UserEmail"", ""type"": ""string""},
                {""name"": ""CreatedAt"", ""type"": ""long""}
            ]
        }");

        public string UserId { get; set; } = string.Empty;
        public string UserName { get; set; } = string.Empty;
        public string UserEmail { get; set; } = string.Empty;
        public long CreatedAt { get; set; }

        // Parameterless constructor used when the deserializer creates instances.
        public UserCreatedEvent() { }

        public UserCreatedEvent(string userId, string userName, string userEmail, long createdAt)
        {
            UserId = userId;
            UserName = userName;
            UserEmail = userEmail;
            CreatedAt = createdAt;
        }

        public global::Avro.Schema Schema => _schema;

        // Field positions follow the order of the "fields" array in the schema.
        public object Get(int fieldPos)
        {
            return fieldPos switch
            {
                0 => UserId,
                1 => UserName,
                2 => UserEmail,
                3 => CreatedAt,
                _ => throw new Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()")
            };
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: UserId = (string)fieldValue; break;
                case 1: UserName = (string)fieldValue; break;
                case 2: UserEmail = (string)fieldValue; break;
                case 3: CreatedAt = (long)fieldValue; break;
                default: throw new Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }

🧑‍💻 The Producer: Event Dispatcher

We implement the producer behind a simple interface, decoupling the caller from Kafka implementation details.

1. Producer Interface

    namespace App.Services;

    public interface IEventProducer<TKey, TValue> where TValue : class
    {
        Task ProduceAsync(TKey key, TValue value);
    }

2. Producer Implementation

We inject the pre-configured IProducer and the bound options (IOptions<UserEventsConfig>), from which the topic name is read.

    using Confluent.Kafka;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using App.Configuration;

    namespace App.Services;

    public class UserCreatedProducer<TKey, TValue>(
        IProducer<TKey, TValue> producer,
        IOptions<UserEventsConfig> options,
        ILogger<UserCreatedProducer<TKey, TValue>> logger) : IEventProducer<TKey, TValue>
        where TValue : class
    {
        private readonly string topic = options.Value.KafkaConfig.Topic;

        public async Task ProduceAsync(TKey key, TValue value)
        {
            if (value == null)
                throw new ArgumentNullException(nameof(value), "Event value cannot be null");

            try
            {
                var message = new Message<TKey, TValue> { Key = key, Value = value };

                // Completes once the broker has acknowledged (or rejected) the message.
                var deliveryResult = await producer.ProduceAsync(topic, message);

                logger.LogInformation(
                    "Produced event {EventType} to topic '{Topic}', Partition: {Partition}, Offset: {Offset}",
                    typeof(TValue).Name,
                    deliveryResult.Topic,
                    deliveryResult.Partition,
                    deliveryResult.Offset);
            }
            catch (ProduceException<TKey, TValue> ex)
            {
                // Pass the exception itself so the stack trace is logged too.
                logger.LogError(ex, "Kafka produce error: {ErrorReason}", ex.Error.Reason);
                throw;
            }
        }
    }

👂 The Consumer: Hosted Service Listener

The consumer runs in the background using .NET's built-in BackgroundService, which is designed for long-running work that lives for the lifetime of the host.

1. Event Handler Interface

We isolate the business logic from the Kafka plumbing.

    namespace App.Handlers;

    public interface IEventHandler<in TEvent>
    {
        Task HandleAsync(TEvent @event);
    }

2. Application Logic Handler

This is where the actual work happens.

    using Microsoft.Extensions.Logging;
    using App.Events;
    using App.Handlers;

    namespace App.Handlers.Implementations;

    public class UserCreationHandler(ILogger<UserCreationHandler> logger) : IEventHandler<UserCreatedEvent>
    {
        public Task HandleAsync(UserCreatedEvent @event)
        {
            // --- Business Logic: e.g., Update Database, Send Welcome Email ---
            logger.LogInformation(
                "✅ User Event Processed | ID: {UserId} | Username: {Username} | Created At: {Timestamp}",
                @event.UserId,
                @event.UserName,
                DateTimeOffset.FromUnixTimeMilliseconds(@event.CreatedAt));

            return Task.CompletedTask;
        }
    }

3. The Kafka Consumer Hosted Service

This service subscribes, polls, and manages the consumer lifecycle, handling the AvroDeserializer magic.

    using Confluent.Kafka;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using App.Configuration;
    using App.Events;
    using App.Handlers;

    namespace App.Services;

    public class UserEventsConsumerService(
        IConsumer<string, UserCreatedEvent> consumer,
        IEventHandler<UserCreatedEvent> userCreatedEventHandler,
        IEventProducer<string, UserCreatedEvent> producer,
        ILogger<UserEventsConsumerService> logger,
        IOptions<UserEventsConfig> options) : BackgroundService
    {
        private readonly string topic = options.Value.KafkaConfig.Topic;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Demo only: seed the topic with one event so the consumer has something to read.
            var userId = "vk260";
            var username = "vijay-karajgikar";
            var userEmail = "dotsharpfx@gmail.com";
            var createdAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            await producer.ProduceAsync(userId, new UserCreatedEvent(userId, username, userEmail, createdAt));

            await StartConsumerLoop(stoppingToken);
        }

        private async Task StartConsumerLoop(CancellationToken stoppingToken)
        {
            consumer.Subscribe(topic);
            logger.LogInformation("Subscribed to topic: {Topic}", topic);

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        // Blocks until a message arrives or the token is cancelled.
                        var consumeResult = consumer.Consume(stoppingToken);
                        var userEvent = consumeResult.Message.Value;

                        await userCreatedEventHandler.HandleAsync(userEvent);
                        logger.LogInformation(
                            "Consumed message with key: {Key}, UserId: {UserId}",
                            consumeResult.Message.Key,
                            userEvent.UserId);

                        // Commit only after the handler has finished.
                        consumer.Commit(consumeResult);

                        // Demo only: publish a follow-up event so the loop keeps receiving messages.
                        var userId = $"vk260-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}";
                        var username = "vijay-karajgikar";
                        var userEmail = "dotsharpfx@gmail.com";
                        var createdAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                        await producer.ProduceAsync(userId, new UserCreatedEvent(userId, username, userEmail, createdAt));
                    }
                    catch (ConsumeException ex)
                    {
                        logger.LogError(ex, "Error occurred while consuming message");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                logger.LogInformation("Consumer loop cancelled");
            }
            finally
            {
                // Leave the consumer group cleanly so partitions are reassigned promptly.
                consumer.Close();
                logger.LogInformation("Consumer closed");
            }
        }
    }
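
The ordering in the loop (handle first, commit afterwards) is what gives at-least-once processing: if the handler fails, the offset stays uncommitted and the message is read again after a restart. The sketch below shows that behaviour with an in-memory stand-in, assuming offsets are committed only by the explicit Commit call (that is, auto-commit is disabled). `InMemoryLogSketch` and `AtLeastOnceSketch` are hypothetical types invented for this illustration and are not part of Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a single partition (illustration only).
public sealed class InMemoryLogSketch
{
    private readonly List<string> _messages;

    // The next offset to read after a restart.
    public int CommittedOffset { get; private set; }

    public InMemoryLogSketch(IEnumerable<string> messages) => _messages = new List<string>(messages);

    public bool TryRead(int offset, out string message)
    {
        bool found = offset < _messages.Count;
        message = found ? _messages[offset] : "";
        return found;
    }

    public void Commit(int offset) => CommittedOffset = offset + 1;
}

public static class AtLeastOnceSketch
{
    // Processes messages from the last committed offset; returns how many were handled.
    public static async Task<int> RunAsync(
        InMemoryLogSketch log, Func<string, Task> handler, CancellationToken token)
    {
        int handled = 0;
        int offset = log.CommittedOffset;

        while (!token.IsCancellationRequested && log.TryRead(offset, out var message))
        {
            try
            {
                await handler(message);
            }
            catch (Exception)
            {
                // Not committed: the same message is delivered again on the next run.
                break;
            }

            log.Commit(offset); // commit only after the handler succeeded
            offset++;
            handled++;
        }

        return handled;
    }
}
```

Running it twice against a handler that fails once shows the redelivery: the first run stops at the failing message without committing it, and the second run starts from that same message. The practical consequence is that handlers should be idempotent, since a message can be processed more than once.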

🏗️ Dependency Injection (Program.cs)

This is where we wire up all configurations, builders, and services.

    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Configuration;
    using Serilog;
    using Microsoft.Extensions.DependencyInjection;
    using Confluent.SchemaRegistry;
    using Confluent.Kafka;
    using Microsoft.Extensions.Options;
    using Confluent.SchemaRegistry.Serdes;
    using Confluent.Kafka.SyncOverAsync;
    using App.Configuration;
    using App.Events;
    using App.Handlers;
    using App.Handlers.Implementations;
    using App.Services;

    // Configure Serilog logger
    Log.Logger = new LoggerConfiguration()
        .MinimumLevel.Debug()
        .WriteTo.Console()
        .CreateLogger();

    var builder = Host.CreateDefaultBuilder(args)
        .ConfigureAppConfiguration((hostingContext, config) =>
        {
            // --- 1. Load Configurations ---
            var appSettingsPath = Path.Combine(AppContext.BaseDirectory, "appsettings.json");
            config.AddJsonFile(appSettingsPath, optional: false, reloadOnChange: true);
            config.AddEnvironmentVariables();
        })
        .UseSerilog((hostingContext, loggerConfiguration) =>
        {
            // --- 2. Configure Serilog ---
            loggerConfiguration
                .ReadFrom.Configuration(hostingContext.Configuration)
                .Enrich.FromLogContext();
        })
        .ConfigureServices((hostingContext, services) =>
        {
            // --- 3. Add Options pattern (binds the configuration root to UserEventsConfig) ---
            services.AddOptions<UserEventsConfig>();
            services.Configure<UserEventsConfig>(hostingContext.Configuration);

            // --- 4. Schema Registry Client (Singleton) ---
            services.AddSingleton<ISchemaRegistryClient>(provider =>
            {
                var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value;
                return new CachedSchemaRegistryClient(new SchemaRegistryConfig
                {
                    Url = config.KafkaConfig.SchemaRegistryUrl
                });
            });

            // --- 5. Producer Registration (Singleton) ---
            services.AddSingleton(provider =>
            {
                var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value;
                var schemaRegistryClient = provider.GetRequiredService<ISchemaRegistryClient>();
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = config.KafkaConfig.BootstrapServers
                };
                return new ProducerBuilder<string, UserCreatedEvent>(producerConfig)
                    .SetValueSerializer(new AvroSerializer<UserCreatedEvent>(schemaRegistryClient))
                    .Build();
            });
            services.AddSingleton<
                IEventProducer<string, UserCreatedEvent>,
                UserCreatedProducer<string, UserCreatedEvent>>();

            // --- 6. Consumer Registration (Singleton) ---
            services.AddSingleton(provider =>
            {
                var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value;
                var schemaRegistryClient = provider.GetRequiredService<ISchemaRegistryClient>();

                // Confluent.Kafka Consumer Configuration
                var consumerConfig = new ConsumerConfig
                {
                    BootstrapServers = config.KafkaConfig.BootstrapServers,
                    GroupId = config.KafkaConfig.ConsumerGroupId,
                    AutoOffsetReset = AutoOffsetReset.Earliest, // Start at the beginning if no offset exists
                    EnableAutoCommit = false                    // Offsets are committed manually in the consumer loop
                };
                return new ConsumerBuilder<string, UserCreatedEvent>(consumerConfig)
                    .SetValueDeserializer(new AvroDeserializer<UserCreatedEvent>(schemaRegistryClient).AsSyncOverAsync())
                    .Build();
            });

            // --- 7. Application logic and Hosted Service ---
            services.AddTransient<IEventHandler<UserCreatedEvent>, UserCreationHandler>();
            services.AddHostedService<UserEventsConsumerService>();
        });

    var host = builder.Build();
    await host.RunAsync();

🧪 Unit Testing Core Logic

The use of the simple interface pattern makes unit testing the core logic trivial. We can mock IProducer and IConsumer to verify interactions without touching Kafka or the Schema Registry.

1. Testing the Producer Logic

    using Xunit;
    using Moq;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using Confluent.Kafka;
    using App.Configuration;
    using App.Events;
    using App.Services;

    public class UserCreatedProducerTests
    {
        private readonly Mock<IProducer<string, UserCreatedEvent>> _mockProducer;
        private readonly Mock<ILogger<UserCreatedProducer<string, UserCreatedEvent>>> _mockLogger;
        private readonly IOptions<UserEventsConfig> _options;
        private readonly UserCreatedProducer<string, UserCreatedEvent> _producer;

        public UserCreatedProducerTests()
        {
            _mockProducer = new Mock<IProducer<string, UserCreatedEvent>>();
            _mockLogger = new Mock<ILogger<UserCreatedProducer<string, UserCreatedEvent>>>();
            _options = Options.Create(new UserEventsConfig
            {
                KafkaConfig = new KafkaConfig
                {
                    Topic = "user-events",
                    BootstrapServers = "localhost:9092",
                    ConsumerGroupId = "test-group",
                    SchemaRegistryUrl = "http://localhost:8081"
                }
            });
            _producer = new UserCreatedProducer<string, UserCreatedEvent>(
                _mockProducer.Object, _options, _mockLogger.Object);
        }

        [Fact]
        public async Task ProduceAsync_WithValidEvent_ShouldSucceed()
        {
            // Arrange
            var userEvent = new UserCreatedEvent
            {
                UserId = "user-123",
                UserName = "john_doe",
                UserEmail = "john@example.com",
                CreatedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            };
            _mockProducer
                .Setup(p => p.ProduceAsync(
                    It.IsAny<string>(),
                    It.IsAny<Message<string, UserCreatedEvent>>(),
                    It.IsAny<CancellationToken>()))
                .ReturnsAsync(new DeliveryResult<string, UserCreatedEvent>());

            // Act
            await _producer.ProduceAsync("user-123", userEvent);

            // Assert: the key and value are passed through to Kafka unchanged
            _mockProducer.Verify(
                p => p.ProduceAsync(
                    "user-events",
                    It.Is<Message<string, UserCreatedEvent>>(m => m.Key == "user-123" && m.Value == userEvent),
                    It.IsAny<CancellationToken>()),
                Times.Once);
        }

        [Fact]
        public async Task ProduceAsync_ShouldUseConfiguredTopic()
        {
            // Arrange
            var userEvent = new UserCreatedEvent
            {
                UserId = "user-123",
                UserName = "john_doe",
                UserEmail = "john@example.com",
                CreatedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            };
            _mockProducer
                .Setup(p => p.ProduceAsync(
                    It.IsAny<string>(),
                    It.IsAny<Message<string, UserCreatedEvent>>(),
                    It.IsAny<CancellationToken>()))
                .ReturnsAsync(new DeliveryResult<string, UserCreatedEvent>());

            // Act
            await _producer.ProduceAsync("user-123", userEvent);

            // Assert: the topic comes from the injected options
            _mockProducer.Verify(
                p => p.ProduceAsync(
                    "user-events",
                    It.IsAny<Message<string, UserCreatedEvent>>(),
                    It.IsAny<CancellationToken>()),
                Times.Once);
        }
    }

🔗 Get the Code & Let's Build Together!

If you've followed along and are ready to see this whole Avro-Kafka-Hosted-Service puzzle piece together, you're in luck! The complete, runnable solution for this project is available on my GitHub repository.

This isn't just my project; it's ours. We're all about learning and building better systems.

  • Want to try a different approach?
  • See a bug or a missing feature (like integration tests using Testcontainers)?
  • Have an idea for optimization or a clean-up?

We highly encourage you and your team members to fork the repository, experiment, and send those Pull Requests (PRs) our way! Let's elevate this codebase together.

How to Contribute 🤝

  1. Fork the repository linked below.
  2. Create your feature branch (git checkout -b feature/awesome-new-thing).
  3. Commit your changes (git commit -m 'feat: Add awesome new thing').
  4. Push to the branch (git push origin feature/awesome-new-thing).
  5. Open a Pull Request and describe your changes clearly!

Complete Source Code: https://github.com/dotsharpfx-dotnet/UserEventsApp


🌟 The Takeaway

Integrating Kafka and Avro in .NET doesn't have to be a complex web of configuration files and low-level API calls. By embracing .NET's Hosted Service pattern, the simple interface pattern, and primary constructors, you build a system that is:

  • Reliable: Avro and the Schema Registry enforce the data contract between producers and consumers.
  • Configurable: The Options pattern makes broker and topic management flexible across environments.
  • Testable: Interfaces and mocking allow you to unit test business logic and producer/consumer wrappers without touching the actual messaging infrastructure.

This approach gives you a clean separation of concerns, resulting in a codebase that is easier to maintain, debug, and scale. Now go build something amazing!


Ready to tackle the next phase of event-driven systems? Would you like a deep dive into using Testcontainers for integration testing this exact setup, or perhaps a guide on Saga patterns in .NET?
