Event-driven architecture is the future of resilient and scalable systems, and at its core, you'll often find Apache Kafka. But transmitting plain JSON or raw bytes over Kafka is a recipe for maintenance headaches. You need a contract. You need Apache Avro.
Avro provides a schema-based serialization format that's compact, fast, and, most importantly, handles schema evolution gracefully. This is critical for microservices that must communicate reliably across different deployment cycles.
This guide will show you how to build a robust, production-ready Kafka Producer and Consumer using a .NET Hosted Service, leveraging Avro schemas and the modern conventions of simple interface patterns and primary constructors. Weβll also cover how to make everything easily configurable and, crucially, how to unit test the core logic without needing a running Kafka cluster.
π‘ Why Kafka + Avro is Your Event Stream Dream Team
Choosing a messaging system is only half the battle; how you format the data is equally important. Pairing Kafka with Avro (managed by the Schema Registry) solves critical distributed system challenges.
1. Data Contract Enforcement (Schema Evolution)
The Problem with JSON: JSON payload producers and consumers often implicitly agree on a structure. If a producer adds a field, older consumers might crash or ignore it. If a producer removes a field, consumers relying on it break.
The Avro Solution: Avro schemas are registered with the Schema Registry before any data is sent. The Schema Registry performs compatibility checks (e.g., Backward, Forward, or Full), preventing producers from deploying breaking changes that would harm consumers. This ensures that new producers and old consumers (and vice versa) can coexist without fear of runtime deserialization errors.
2. Efficiency and Performance
Compactness: Avro serializes data into a binary format. Unlike verbose JSON, it doesn't transmit field names with every message. This dramatically reduces message size, leading to lower bandwidth consumption and higher throughput in Kafka.
Speed: Binary serialization and deserialization are typically much faster than text-based formats like JSON or XML, directly improving the latency of your event pipeline.
3. Language Agnosticism
The Avro schema is a language-independent definition of your data. A .NET producer using the C# UserCreatedEvent record can seamlessly communicate with a Java, Python, or Go consumer, provided they all deserialize the message using the same registered Avro schema. This makes multi-language microservice environments significantly easier to manage.
π οΈ Project Setup, Configuration, and Dependencies
Start with a new Worker Service project in .NET (e.g., .NET 8) and add the necessary packages.
1. Install NuGet Packages
We'll use Confluent's official .NET client, which integrates seamlessly with the Confluent Schema Registry.
dotnet add package Confluent.Kafka dotnet add package Confluent.SchemaRegistry dotnet add package Confluent.SchemaRegistry.Serdes.Avro dotnet add package Microsoft.Extensions.Hosting # For the Hosted Service pattern dotnet add package Microsoft.NET.Test.Sdk # For Unit Testing dotnet add package Moq # For Mocking dependencies dotnet add package NUnit # For Unit Testing framework 2. Configuration: The Options Pattern
For a production application, all connection and topic details must be externalized. Weβll use the .NET Options Pattern to bind configurations from appsettings.json to POCO classes.
// appsettings.json { "KafkaConfig": { "BootstrapServers": "localhost:9092", "SchemaRegistryUrl": "http://localhost:8081", "Topics": { "UserEvents": "user-events-topic" }, "ConsumerGroupId": "user-event-consumer-group-v1" } } Now, define the corresponding configuration classes in C#:
namespace App.Configuration; public class KafkaConfig { public string BootstrapServers { get; set; } = "localhost:9092"; public string SchemaRegistryUrl { get; set; } = "http://localhost:8081"; public string Topic { get; set; } = "user-events"; public string ConsumerGroupId { get; set; } } 3. Avro Schema and Data Model
Define your contract in an Avro schema file (UserCreated.avsc). This is the language-agnostic truth.
// UserCreated.avsc { "type": "record", "name": "UserCreatedEvent", "namespace": "App.Events", "fields": [ { "name": "UserId", "type": "string", "doc": "The unique identifier for the user" }, { "name": "Username", "type": "string", "doc": "The display name of the user, chosen by the user"}, { "name": "UserEmail", "type": "string", "doc": "The email address of the user"}, { "name": "CreatedAt", "type": "long", "doc": "Unix timestamp in ms of creation" } ] } The corresponding C# class derived from ISpecificRecord
public class UserCreatedEvent : ISpecificRecord { private static readonly Schema _schema = Schema.Parse(@" { ""type"": ""record"", ""name"": ""UserCreatedEvent"", ""fields"": [ {""name"": ""UserId"", ""type"": ""string""}, {""name"": ""UserName"", ""type"": ""string""}, {""name"": ""UserEmail"", ""type"": ""string""}, {""name"": ""CreatedAt"", ""type"": ""long""} ] }"); public string UserId { get; set; } public string UserName { get; set; } public string UserEmail { get; set; } public long CreatedAt { get; set; } public UserCreatedEvent() { } public UserCreatedEvent(string userId, string userName, string userEmail, long createdAt) { UserId = userId; UserName = userName; UserEmail = userEmail; CreatedAt = createdAt; } public global::Avro.Schema Schema => _schema; public object Get(int fieldPos) { return fieldPos switch { 0 => UserId, 1 => UserName, 2 => UserEmail, 3 => CreatedAt, _ => throw new Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()") }; } public void Put(int fieldPos, object fieldValue) { switch (fieldPos) { case 0: UserId = (string)fieldValue; break; case 1: UserName = (string)fieldValue; break; case 2: UserEmail = (string)fieldValue; break; case 3: CreatedAt = (long)fieldValue; break; default: throw new Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()"); } } } π§βπ» The Producer: Event Dispatcher
We implement the producer behind a simple interface, decoupling the caller from Kafka implementation details.
1. Producer Interface
namespace App.Services; public interface IEventProducer<TKey, TValue> where TValue : class { Task ProduceAsync(TKey key, TValue value); } 2. Producer Implementation
We inject the pre-configured IProducer and KafkaConfig.
using System.Collections.Concurrent; using Confluent.Kafka; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using App.Configuration; using App.Events; namespace App.Services; public class UserCreatedProducer<TKey, TValue>( IProducer<TKey, TValue> producer, IOptions<UserEventsConfig> options, ILogger<UserCreatedProducer<TKey, TValue>> logger) : IEventProducer<TKey, TValue> where TValue : class { private readonly string topic = options.Value.KafkaConfig.Topic; public async Task ProduceAsync(TKey key, TValue value) { if (value == null) throw new ArgumentNullException(nameof(value), "Event value cannot be null"); try { var message = new Message<TKey, TValue> { Key = key, Value = value }; var deliveryResult = await producer.ProduceAsync(topic, message); logger.LogInformation( "Produced event {EventType} to topic '{Topic}', Partition: {Partition}, Offset: {Offset}", typeof(TValue).Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset); } catch (ProduceException<TKey, TValue> ex) { logger.LogError("Kafka produce error: {ErrorReason}", ex.Error.Reason); throw; } } } π The Consumer: Hosted Service Listener
The consumer runs in the background using .NETβs built-in BackgroundService, which is ideal for long-running, non-blocking tasks.
1. Event Handler Interface
We isolate the business logic from the Kafka plumbing.
namespace App.Handlers; public interface IEventHandler<in TEvent> { Task HandleAsync(TEvent @event); } 2. Application Logic Handler
This is where the actual work happens.
using Microsoft.Extensions.Logging; using App.Events; using App.Handlers; namespace App.Handlers.Implementations; public class UserCreationHandler(ILogger<UserCreationHandler> logger) : IEventHandler<UserCreatedEvent> { public Task HandleAsync(UserCreatedEvent @event) { // --- Business Logic: e.g., Update Database, Send Welcome Email --- logger.LogInformation( "β
User Event Processed | ID: {UserId} | Username: {Username} | Created At: {Timestamp}", @event.UserId, @event.Username, DateTimeOffset.FromUnixTimeMilliseconds(@event.CreatedAtTimestamp)); return Task.CompletedTask; } } 3. The Kafka Consumer Hosted Service
This service subscribes, polls, and manages the consumer lifecycle, handling the AvroDeserializer magic.
using Confluent.Kafka; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using App.Configuration; using App.Events; using App.Handlers; namespace App.Services; public class UserEventsConsumerService( IConsumer<string, UserCreatedEvent> consumer, IEventHandler<UserCreatedEvent> userCreatedEventHandler, IEventProducer<string, UserCreatedEvent> producer, ILogger<UserEventsConsumerService> logger, IOptions<UserEventsConfig> options) : BackgroundService { private readonly string topic = options.Value.KafkaConfig.Topic; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var userId = "vk260"; var username = "vijay-karajgikar"; var userEmail = "dotsharpfx@gmail.com"; var createdAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); await producer.ProduceAsync(userId, new UserCreatedEvent(userId, username, userEmail, createdAt)); await StartConsumerLoop(stoppingToken); } private async Task StartConsumerLoop(CancellationToken stoppingToken) { consumer.Subscribe(topic); logger.LogInformation("Subscribed to topic: {Topic}", topic); try { while (!stoppingToken.IsCancellationRequested) { try { var consumeResult = consumer.Consume(stoppingToken); var userEvent = consumeResult.Message.Value; await userCreatedEventHandler.HandleAsync(userEvent); logger.LogInformation("Consumed message with key: {Key}, UserId: {UserId}", consumeResult.Message.Key, userEvent.UserId); consumer.Commit(consumeResult); var userId = $"vk260-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}"; var username = "vijay-karajgikar"; var userEmail = "dotsharpfx@gmail.com"; var createdAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); await producer.ProduceAsync(userId, new UserCreatedEvent(userId, username, userEmail, createdAt)); } catch (ConsumeException ex) { logger.LogError(ex, "Error occurred while consuming message"); } } } catch (OperationCanceledException) { logger.LogInformation("Consumer loop cancelled"); } finally { consumer.Close(); logger.LogInformation("Consumer closed"); } } } ποΈ Dependency Injection (Program.cs)
This is where we wire up all configurations, builders, and services.
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Configuration; using Serilog; using Microsoft.Extensions.DependencyInjection; using Confluent.SchemaRegistry; using Confluent.Kafka; using Microsoft.Extensions.Options; using Confluent.SchemaRegistry.Serdes; using Confluent.Kafka.SyncOverAsync; using App.Configuration; using App.Events; using App.Handlers; using App.Handlers.Implementations; using App.Services; // Configure Serilog logger Log.Logger = new LoggerConfiguration() .MinimumLevel.Debug() .WriteTo.Console() .CreateLogger(); var builder = Host.CreateDefaultBuilder(args) .ConfigureAppConfiguration((hostingContext, config) => { // --- 1. Load Configurations --- var appSettingsPath = Path.Combine(AppContext.BaseDirectory, "appsettings.json"); config.AddJsonFile(appSettingsPath, optional: false, reloadOnChange: true); config.AddEnvironmentVariables(); }) .UseSerilog((hostingContext, loggerConfiguration) => { // --- 2. Configure Serilog --- loggerConfiguration .ReadFrom.Configuration(hostingContext.Configuration) .Enrich.FromLogContext(); }) .ConfigureServices((hostingContext, services) => { // --- 3. Add Options pattern --- services.AddOptions<UserEventsConfig>(); services.Configure<UserEventsConfig>(hostingContext.Configuration); // --- 4. Schema Registry Client (Singleton) --- services.AddSingleton<ISchemaRegistryClient>(provider => { var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value; return new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = config.KafkaConfig.SchemaRegistryUrl }); }); // --- 5. Producer Registration (Singleton) --- services.AddSingleton(provider => { var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value; var schemaRegistryClient = provider.GetRequiredService<ISchemaRegistryClient>(); var producerConfig = new ProducerConfig { BootstrapServers = config.KafkaConfig.BootstrapServers }; return new ProducerBuilder<string, UserCreatedEvent>(producerConfig) .SetValueSerializer(new AvroSerializer<UserCreatedEvent>(schemaRegistryClient)) .Build(); }); services.AddSingleton< IEventProducer<string, UserCreatedEvent>, UserCreatedProducer<string, UserCreatedEvent>>(); // --- 6. Consumer Registration (Singleton) --- services.AddSingleton(provider => { var config = provider.GetRequiredService<IOptions<UserEventsConfig>>().Value; var schemaRegistryClient = provider.GetRequiredService<ISchemaRegistryClient>(); // Confluent.Kafka Consumer Configuration var consumerConfig = new ConsumerConfig { BootstrapServers = config.KafkaConfig.BootstrapServers, GroupId = config.KafkaConfig.ConsumerGroupId, AutoOffsetReset = AutoOffsetReset.Earliest // Start at the beginning if no offset exists }; return new ConsumerBuilder<string, UserCreatedEvent>(consumerConfig) .SetValueDeserializer(new AvroDeserializer<UserCreatedEvent>(schemaRegistryClient).AsSyncOverAsync()) .Build(); }); // --- 7. Application logic and Hosted Service --- services.AddTransient<IEventHandler<UserCreatedEvent>, UserCreationHandler>(); services.AddHostedService<UserEventsConsumerService>(); }); var host = builder.Build(); await host.RunAsync(); π§ͺ Unit Testing Core Logic
The use of the simple interface pattern makes unit testing the core logic trivial. We can mock IProducer and IConsumer to verify interactions without touching Kafka or the Schema Registry.
1. Testing the Producer Logic
using Xunit; using Moq; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Confluent.Kafka; using App.Configuration; using App.Events; using App.Services; public class UserCreatedProducerTests { private readonly Mock<IProducer<string, UserCreatedEvent>> _mockProducer; private readonly Mock<ILogger<UserCreatedProducer<string, UserCreatedEvent>>> _mockLogger; private readonly IOptions<UserEventsConfig> _options; private readonly UserCreatedProducer<string, UserCreatedEvent> _producer; public UserCreatedProducerTests() { _mockProducer = new Mock<IProducer<string, UserCreatedEvent>>(); _mockLogger = new Mock<ILogger<UserCreatedProducer<string, UserCreatedEvent>>>(); _options = Options.Create(new UserEventsConfig { KafkaConfig = new KafkaConfig { Topic = "user-events", BootstrapServers = "localhost:9092", ConsumerGroupId = "test-group", SchemaRegistryUrl = "http://localhost:8081" } }); _producer = new UserCreatedProducer<string, UserCreatedEvent>( _mockProducer.Object, _options, _mockLogger.Object); } [Fact] public async Task ProduceAsync_WithValidEvent_ShouldSucceed() { // Arrange var userEvent = new UserCreatedEvent { UserId = "user-123", UserName = "john_doe", UserEmail = "john@example.com", CreatedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; _mockProducer .Setup(p => p.ProduceAsync(It.IsAny<string>(), It.IsAny<Message<string, UserCreatedEvent>>(), It.IsAny<CancellationToken>())) .ReturnsAsync(Mock.Of<DeliveryReport<string, UserCreatedEvent>>()); // Act await _producer.ProduceAsync("user-123", userEvent); // Assert _mockProducer.Verify( p => p.ProduceAsync( "user-events", It.Is<Message<string, UserCreatedEvent>>(m => m.Key == "user-123" && m.Value == userEvent), It.IsAny<CancellationToken>()), Times.Once); } [Fact] public async Task ProduceAsync_ShouldUseConfiguredTopic() { // Arrange var userEvent = new UserCreatedEvent { UserId = "user-123", UserName = "john_doe", UserEmail = "john@example.com", CreatedAt = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }; _mockProducer .Setup(p => p.ProduceAsync(It.IsAny<string>(), It.IsAny<Message<string, UserCreatedEvent>>(), It.IsAny<CancellationToken>())) .ReturnsAsync(Mock.Of<DeliveryReport<string, UserCreatedEvent>>()); // Act await _producer.ProduceAsync("user-123", userEvent); // Assert _mockProducer.Verify( p => p.ProduceAsync("user-events", It.IsAny<Message<string, UserCreatedEvent>>(), It.IsAny<CancellationToken>()), Times.Once); } } π Get the Code & Let's Build Together!
If you've followed along and are ready to see this whole Avro-Kafka-Hosted-Service puzzle piece together, you're in luck! The complete, runnable solution for this project is available on my GitHub repository.
This isn't just my projectβit's ours. We're all about learning and building better systems.
- Want to try a different approach?
- See a bug or a missing feature (like integration tests using Testcontainers)?
- Have an idea for optimization or a clean-up?
We highly encourage you and your team members to fork the repository, experiment, and send those Pull Requests (PRs) our way! Let's elevate this codebase together.
How to Contribute π€
- Fork the repository linked below.
- Create your feature branch (git checkout -b feature/awesome-new-thing).
- Commit your changes (git commit -m 'feat: Add awesome new thing').
- Push to the branch (git push origin feature/awesome-new-thing).
- Open a Pull Request and describe your changes clearly!
Complete Source Code: https://github.com/dotsharpfx-dotnet/UserEventsApp
π The Takeaway
Integrating Kafka and Avro in .NET doesn't have to be a complex web of configuration files and low-level API calls. By embracing .NET's Hosted Service pattern, the simple interface pattern, and primary constructors, you build a system that is:
- Reliable: Avro and Schema Registry guarantee data contract integrity.
- Configurable: The Options pattern makes broker and topic management flexible across environments.
- Testable: Interfaces and mocking allow you to unit test business logic and producer/consumer wrappers without touching the actual messaging infrastructure.
This approach gives you a clean separation of concerns, resulting in a codebase that is easier to maintain, debug, and scale. Now go build something amazing!
Ready to tackle the next phase of event-driven systems? Would you like a deep dive into using Testcontainers for integration testing this exact setup, or perhaps a guide on Saga patterns in .NET?
Top comments (0)