Intro
“CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems.”
You’ve probably come across a bunch of similar statements from the CloudEvents spec and the related documentation.
Still, when I was exploring the topic, it was somewhat hard to find practical examples of how CloudEvents plays with different message brokers and serialization formats.
Collecting information from various sources, I managed to get some examples up and running. So I decided to share the outcomes in a few articles.
With that said, this post is not about the theoretical foundations of CloudEvents. Instead, it aims at providing a good starting point for anyone looking into using CloudEvents with Kafka.
Concretely, I’ll present a simple Kafka producer and consumer using the C# CloudEvents SDK. I’ll use JSON for serialization here, and I’m planning to demonstrate how to use Avro and Protobuf in a subsequent post.
You can find the code on GitHub.
Setting Up Kafka
To run the example from this article, I’m spinning up a two-broker Kafka cluster in Docker using a docker-compose file you can find here.
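The compose file in the repo is the source of truth; purely as an illustrative sketch (image versions, ports, and service names here are assumptions, not necessarily what the repo uses), a two-broker setup generally looks something like this:

version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka-1:
    image: confluentinc/cp-kafka:7.3.2
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Separate listeners so the brokers talk to each other inside the
      # Docker network while clients connect via localhost.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2

  kafka-2:
    image: confluentinc/cp-kafka:7.3.2
    depends_on: [zookeeper]
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2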
The Demo App
The code of the sample app is quite straightforward. It contains as little logic as possible so that we don’t get distracted from the main subject.
The Main method creates a Kafka topic, then runs the producer and consumer in parallel:
public static async Task Main()
{
    await CreateKafkaTopic();

    var cts = new CancellationTokenSource();
    var formatter = new JsonEventFormatter<User>(SerializationOptions, new JsonDocumentOptions());

    var producer = Task.Run(() => StartProducer(formatter, cts.Token));
    var consumer = Task.Run(() => StartConsumer(formatter, cts.Token));

    Console.ReadKey();
    cts.Cancel();

    await Task.WhenAll(producer, consumer);
}
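CreateKafkaTopic isn’t shown in the article; here’s a minimal sketch of how it could be implemented with Confluent.Kafka’s admin client (the partition count and error handling are assumptions – the real implementation is in the repo):

// Requires the Confluent.Kafka and Confluent.Kafka.Admin namespaces (plus System.Linq).
private static async Task CreateKafkaTopic()
{
    using var adminClient = new AdminClientBuilder(
        new AdminClientConfig { BootstrapServers = BootstrapServers }).Build();

    try
    {
        await adminClient.CreateTopicsAsync(new[]
        {
            new TopicSpecification
            {
                Name = Topic,
                NumPartitions = 3,     // assumed partition count
                ReplicationFactor = 2  // matches the two-broker cluster
            }
        });
    }
    catch (CreateTopicsException e) when (
        e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
    {
        // Topic left over from a previous run – safe to ignore.
    }
}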
Producer
Here’s the StartProducer implementation:
private static async Task StartProducer(JsonEventFormatter formatter, CancellationToken ct)
{
    var producerConfig = new ProducerConfig { BootstrapServers = BootstrapServers };
    using var producer = new ProducerBuilder<string, byte[]>(producerConfig).Build();

    var i = 1;
    while (!ct.IsCancellationRequested)
    {
        var userId = $"UserId_{i}";

        // Populate the required CloudEvent attributes with some dummy data.
        var cloudEvent = new CloudEvent
        {
            Id = Guid.NewGuid().ToString(),
            Type = "event-type",
            Source = new Uri("https://cloudevents.io/"),
            Time = DateTimeOffset.UtcNow,
            DataContentType = "application/cloudevents+json",
            Data = new User
            {
                UserId = userId,
                Name = $"Name_{i}",
            }
        };

        // Stored as the "partitionkey" extension attribute; becomes the Kafka message key.
        cloudEvent.SetPartitionKey(userId);

        // Encode the whole event into the message body (structured mode).
        var kafkaMessage = cloudEvent.ToKafkaMessage(ContentMode.Structured, formatter);
        await producer.ProduceAsync(Topic, kafkaMessage);

        i++;
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}
Let’s review some of the important aspects.
We create an instance of a CloudEvent where we set the required properties with some dummy data. Some of these are Id, Type, and Source – they are all part of the CloudEvents specification.
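To make the structured mode concrete, the envelope our producer emits would look roughly like this (the id and time are illustrative; the partitionkey extension is explained just below, and the casing inside data depends on SerializationOptions, which the article doesn’t show):

{
  "specversion": "1.0",
  "id": "4f9c2a1e-…",
  "type": "event-type",
  "source": "https://cloudevents.io/",
  "time": "2022-01-01T12:00:00Z",
  "datacontenttype": "application/cloudevents+json",
  "partitionkey": "UserId_1",
  "data": {
    "userId": "UserId_1",
    "name": "Name_1"
  }
}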
The Data field is of type object, and it’s meant to hold your concrete event data. In this example, I’m just creating a User that contains only a UserId and a Name.
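For reference, User is just a plain DTO along these lines (field-for-field as used by the producer; the actual class lives in the repo):

public class User
{
    public string UserId { get; set; }
    public string Name { get; set; }
}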
To provide the Kafka partition key, we use the SetPartitionKey method on the CloudEvent. Behind the scenes, this sets a specific “extension” attribute called partitionkey. It is later used in the ToKafkaMessage method from the CloudNative.CloudEvents.Kafka package to properly set the key of the Kafka message.
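A minimal sketch to make the mechanism visible – SetPartitionKey is just a convenience over that extension attribute, which you can read back directly:

cloudEvent.SetPartitionKey(userId);

// The value is now stored as the "partitionkey" extension attribute;
// ToKafkaMessage reads it to populate the Kafka message key.
var key = (string?)cloudEvent[Partitioning.PartitionKeyAttribute];
// Equivalently, look it up by attribute name:
var sameKey = (string?)cloudEvent["partitionkey"];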
ToKafkaMessage has two parameters.

One is the ContentMode, which we set to Structured, meaning the entire CloudEvent is encoded into the message body. The other option is the Binary ContentMode. In that case, all the CloudEvent attributes except for data are mapped to Kafka message headers, while the data itself is passed as the message body as a byte sequence.
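As a quick sketch, switching the same producer to binary mode is just a matter of changing the ContentMode argument; per the CloudEvents Kafka protocol binding, the attributes then travel as ce_-prefixed headers:

// Binary mode: attributes become headers (ce_id, ce_type, ce_source, ...),
// and only the serialized User payload ends up in the message value.
var binaryMessage = cloudEvent.ToKafkaMessage(ContentMode.Binary, formatter);
await producer.ProduceAsync(Topic, binaryMessage);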
ToKafkaMessage also accepts a formatter that, in our case, is a JsonEventFormatter. This is a thin wrapper around a standard .NET JSON serializer – either System.Text.Json or Newtonsoft.Json. Which one is used depends on the NuGet package you install: CloudNative.CloudEvents.SystemTextJson or CloudNative.CloudEvents.NewtonsoftJson.
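The SerializationOptions used in Main isn’t shown in the article; with the System.Text.Json-based package it might be constructed like this (the camel-case naming policy is an assumption):

// Assuming the CloudNative.CloudEvents.SystemTextJson package is installed.
var serializerOptions = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var formatter = new JsonEventFormatter<User>(serializerOptions, new JsonDocumentOptions());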
I encourage you to inspect the ToKafkaMessage code yourself, as it’s relatively easy to comprehend:
public static Message<string?, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
    Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
    Validation.CheckNotNull(formatter, nameof(formatter));

    var headers = MapHeaders(cloudEvent);
    string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
    byte[] value;
    string? contentTypeHeaderValue;

    switch (contentMode)
    {
        case ContentMode.Structured:
            value = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType));
            // TODO: What about the non-media type parts?
            contentTypeHeaderValue = contentType.MediaType;
            break;
        case ContentMode.Binary:
            value = BinaryDataUtilities.AsArray(formatter.EncodeBinaryModeEventData(cloudEvent));
            contentTypeHeaderValue = formatter.GetOrInferDataContentType(cloudEvent);
            break;
        default:
            throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
    }

    if (contentTypeHeaderValue is object)
    {
        headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
    }

    return new Message<string?, byte[]>
    {
        Headers = headers,
        Value = value,
        Key = key
    };
}
You can also check it out on GitHub.
Consumer
Let’s now review the StartConsumer method as well:
private static void StartConsumer(JsonEventFormatter formatter, CancellationToken ct)
{
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = BootstrapServers,
        GroupId = "cgid"
    };

    using var consumer = new ConsumerBuilder<string, byte[]>(consumerConfig).Build();
    consumer.Subscribe(new[] { Topic });

    while (!ct.IsCancellationRequested)
    {
        var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100));
        if (consumeResult is null) continue;

        // Convert the raw Kafka message back into a CloudEvent.
        var consumedMessage = consumeResult.Message;
        var cloudEventMessage = consumedMessage.ToCloudEvent(formatter);
        var data = (User)cloudEventMessage.Data;

        var partition = consumeResult.Partition;
        var key = consumeResult.Message.Key;
        var offset = consumeResult.Offset;
        var dataJson = JsonSerializer.Serialize(data, SerializationOptions);

        Console.WriteLine($"Partition: {partition} Key: {key} Offset: {offset} Data: {dataJson}");
    }
}
Not surprisingly, the logic above consumes Kafka messages of type Message<string, byte[]>, then uses the CloudEvents SDK method ToCloudEvent to convert them to CloudEvent objects. That’s the opposite of the ToKafkaMessage method we reviewed in the previous section. Feel free to check its implementation as well.
Finally, we print the Data object, which is of type User, to the console as a JSON string.
Output
Running the program should output something like this:
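The exact values will differ from run to run; assuming the camel-case serializer options sketched earlier, it might look roughly like:

Partition: [1] Key: UserId_1 Offset: 0 Data: {"userId":"UserId_1","name":"Name_1"}
Partition: [0] Key: UserId_2 Offset: 0 Data: {"userId":"UserId_2","name":"Name_2"}
Partition: [1] Key: UserId_3 Offset: 1 Data: {"userId":"UserId_3","name":"Name_3"}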
This only prints the content of the data field of the CloudEvent.
If you want to check the entire Kafka message, you can modify the program or use a tool like kafkacat:
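For example, assuming the brokers are reachable on localhost:9092 and a hypothetical topic name of users, a kafkacat (kcat) invocation like this prints the key, headers, and body of each message:

kcat -b localhost:9092 -t users -C -f 'Key: %k\nHeaders: %h\nPayload: %s\n\n'

In structured mode you should see a content-type header set to application/cloudevents+json and the full CloudEvent envelope – attributes plus data – in the payload.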
Summary
In this article, I presented a practical example of using the C# CloudEvents SDK to produce and consume Kafka messages.
Thanks for reading, and see you next time!