{"id":230,"date":"2024-01-19T19:34:19","date_gmt":"2024-01-19T19:34:19","guid":{"rendered":"https:\/\/devperfops.smcwpsites.com\/?p=230"},"modified":"2024-01-19T19:34:26","modified_gmt":"2024-01-19T19:34:26","slug":"opentelemetry-and-kafka-better-observability-of-your-tests","status":"publish","type":"post","link":"https:\/\/devperfops.smcwpsites.com\/opentelemetry-and-kafka-better-observability-of-your-tests\/","title":{"rendered":"OpenTelemetry and Kafka – better observability of your tests"},"content":{"rendered":"\n
Author’s notes: The code produced in this tutorial represents my journey of learning Golang by developing a complete, though basic, application. If you think it can be improved – let me know. <\/p>\n\n\n\n
If you’ve read my previous article, you’re likely familiar with how to instrument your JMeter tests using OpenTelemetry. That’s awesome! Can we get even more of them telemetries? Well, hold on to your socks! <\/p>\n\n\n\n
The problem of problems<\/strong><\/p>\n\n\n\n Some time ago, I asked my performance community how they run and measure their Kafka and MQ flows. The answers did not really surprise me, but the state of understanding of these protocols did. According to the comments under my post, most of you prefer to create a set of producers and consumers whose sole purpose is to generate the load and perform some basic assertions. What about the latency? “We measure that in the application.”<\/p>\n\n\n\n Well, according to this<\/a> article (highly and shamelessly recommended), here’s a picture of what you can actually measure from your application*<\/p>\n\n\n\n *unless you’re sneaking in some timestamps and measurements inside your payload\/headers<\/p>\n\n\n\n What are you actually missing<\/strong><\/p>\n\n\n\n The producers-and-consumers-only approach mentioned above has a few significant flaws:<\/p>\n\n\n\n Test application<\/strong><\/p>\n\n\n\n I took some time to learn practical Go development (thanks, ChatGPT, for not assisting with my null pointer and channel overloading errors), and it has been a fun journey.<\/p>\n\n\n\n Because the first application you write has to be meaningful, practical and world-changing, I designed an app that can wait for 25 seconds in just half a second. That’s right – I made the waits faster with the help of concurrency. So now, whenever you can’t decide whether you should use Thread.sleep() or Object.wait() with notification, you can just send your request to my application and it does all the waiting for you. All it takes is a :<\/p>\n\n\n\n The architecture diagram of the app is very simple. 
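The “25 seconds of waiting in half a second” trick is nothing more than running the waits concurrently. A minimal, broker-free sketch of the idea in Java (the real app is written in Go; the numbers here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentWaiter {
    public static void main(String[] args) throws Exception {
        int waits = 50;      // 50 waits of 500 ms each = 25 s if run sequentially
        long waitMs = 500;
        ExecutorService pool = Executors.newFixedThreadPool(waits);
        long start = System.nanoTime();

        // Launch every wait on its own thread; total wall time is roughly
        // the longest single wait, not the sum of all of them.
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < waits; i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("total requested wait: " + waits * waitMs + " ms");
        System.out.println("actual wall time under 2000 ms: " + (elapsedMs < 2000));
        pool.shutdown();
    }
}
```

The same effect falls out naturally from goroutines in Go, which is what the app relies on.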
I implemented the message bus pattern to simulate the behaviour of a commonly known Service Bus application, but also to allow high availability of my service in the future.<\/p>\n\n\n\n Golang channels can effectively act as queues, with one small exception: once a channel is full, the goroutine trying to put a new object into the channel is blocked until some of the channel’s capacity is freed.<\/p>\n\n\n\n The number of consumers, producers and waiter threads, as well as the channel capacities, is parameterised, which makes it a perfect application to practice your performance improvements on.<\/p>\n\n\n\n But enough about the app – you’re here for the good stuff.<\/p>\n\n\n\n Apache Kafka – what do you need to know<\/strong><\/p>\n\n\n\n Apache Kafka is a distributed message broker system emphasizing message persistence and high scalability. Messages in a pre-defined (serialized) format are sent to the Kafka broker via a Kafka Producer. Once a message is persisted on disk (or many disks), it can be read using a Kafka Consumer (and then deserialized).<\/p>\n\n\n\n Producers and consumers never communicate directly; they’re independent entities. The behaviour of consumers will not affect the behaviour of producers and vice versa – with the exception of producing a message. The consumer is not obliged to read a message, and the producer will not wait until the last message it sent has been consumed.<\/p>\n\n\n\n Each topic (message channel) can be split into partitions. These partitions are the main scalability factor, as they allow processing concurrency. The most common way to handle multiple partitions is by using consumer groups. A consumer group is a synchronisation mechanism between consumers that guarantees at-least-once delivery of messages. Each topic partition is assigned to exactly one consumer from the group, which reads from it.<\/p>\n\n\n\n The Kafka broker stores the offset of the consumer group for each partition. 
This offset is a bookmark of where the consumer has finished reading its partition and committed its state. Once the partition is assigned to a new consumer, it will pick up the messages from the last committed offset. The distance between this offset and the topic end (the last message put to the topic partition) is commonly referred to as topic lag. Topic lag is one of the key metrics to monitor during Kafka operations. If the lag is too high, it usually means problems.<\/p>\n\n\n\n Instrumenting Producers (Test)<\/strong><\/p>\n\n\n\n The producer in my example is very straightforward. The scenario assumes one producer per thread, so all the producer calls are synchronous, i.e. each request waits for an acknowledgment before the next one is sent. It’s important to note that Kafka Producers are asynchronous by nature and allow batching of requests before they’re sent to the broker. When designing your Kafka test, it’s important to understand how your customers will be making use of it, because the end-to-end time of each transaction will change drastically. If they batch too many messages for a single partition, it’ll most likely generate a lag on your consumer side. If your broker\/replication is too slow, the delay will be notable on the producer side.<\/p>\n\n\n\n <\/p>\n\n\n\n Once the Producer is created, it’s ready to send messages. Using the code from my previous article, I’m adding pre- and post-processors to create a span – it’s now accessible to me as the “currentSpan” object in JMeterVariables (vars).<\/p>\n\n\n\n Send Kafka Request code:<\/strong><\/p>\n\n\n\n Let’s break this code down.<\/p>\n\n\n\n We refer to the producer we created in the first step. We now have exclusive access to this producer, given our test configuration.<\/p>\n\n\n\n Then we need to create the ProducerRecord we want to send. I’m creating a random UUID as the record key; the value of my record (message body) is my name. 
Not very GDPR compliant, but I’m not going to complain.<\/p>\n\n\n\n Next, I’m adding two headers – one of them, very important for our exercise: traceparent. This is a W3C-compliant header of the Context Propagators API.<\/p>\n\n\n\n<\/figure>\n\n\n\n
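For reference, a W3C traceparent header is a dash-separated string of version, 16-byte trace-id, 8-byte parent (span) id, and trace flags. A small sketch that validates and unpacks one (the sample value is the example from the W3C Trace Context specification):

```java
public class TraceparentDemo {
    // W3C traceparent format: version-traceid-parentid-flags
    // e.g. 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
    static String[] parse(String header) {
        String[] parts = header.split("-");
        // trace-id is 32 hex chars (16 bytes), parent-id is 16 hex chars (8 bytes)
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("malformed traceparent: " + header);
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] p = parse("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        System.out.println("trace-id: " + p[1]);
        System.out.println("parent-id: " + p[2]);
        System.out.println("sampled: " + p[3].equals("01"));
    }
}
```

In practice you let OpenTelemetry's propagator inject and extract this header for you; the point is that any consumer that reads it can join the producer's trace.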
\n
\n
import org.apache.kafka.clients.producer.KafkaProducer;\nimport org.apache.kafka.clients.producer.Producer;\nimport java.util.Properties;\n\n\/\/ Set up the producer configuration\nProperties props = new Properties();\nprops.put(\"bootstrap.servers\", \"localhost:29092\"); \/\/ Kafka broker address\nprops.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\nprops.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\nprops.put(\"acks\", \"all\");\n\n\/\/ Create the producer\nProducer<String, String> producer = new KafkaProducer<>(props);\n\nvars.putObject(\"producer\", producer);<\/code><\/pre>\n<\/div>\n\n\n\n
<\/figure>\n\n\n\n
import org.apache.kafka.clients.producer.ProducerRecord;\nimport org.apache.kafka.clients.producer.RecordMetadata;\nimport io.opentelemetry.api.trace.Span;\n\ndef producer = vars.getObject(\"producer\");\ndef record = new ProducerRecord<>(vars.get(\"requestTopic\"), UUID.randomUUID().toString(), \"Kuba\");\nrecord.headers().add(\"traceparent\", vars.get(\"traceparent\").getBytes());\nrecord.headers().add(\"responseTopic\", vars.get(\"responseTopic\").getBytes());\ndef span = vars.getObject(\"currentSpan\");\nspan.addEvent(\"send-to-topic-start\");\ndef recordFuture = producer.send(record);\nspan.addEvent(\"send-to-topic-end\");\n\n\/\/this piece here waits for acks\nrecordFuture.get();\nspan.addEvent(\"send-to-topic-ack\");\n<\/code><\/pre>\n\n\n\n
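The interesting part of the snippet above is that producer.send() returns a Future immediately, while recordFuture.get() blocks until the broker acknowledges the write – which is exactly why the span gets both a "send-to-topic-end" and a "send-to-topic-ack" event. A broker-free sketch of that send-then-ack shape, with the acknowledgment delay simulated (300 ms is a made-up value):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendAckDemo {
    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();

        // Simulated broker acknowledgment arriving ~300 ms after the "send".
        CompletableFuture<String> ack = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack";
        });

        // The "send" call itself returns right away...
        long afterSendMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("send returned almost immediately: " + (afterSendMs < 100));

        // ...but get() blocks until the (simulated) acknowledgment, like recordFuture.get().
        ack.get();
        long afterAckMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("ack observed after blocking get: " + (afterAckMs >= 300));
    }
}
```

The gap between the two timestamps is the part of your end-to-end latency that a purely synchronous test script would silently fold into "response time".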
def producer = vars.getObject(\"producer\");<\/code><\/pre>\n\n\n\n