Producing and Consuming Kafka Messages in CloudEvents Format Using the C# SDK

Intro

“CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems.”

You’ve probably come across a bunch of similar statements from the CloudEvents spec and the related documentation.

Still, when I was exploring the topic, it was somewhat hard for me to find any practical examples of how this would play with different message brokers and serialization formats.

Collecting information from various sources, I managed to get some examples up and running. So I decided to share the outcomes in a few articles.

With that said, this post is not about the theoretical foundations of CloudEvents. Instead, it aims to provide a good starting point for anyone looking into using CloudEvents with Kafka.

Concretely, I’ll present a simple Kafka producer and consumer using the C# CloudEvents SDK. I’ll use JSON for serialization here and plan to cover Avro and Protobuf in a subsequent post.

You can find the code on GitHub.

Setting Up Kafka

To run the example from this article, I’m spinning up a two-broker Kafka cluster in Docker using a docker-compose file you can find here.
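The linked compose file is the source of truth; as a rough sketch, a two-broker setup along these lines works (the image versions, ports, and listener names below are my assumptions, not taken from the repo):

```yaml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka-1:
    image: confluentinc/cp-kafka:7.0.1
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Separate listeners so brokers reach each other inside the
      # Docker network while clients connect via localhost.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2

  # kafka-2 mirrors kafka-1 with its own broker id and ports
  kafka-2:
    image: confluentinc/cp-kafka:7.0.1
    depends_on: [zookeeper]
    ports: ["9093:9093"]
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
```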

The Demo App

The code of the sample app is quite straightforward. It contains as little logic as possible so we don’t get distracted from the main subject.

The Main method creates a Kafka topic, then runs the producer and consumer in parallel:

public static async Task Main()
{
    await CreateKafkaTopic();

    var cts = new CancellationTokenSource();
    var formatter = new JsonEventFormatter<User>(SerializationOptions, new JsonDocumentOptions());

    var producer = Task.Run(() => StartProducer(formatter, cts.Token));
    var consumer = Task.Run(() => StartConsumer(formatter, cts.Token));

    Console.ReadKey();
    cts.Cancel();
    await Task.WhenAll(producer, consumer);
}
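CreateKafkaTopic isn’t shown here; a minimal sketch using Confluent.Kafka’s admin client could look like this (the partition and replication counts are assumptions chosen to match the two-broker cluster):

```csharp
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

private static async Task CreateKafkaTopic()
{
    using var adminClient = new AdminClientBuilder(
        new AdminClientConfig { BootstrapServers = BootstrapServers }).Build();
    try
    {
        await adminClient.CreateTopicsAsync(new[]
        {
            new TopicSpecification { Name = Topic, NumPartitions = 2, ReplicationFactor = 2 }
        });
    }
    catch (CreateTopicsException e) when (
        e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
    {
        // The topic is already there from a previous run; nothing to do.
    }
}
```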

Producer

Here’s the StartProducer implementation:

private static async Task StartProducer(JsonEventFormatter formatter, CancellationToken ct)
{
    var producerConfig = new ProducerConfig { BootstrapServers = BootstrapServers };
    using var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();

    var i = 1;
    while (!ct.IsCancellationRequested)
    {
        var userId = $"UserId_{i}";
        var cloudEvent = new CloudEvent
        {
            Id = Guid.NewGuid().ToString(),
            Type = "event-type",
            Source = new Uri("https://cloudevents.io/"),
            Time = DateTimeOffset.UtcNow,
            DataContentType = "application/json",
            Data = new User
            {
                UserId = userId,
                Name = $"Name_{i}",
            }
        };
        cloudEvent.SetPartitionKey(userId);
        var kafkaMessage = cloudEvent.ToKafkaMessage(ContentMode.Structured, formatter);
        await producer.ProduceAsync(Topic, kafkaMessage);
            
        i++;

        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

Let’s review some of the important aspects.

We create an instance of a CloudEvent and set the required properties with some dummy data. Id, Type, and Source, among others, are all part of the CloudEvents specification.

The Data field is of type object, and it’s meant to hold your concrete event data. In this example, I’m just creating a User that contains only a UserId and a Name.
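For reference, User is just a plain DTO; based on the usage above it could look like this:

```csharp
// Minimal event payload type used throughout the example.
public class User
{
    public string UserId { get; set; }
    public string Name { get; set; }
}
```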

To provide the Kafka partition key, we use the SetPartitionKey method on the CloudEvent. Behind the scenes, this sets a specific “extension” attribute called partitionkey. The ToKafkaMessage method from the CloudNative.CloudEvents.Kafka package later uses it to set the key of the Kafka message.

ToKafkaMessage has two parameters.

One is the ContentMode, which we set to Structured, meaning the entire CloudEvent is encoded into the message body. The other option is Binary mode, in which all CloudEvent attributes except for the data are mapped to Kafka message headers, while the data itself is passed as the message body as a byte sequence.
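For illustration, switching the producer to binary mode is a one-line change (the ce_-prefixed header names are what the CloudEvents Kafka protocol binding prescribes):

```csharp
// Binary mode: only the event data goes into the message body;
// the CloudEvents attributes travel as ce_* Kafka headers
// (ce_id, ce_type, ce_source, ce_time, ...).
var kafkaMessage = cloudEvent.ToKafkaMessage(ContentMode.Binary, formatter);
```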

ToKafkaMessage also accepts a formatter that, in our case, is a JsonEventFormatter. This is a simple wrapper around a standard .NET JSON serializer like System.Text.Json or Newtonsoft.Json. Which one will be used depends on the NuGet package you install – CloudNative.CloudEvents.SystemTextJson or CloudNative.CloudEvents.NewtonsoftJson.
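SerializationOptions isn’t shown above; with the System.Text.Json-based package it would be an ordinary JsonSerializerOptions instance, for example (the camelCase naming policy is an assumption):

```csharp
using System.Text.Json;

// Options shared by the formatter and the consumer's Console output.
private static readonly JsonSerializerOptions SerializationOptions = new()
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
```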

I encourage you to inspect the ToKafkaMessage code yourself as it’s relatively easy to comprehend:

public static Message<string?, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
    Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
    Validation.CheckNotNull(formatter, nameof(formatter));

    var headers = MapHeaders(cloudEvent);
    string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
    byte[] value;
    string? contentTypeHeaderValue;

    switch (contentMode)
    {
        case ContentMode.Structured:
            value = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType));
            // TODO: What about the non-media type parts?
            contentTypeHeaderValue = contentType.MediaType;
            break;
        case ContentMode.Binary:
            value = BinaryDataUtilities.AsArray(formatter.EncodeBinaryModeEventData(cloudEvent));
            contentTypeHeaderValue = formatter.GetOrInferDataContentType(cloudEvent);
            break;
        default:
            throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
    }
    if (contentTypeHeaderValue is object)
    {
        headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
    }
    return new Message<string?, byte[]>
    {
        Headers = headers,
        Value = value,
        Key = key
    };
}

You can also check it out on GitHub.

Consumer

Let’s now review the StartConsumer method as well:

private static void StartConsumer(JsonEventFormatter formatter, CancellationToken ct)
{
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = BootstrapServers,
        GroupId = "cgid"
    };
    using var consumer = new ConsumerBuilder<string, byte[]>(consumerConfig).Build();
    consumer.Subscribe(new[] { Topic });

    while (!ct.IsCancellationRequested)
    {
        var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100));
        if (consumeResult is null) continue;
            
        var consumedMessage = consumeResult.Message;
        var cloudEventMessage = consumedMessage.ToCloudEvent(formatter);
        var data = (User)cloudEventMessage.Data;

        var partition = consumeResult.Partition;
        var key = consumeResult.Message.Key;
        var offset = consumeResult.Offset;
        var dataJson = JsonSerializer.Serialize(data, SerializationOptions);

        Console.WriteLine($"Partition: {partition} Key: {key} Offset: {offset} Data: {dataJson}");    
    }
}

Not surprisingly, the logic above consumes Kafka messages of type Message<string, byte[]>, then uses the CloudEvents SDK method ToCloudEvent to convert them to CloudEvent objects. That’s the inverse of the ToKafkaMessage method we reviewed in the previous section. Feel free to check its implementation as well.
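If the topic may also contain messages that aren’t CloudEvents, the CloudNative.CloudEvents.Kafka package provides an IsCloudEvent extension method you can use as a guard; a sketch of how the consumer loop above could use it:

```csharp
// Skip records that don't carry CloudEvents metadata
// instead of letting ToCloudEvent throw on them.
if (!consumedMessage.IsCloudEvent())
{
    continue;
}
var cloudEventMessage = consumedMessage.ToCloudEvent(formatter);
```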

Finally, we print the Data object, which is of type User, to the console as a JSON string.

Output

Running the program should print one line per consumed message, along these lines (the exact partitions, offsets, and JSON property casing will depend on your setup):

Partition: [0] Key: UserId_1 Offset: 0 Data: {"userId":"UserId_1","name":"Name_1"}
Partition: [1] Key: UserId_2 Offset: 0 Data: {"userId":"UserId_2","name":"Name_2"}

This only prints the content of the data field of the CloudEvent.

If you want to check the entire Kafka message, you can modify the program or use a tool like kafkacat:
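For example (the flags below are standard kafkacat/kcat options; the broker address and topic name are assumptions you’ll need to adjust):

```shell
# Consume from the topic and print key, headers, and payload per message
kafkacat -b localhost:9092 -t users -C \
  -f 'Key: %k\nHeaders: %h\nPayload: %s\n---\n'
```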

Summary

In this article, I presented a practical example of using the C# CloudEvents SDK to produce and consume Kafka messages.

Thanks for reading, and see you next time!

Resources

  1. GitHub Repo
  2. CloudEvents
  3. CloudEvents Specification
  4. CloudEvents C# SDK
