Project Reactor’s flatMap And Backpressure
In the previous post, we looked at the basics of how flatMap works. This post can be considered an addendum to that post.
Besides the single-argument version, flatMap has two overloads that control concurrency:
- flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
- flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)
In this post, I’ll be covering the first variant (the second one is easy to understand once you understand the first). Concurrency is the maximum number of inner streams flatMap subscribes to at the same time. It is also the number of elements flatMap initially requests from upstream, and it defaults to 256. Consider the code below:
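A minimal sketch of code that produces this kind of log, assuming reactor-core 3.x (plus, optionally, an SLF4J binding) on the classpath. The class name, the range size of 1000 and the one-element inner `Flux.just(i)` are illustrative choices; `doOnRequest` is added only to record the amounts flatMap asks the range for:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class FlatMapDefaultDemand {

    // Returns every request(n) that flatMap sends to the upstream range
    // while the downstream subscriber asks for an unbounded amount.
    public static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 1000)                 // element count is arbitrary
                .log()                      // logs signals between range and flatMap
                .doOnRequest(requests::add) // records what flatMap asks for
                .flatMap(i -> Flux.just(i)) // default concurrency (256)
                .log()                      // logs signals between flatMap and the subscriber
                .blockLast();               // subscribes with an unbounded request

        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```

The first recorded request should be 256 even though `blockLast` requests an unbounded amount; any later entries are the replenishment requests discussed at the end of this post.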
Here’s the console log:
19:02:03.447 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:02:03.472 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:02:03.474 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
19:02:03.476 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)
19:02:03.476 [main] INFO reactor.Flux.Range.1 - | request(256)
19:02:03.476 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:02:03.526 [main] INFO reactor.Flux.FlatMap.2 - onNext(1)
19:02:03.527 [main] INFO reactor.Flux.Range.1 - | onNext(2)
19:02:03.527 [main] INFO reactor.Flux.FlatMap.2 - onNext(2)
19:02:03.527 [main] INFO reactor.Flux.Range.1 - | onNext(3)
19:02:03.527 [main] INFO reactor.Flux.FlatMap.2 - onNext(3)
The first request from downstream (the subscriber) is unbounded, but flatMap does not pass it upstream unchanged: it requests only 256 elements from the range. Many reactive operators reshape downstream demand like this; buffer is another example.
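To see another operator reshape demand, here is a minimal sketch assuming reactor-core 3.x; the class name and the range of 100 are illustrative. A `BaseSubscriber` asks `buffer(3)` for just two lists, and `doOnRequest` records what buffer asks from the range. If buffer multiplies the downstream demand by its size, as the current implementation appears to, the range should see a single `request(6)`:

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class BufferDemand {

    // Requests two buffers of size 3 and returns what buffer requested upstream.
    public static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 100)
                .doOnRequest(requests::add) // records buffer's demand on the range
                .buffer(3)
                .subscribe(new BaseSubscriber<List<Integer>>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(2); // bounded demand: two lists, not unbounded
                    }

                    @Override
                    protected void hookOnNext(List<Integer> buffer) {
                        // no further requests, so the flow stops after two lists
                    }
                });

        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```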
Let’s change this demand by passing a concurrency of 10:
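A minimal sketch, assuming reactor-core 3.x; the class name and sizes are illustrative. Unlike a plain one-element inner stream, each inner stream here is delayed by a hypothetical 20 ms so that several are in flight at once, which makes the cap of 10 observable (the inner onNext lines will therefore appear on parallel-* threads rather than main). Two counters track how many inner streams are active:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class FlatMapConcurrencyLimit {

    // Flattens 100 inner streams with concurrency 10 and returns the largest
    // number of inner streams that were subscribed at the same time.
    public static int peakInFlight() {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Flux.range(1, 100)
                .log() // the first upstream request should now be request(10)
                .flatMap(i -> Flux.just(i)
                                .delayElements(Duration.ofMillis(20)) // hypothetical slow work
                                // an inner stream counts as active from subscription...
                                .doOnSubscribe(s -> peak.accumulateAndGet(active.incrementAndGet(), Math::max))
                                // ...until it completes (runs before flatMap sees onComplete)
                                .doOnComplete(active::decrementAndGet),
                        10) // concurrency
                .log()
                .blockLast();

        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight inner streams: " + peakInFlight());
    }
}
```

The decrement is placed in `doOnComplete` rather than `doFinally` because `doFinally` runs after completion has propagated, by which time flatMap may already have subscribed to the next inner stream.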
Here’s the console log:
19:25:36.943 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:25:36.972 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:25:36.975 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
19:25:36.976 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)
19:25:36.977 [main] INFO reactor.Flux.Range.1 - | request(10)
19:25:36.978 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:25:37.017 [main] INFO reactor.Flux.FlatMap.2 - onNext(1)
19:25:37.018 [main] INFO reactor.Flux.Range.1 - | onNext(2)
19:25:37.018 [main] INFO reactor.Flux.FlatMap.2 - onNext(2)
19:25:37.018 [main] INFO reactor.Flux.Range.1 - | onNext(3)
19:25:37.018 [main] INFO reactor.Flux.FlatMap.2 - onNext(3)
As you can see, the demand changes to 10.
A publisher can emit events only after demand has been signalled from downstream. This fact, combined with the concurrency parameter, can be used to implement a stop-and-wait protocol. Consider a use case where the stream consists of batches, and the next batch must be consumed only after the current batch has been processed successfully.
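A minimal sketch of the stop-and-wait idea, assuming reactor-core 3.x. Three hard-coded batches stand in for real ones, and `delayElements(Duration.ofMillis(500))` is a hypothetical stand-in for per-element processing time (it also moves emission onto the parallel-* threads). The key is the concurrency argument of 1:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

public class StopAndWaitBatches {

    // Flattens three batches with concurrency 1: flatMap requests one batch,
    // processes it, and requests the next batch only once that inner stream completes.
    public static List<Integer> run() {
        List<List<Integer>> batches = List.of(
                List.of(1, 2, 3),
                List.of(4, 5, 6),
                List.of(7, 8, 9));

        return Flux.fromIterable(batches)
                .log() // should show request(1) before each batch
                .flatMap(batch -> Flux.fromIterable(batch)
                                // hypothetical per-element processing time
                                .delayElements(Duration.ofMillis(500)),
                        1) // concurrency 1: at most one batch in flight
                .log()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```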
Here’s the console log:
19:46:59.699 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
19:46:59.705 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
19:46:59.706 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)
19:46:59.706 [main] INFO reactor.Flux.Iterable.1 - | request(1)
19:46:59.707 [main] INFO reactor.Flux.Iterable.1 - | onNext([1, 2, 3])
19:47:00.293 [parallel-1] INFO reactor.Flux.FlatMap.2 - onNext(1)
19:47:00.798 [parallel-2] INFO reactor.Flux.FlatMap.2 - onNext(2)
19:47:01.300 [parallel-3] INFO reactor.Flux.FlatMap.2 - onNext(3)
19:47:01.301 [parallel-3] INFO reactor.Flux.Iterable.1 - | request(1)
19:47:01.301 [parallel-3] INFO reactor.Flux.Iterable.1 - | onNext([4, 5, 6])
19:47:01.805 [parallel-4] INFO reactor.Flux.FlatMap.2 - onNext(4)
19:47:02.310 [parallel-5] INFO reactor.Flux.FlatMap.2 - onNext(5)
19:47:02.813 [parallel-6] INFO reactor.Flux.FlatMap.2 - onNext(6)
19:47:02.813 [parallel-6] INFO reactor.Flux.Iterable.1 - | request(1)
19:47:02.813 [parallel-6] INFO reactor.Flux.Iterable.1 - | onNext([7, 8, 9])
19:47:02.815 [parallel-6] INFO reactor.Flux.Iterable.1 - | onComplete()
19:47:03.318 [parallel-7] INFO reactor.Flux.FlatMap.2 - onNext(7)
19:47:03.824 [parallel-8] INFO reactor.Flux.FlatMap.2 - onNext(8)
19:47:04.328 [parallel-1] INFO reactor.Flux.FlatMap.2 - onNext(9)
19:47:04.328 [parallel-1] INFO reactor.Flux.FlatMap.2 - onComplete()
As you can see, flatMap requests a single batch. Only after that batch has been processed (signalled by the inner stream’s onComplete) does flatMap make the next request and receive the next batch.
This pattern is useful when, for example, the publisher is a generator function that consumes from an event bus like Kafka, and strict ordering requirements mean that events from two consecutive batches must not be interleaved.
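The generator case can be sketched as follows, assuming reactor-core 3.x. There is no Kafka code here: a hypothetical `Flux.generate` source plays the role of the event-bus consumer, and the returned trace shows when each batch is pulled relative to when its elements finish processing:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class GeneratedBatches {

    // Returns an ordered trace of "pull N" (batch N taken from the source)
    // and "done X" (element X finished processing) events.
    public static List<String> trace() {
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicInteger nextBatch = new AtomicInteger();

        // Hypothetical stand-in for an event-bus consumer: the generator runs
        // once per requested element, i.e. only when flatMap signals demand.
        Flux<List<Integer>> batches = Flux.generate(sink -> {
            int n = nextBatch.getAndIncrement();
            if (n == 3) {
                sink.complete(); // three batches, then stop
                return;
            }
            events.add("pull " + n);
            sink.next(List.of(3 * n + 1, 3 * n + 2, 3 * n + 3));
        });

        batches.flatMap(batch -> Flux.fromIterable(batch)
                                .delayElements(Duration.ofMillis(10)) // simulated processing
                                .doOnNext(i -> events.add("done " + i)),
                        1) // one batch in flight at a time
                .blockLast();

        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Because `Flux.generate` invokes its callback once per requested element, a concurrency of 1 means the source is never polled ahead of the batch currently being processed, so elements from consecutive batches cannot interleave.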
One final thing about flatMap: it uses a prefetch-and-replenish strategy. Rather than waiting for its entire initial demand to be consumed, it requests more from upstream once roughly 75% of that demand has been met. Consider the following example:
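A minimal sketch, assuming reactor-core 3.x; the 22 elements mirror the trimmed log, and `Mono.just(i)` is an illustrative synchronous inner publisher. `doOnRequest` collects every request flatMap sends upstream:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class FlatMapReplenish {

    // Records each request(n) that flatMap (concurrency 10) sends upstream
    // while 22 elements are flattened through synchronous inner Monos.
    public static List<Long> upstreamRequests() {
        List<Long> requests = new ArrayList<>();

        Flux.range(1, 22)
                .log()                          // logs request(10), then the replenishing requests
                .doOnRequest(requests::add)     // records each request amount
                .flatMap(i -> Mono.just(i), 10) // concurrency 10
                .blockLast();

        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests());
    }
}
```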
Here’s the console log (trimmed):
19:55:41.317 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:55:41.321 [main] INFO reactor.Flux.Range.1 - | request(10)
19:55:41.322 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:55:41.374 [main] INFO reactor.Flux.Range.1 - | onNext(2)
19:55:41.374 [main] INFO reactor.Flux.Range.1 - | onNext(3)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(4)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(5)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(6)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(7)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(8)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | request(8)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(9)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(10)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(11)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(12)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(13)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(14)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(15)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(16)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | request(8)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(17)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(18)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(19)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(20)
19:55:41.375 [main] INFO reactor.Flux.Range.1 - | onNext(21)
19:55:41.376 [main] INFO reactor.Flux.Range.1 - | onNext(22)
As you can see, after 8 events have been published (about 75% of the initial demand of 10), flatMap requests the next batch, with a demand of 8. It does so again after 16 events, and so on. From Reactor’s docs: “This is a heuristic optimization made so that these operators proactively anticipate the upcoming requests.”
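The replenish arithmetic can be modelled in plain Java. This is a simplified model, not Reactor’s code: it assumes the threshold is computed as `concurrency - (concurrency >> 2)`, which matches the log (10 gives 8) and appears to be what reactor-core’s internal `Operators.unboundedOrLimit` helper does, and it assumes every inner stream completes immediately. Under that assumption, the default concurrency of 256 gives a replenish size of 192:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplenishModel {

    // Assumed replenish threshold: prefetch minus a quarter of it (integer shift).
    public static int limit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    // Models the upstream requests for `total` immediately-completing elements:
    // an initial request of `concurrency`, then request(limit) every `limit` elements.
    public static List<Integer> requests(int concurrency, int total) {
        List<Integer> out = new ArrayList<>();
        out.add(concurrency);
        int limit = limit(concurrency);
        int produced = 0;
        for (int i = 1; i <= total; i++) {
            if (++produced == limit) {
                produced = 0;
                out.add(limit);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(limit(10));        // 8
        System.out.println(limit(256));       // 192
        System.out.println(requests(10, 22)); // [10, 8, 8]
    }
}
```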
Hopefully this article helped you understand the subtleties of flatMap. You can read the first part here.
References: