Reactive Kafka Poller and Awaitility

Prashant Pandey
9 min read · Jul 14, 2019

I have been working with Kafka for the past few months. Recently, I programmed a critical part of our system: a Kafka poller that consumes events from the Fulfilment Service (FS). Writing a robust poller, ensuring that records are processed as per the business needs (parallelising wherever possible), and testing such an asynchronous system is, I would say, a bit complex, and requires a thorough understanding of how Kafka works (at least its API, if not its architecture). Through this post, I have tried to summarise what I learnt, what worked for me, and what didn't. It has been a very rewarding experience.

The FS sends us three kinds of events: Order Suspended, Order Concluded and Order Released. A fulfilment is suspended when fraud is detected in the order. It is released when the fulfilment starts, and concluded when the order reaches the customer. Let S = {Order Released, Order Concluded, Order Suspended}. Let us define a binary relation ≤ over S such that for any a, b in S, a ≤ b iff a is emitted before or when b is emitted. This is a POSET with the following relationship:

Order Released ≤ Order Concluded

What this means is that an Order Released event comes before an Order Concluded event, and thus has to be processed first. Some critical business flows get executed while processing Order Released events, including moving the money to our company's account. A sanity check for these flows is done while processing an Order Concluded event, so if we processed it before the corresponding Released event, the check would fail.

Since Kafka guarantees intra-partition ordering, it was decided that the FS would send all events for a particular order to a single partition, in order (the orderId would be used as the record key). For these reasons, it became imperative that we process each record in a partition in order.

Here are some important points to consider when you are consuming from Kafka:

1. Apache's Kafka Java client is not thread-safe. So if you are thinking of processing the events in parallel, make sure you synchronise access to the consumer. Ideally, all accesses should be synchronised. The easiest way to do this is to use Java's synchronisation primitive (the synchronized keyword). Make sure all accesses to the consumer are guarded by the same lock.
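
Here's a minimal sketch of what that could look like (the class and field names are illustrative, not from our codebase; KafkaConsumer.wakeup() is the one method that is safe to call from another thread without the lock):

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class SynchronisedConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final Object lock = new Object(); // the single lock guarding every consumer call

    SynchronisedConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    ConsumerRecords<String, String> poll(Duration timeout) {
        synchronized (lock) { // never touch the consumer without holding the lock
            return consumer.poll(timeout);
        }
    }

    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        synchronized (lock) {
            consumer.commitSync(offsets);
        }
    }
}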

2. Most of the time, make sure you do not commit as soon as you consume: if the consumer dies in between and the record was not fully processed, you won't be able to get that record again, since you had already committed its offset.

3. Kafka has two versions of commit: commitSync and commitAsync. If your use-case demands high throughput (and if you can afford reprocessing records), then use commitAsync. The reason is simple: the async version does not wait for the commit to be successful, and won't retry (it does accept a callback though, if you want to retry). Retrying it yourself is dangerous: what if you fail to commit, but the next commit for that partition succeeds? If you now retry the previous commit, you'll move the offset back, and when a new consumer is assigned that partition, it'll reprocess some messages. The sync version, on the other hand, retries until the commit is successful. This blocks your thread and reduces your processing throughput, but ensures that the chances of you reprocessing a message are low.
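
A rough sketch of the two variants (the consumer and logger here are placeholders):

// High throughput: fire and forget, with a callback just for visibility.
// Don't blindly retry in the callback: a later commit may already have
// advanced the offset, and re-committing would move it back.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.warn("Async commit failed for offsets {}", offsets, exception);
    }
});

// Safety: blocks and retries internally until the commit succeeds
// (or a non-retriable error occurs).
consumer.commitSync();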

4. Important Kafka configurations (especially useful for testing): session.timeout.ms and max.poll.interval.ms. Kafka has a heartbeat thread and a processing thread. Using the heartbeat thread, a consumer can send a heartbeat to the broker between two consecutive polls. So if the whole consumer dies, it takes just session.timeout.ms to detect it. If just the processing thread dies, it takes max.poll.interval.ms to detect it. On the producer side, max.block.ms is the time a producer waits for a message to publish successfully (e.g. while updating metadata) before it throws an exception. If it is too high, your processing thread may get stuck and a partition rebalance could take place. I faced this issue during testing, where I had to fine-tune these variables for my tests to behave as I expected.
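
For reference, these knobs live in the consumer and producer configurations; the values below are illustrative, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");    // how fast a dead consumer is detected
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");  // heartbeat frequency, usually ~1/3 of the session timeout
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // how fast a dead processing thread is detected

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"); // how long send()/metadata fetch may block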

5. Please check if you can parallelise processing across partitions. Kafka gives no guarantee of inter-partition ordering anyway, so you might as well process each partition on a separate thread. But keep in mind that you might have to process the records within a partition in order, in which case you can't process those in parallel. This was exactly our requirement while writing the FS Kafka poller.

6. For FS' Kafka listener, here's what we wrote in the polling loop:

private void consume(FsKafkaConsumerProperties properties) {
  while (true) {
    ConsumerRecords<String, String> records =
        fsKafkaConsumer.poll(Duration.ofMillis(properties.getPollTimeout()));
    Flux.fromIterable(records.partitions())
        .flatMap(topicPartition ->
            Mono.just(records.records(topicPartition))
                .subscribeOn(Schedulers.parallel())) // 1: emit each partition on its own thread
        .flatMap(this::processPartitionInSequence)
        .publishOn(Schedulers.parallel())
        .collectList()
        .doOnNext(a -> logBatchProcessingFinish(records))
        .block(); // don't consume the next batch until you process this one
  }
}

private Flux<ConsumerRecord<String, String>> processPartitionInSequence(
    List<ConsumerRecord<String, String>> partitionRecords) {
  // concatMap ensures the list is processed in sequence
  return Flux.fromIterable(partitionRecords).concatMap(this::process);
}

We wanted to process each partition in order, but different partitions can be processed in parallel. So we emit each partition on a separate thread (1), and then each record on that thread is processed in order (by using concatMap instead of flatMap). Note that we don't consume the next batch until we have processed each record of the current batch (that's why we used .block()). Using .block() also allows us to call fsKafkaConsumer.poll(..) without guarding the access: we are sure that when the consumer is being used for polling, no other thread is using it.

7. What happens to those messages that fail processing for some reason? This depends upon the business requirement: if you cannot afford to lose them, push them to a DLQ (Dead Letter Queue), commit, and move ahead. Resolving the DLQ generally requires manual intervention. Here's Uber's excellent article on such an architecture: https://eng.uber.com/reliable-reprocessing/. A more mature architecture would be to have separate DLQs based on which service failed: Identity, Payment, Price, Fulfilment, etc. Each queue can have a processor. At the end, there can be a final DLQ that is resolved manually; records are pushed into it when the previous DLQ processors fail again.

8. Make sure that the retention period of the DLQ is high enough. If you lose messages here, they're gone for good.
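
If you manage topic configs from code, a sketch like this can raise the retention (the topic name and value are illustrative; incrementalAlterConfigs needs Kafka 2.3+):

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

ConfigResource dlqTopic = new ConfigResource(ConfigResource.Type.TOPIC, "fulfilment-dlq");
AlterConfigOp raiseRetention = new AlterConfigOp(
    new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1209600000"), // 14 days
    AlterConfigOp.OpType.SET);
adminClient.incrementalAlterConfigs(Map.of(dlqTopic, List.of(raiseRetention))).all().get();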

9. Here's our processing chain:

Mono.just(model)
    .flatMap(this::processRecord)
    .onErrorResume(err -> {
        logProcessingError(err);
        model.err(err);
        return Mono.just(model);
    })
    .flatMap(this::publishEventToDLQ)
    .onErrorResume(err -> Mono.just(record))
    .doOnNext(this::commit)
    .onErrorResume(err -> {
        logCommitFailure(err);
        return Mono.just(record);
    })
    .subscriberContext(context);

We try to process the record. In case some error happens, log it and publish the record to the DLQ. If publishing fails, then still move ahead and try to commit. If commit fails, log and move ahead. Perhaps we can raise an alert whenever more than 10 messages are pushed into the DLQ in a short duration. This depends upon the business.

10. Create an identifier (for example, a random UUID) to distinguish between each record that you consume, even if they are duplicates, and put it into the subscriber context and the MDC. Do not depend upon the event to carry a unique identifier (for example, 'eventId'). In our case, the FS was sending different events with the same 'traceId' and the same 'eventId', which led to very messy logging. I can't emphasise how important this is; it is a God-send while debugging.
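
A rough sketch of the idea (the key name and processingChain are illustrative; subscriberContext is the same pre-Reactor-3.4 API used in the chain above):

String consumptionId = UUID.randomUUID().toString(); // unique per consumed record, even for duplicates
MDC.put("consumptionId", consumptionId);             // tag every log line on this thread

processingChain(record)
    .subscriberContext(ctx -> ctx.put("consumptionId", consumptionId))
    .subscribe();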

11. We maintain metadata of all the events that have been processed in persistent storage. In our case, each order document contains a metadata section. When we process a message successfully, we update the metadata with the 'eventId'. As soon as we start processing an event, we do a sanity check to see if the metadata already contains that 'eventId'. If yes, then the event has already been processed, and we don't process it again.
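
As an idempotency sketch (the Order type and metadata accessors here are hypothetical):

Mono<Order> processIdempotently(Order order, String eventId) {
    if (order.getMetadata().getProcessedEventIds().contains(eventId)) {
        return Mono.just(order); // already processed: skip
    }
    return process(order)
        .doOnNext(o -> o.getMetadata().getProcessedEventIds().add(eventId)) // record only after success
        .flatMap(this::persist);
}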

12. A very clean way to handle all the mess that comes with deserialising events (in case you are consuming a large variety of events) is to write a custom deserialiser. GSON lets you write your own deserialiser and register it. Do whatever spaghetti code you want in there.
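
A sketch of what registering one could look like (the FsEvent hierarchy and the 'eventType' discriminator field are assumptions about the payload, not our actual schema):

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;

Gson gson = new GsonBuilder()
    .registerTypeAdapter(FsEvent.class, (JsonDeserializer<FsEvent>) (json, type, ctx) -> {
        JsonObject obj = json.getAsJsonObject();
        String eventType = obj.get("eventType").getAsString(); // assumed discriminator field
        switch (eventType) {
            case "ORDER_RELEASED":  return ctx.deserialize(obj, OrderReleasedEvent.class);
            case "ORDER_CONCLUDED": return ctx.deserialize(obj, OrderConcludedEvent.class);
            case "ORDER_SUSPENDED": return ctx.deserialize(obj, OrderSuspendedEvent.class);
            default: throw new JsonParseException("Unknown event type: " + eventType);
        }
    })
    .create();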

Testing

Testing asynchronous systems can be difficult. Let us see how we tested the Order Released event flow (L1 integration). Here's a brief summary of the flow:

1. An Order Released event is consumed.

2. A sanity check is done to see if the event has been processed already. If yes, then it is not processed again.

3. A ‘getPayment’ call is made and a sanity check is done to verify that the payment has been charged.

4. The payment is finalised (moved into the company's account).

5. The corresponding invoice is marked as ‘Paid’.

6. The order document is updated with the ‘eventId’ in its metadata section. This means that the event has been processed.

7. The updated order is persisted in the database.

8. If any of the above steps fail, the event is pushed into the DLQ.

This is how we tested the happy path:

// send the event
// wait till the document metadata is updated
Awaitility.await()
    .atMost(30, TimeUnit.SECONDS)
    .until(() -> isMetadataUpdated("orderId", "eventId"));

assertPaymentFinalised("e8756088-7959-46c8-8f14-eef5e6c74cae");
assertInvoicePaid("93042a7c-28a3-4195-8523-b45104d7ad8b");

// assert that the message is not published to the DLQ
assertFalse(failedEventIds.contains(eventId));

We have a Kafka poller running on a separate thread that listens to the DLQ and populates the failed events into a list (failedEventIds). The last step in the happy path is the persist, so we wait for at most 30 seconds and check if the metadata section of the order document has been updated. For this, we use a library called Awaitility. But for this library, we would have done a Thread.sleep(30000). The downside to that approach is that it would make the thread sleep for 30 seconds every time, even if the metadata section was updated in 5 seconds. Awaitility, on the other hand, continually polls the given boolean condition and breaks out as soon as it is satisfied.

Now since it has been asserted that the document metadata section was updated, we can assert that the payment was finalised, the invoice was marked paid, and the event was not pushed into the DLQ. Note that the metadata is updated and persisted in the last steps, so all the other steps are guaranteed to have been executed before it.

Let us take a negative scenario, where the order is in an invalid state (the order needs to be 'Checkedout' for the processor to process this event).

// wait till the dead letter is published
Awaitility.await()
    .atMost(new org.awaitility.Duration(5, TimeUnit.MINUTES))
    .until(() -> failedEventIds.contains(eventId));

// verify that the order was not processed by asserting that the document
// metadata does not contain the event id
assertTrue(isMetadataNotUpdated(orderId, eventId));

Here, since the processing failed, we wait for the event to be pushed into the DLQ. Then we assert that the document metadata should not be updated, since the event processing failed.

Let us test our actual Kafka poller. Here’s what happens in it:

1. The event is consumed.

2. The event is deserialised into one of the known event types.

3. It is processed.

4. If the processing fails, it is pushed into the DLQ, and a commit is done.

5. If the publish to the DLQ itself fails, the event is still committed.

Here’s how we test the scenario corresponding to step 5 (DLQ publish itself fails).

// FS uses the orderId as the partition key to Kafka
ListenableFuture<SendResult<String, String>> sendResultListenableFuture =
    kafkaTemplate.sendDefault("orderId", eventToSend);

SendResult<String, String> sendResult = sendResultListenableFuture.get();
int recordPartition = sendResult.getRecordMetadata().partition();
long offset = sendResult.getRecordMetadata().offset();

// consumer.committed(....) returns null if there was no prior commit to a partition,
// so wait till there is a commit
Awaitility.await()
    .atMost(new Duration(30, TimeUnit.SECONDS))
    .until(() ->
        null != testConsumer.committed(
            new TopicPartition("FULFILMENT_ORDER", recordPartition)));

OffsetAndMetadata offsetAndMetadata =
    testConsumer.committed(new TopicPartition("FULFILMENT_ORDER", recordPartition));

// assert that the last committed offset equals the offset of the consumed message
assertEquals(offset, offsetAndMetadata.offset());

Our Kafka poller commits a message even when it could not be processed. So we wait (for at most 30 seconds) till it is committed, then assert that the last committed offset for that partition equals the offset of the message that was consumed. Note that we'll have to simulate a DLQ publish error. This can be done by giving a wrong broker address for the DLQ producer in the configuration file.
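
For instance (illustrative only; how the property is wired up depends on how your DLQ producer is configured):

Properties dlqProducerProps = new Properties();
// point the DLQ producer at an unreachable broker so every publish fails
dlqProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");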

That’s all folks, for this post. Let me know if you have any suggestions to improve upon the poller.
