The Gatherer API
Introducing Gatherers
Starting with JDK 24, you can use a dedicated API to model the intermediate operations of the Stream API, called the Gatherer API. This model is built in the same way as the Collector API, which models terminal Stream operations.
Why does the Stream API need such a feature? The Stream API is a very rich API that gives you many ways of processing in-memory data, following the map-filter-reduce pattern. The Stream API also exposes the Spliterator API, to further extend what it can do. The versatility of these two patterns gives many possibilities, if not every possibility. The only drawback is that the Spliterator API is not easy to use and does not lead to simple, readable code. Plus, if you need to leverage parallel streams, it may become very tricky to use.
Organization of the Gatherer API
The Gatherer API brings simpler patterns than the Spliterator API, with excellent support for parallelism. In fact, you can use a gatherer that does not support parallelism in a parallel stream, and still benefit from the performance parallel streams bring you. This is something that the Spliterator API does not give you.
The Gatherer API is built on two main elements: a Gatherer interface and a Gatherers factory class. There are also a number of interfaces used to interact with gatherers, as well as classes to implement these.
A Gatherer is an object that you can pass to a method of the Stream interface: gather(). This gather() method is an intermediate operation of the Stream API, and this object models what this intermediate operation is doing.
When to Use Gatherers?
The Gatherer API is not a simple API, and you should not use it unless you have good reasons to do so. Even if it can model the simplest Stream operations, like map, filter, or flat-map, building a gatherer for that would be complex and would lead to hard-to-understand code. Your good old map() method does a mapping in a simple way, so this is what you should choose for your application.
The Gatherer API is there to create complex operations, that are not already available in the Stream API. Here are some examples.
- Create a stream made of fixed-size lists of consecutive elements from that stream.
- Create a distinct-like operation, with a custom way of comparing elements.
- Create a complex map-filter operation, maybe involving some flat-mapping and some optional handling, that you want to fuse in a single operation, properly named, to make your code more readable.
There are a number of things that the Gatherer API can do, that you are going to discover one by one in this section. Let us start with how you can integrate elements to a downstream, given what you get from an upstream.
Integrating Elements in a Downstream
Learning how the Gatherer API works can be a frustrating process, as you need to go through a series of simple examples that you should not use in your application. The first examples that this section covers are about mapping, filtering, or flat-mapping streams, operations that you can conduct with the classical Stream.map(), Stream.filter(), or Stream.flatMap() methods. These examples have no other goal than to show you, in a simple way, the different elements that compose a gatherer, and how you can use them.
Fixing Some Vocabulary
Let us fix some vocabulary first. A gatherer models an intermediate operation on a stream. As such, it operates on a stream, consumes the elements this stream produces, does something with them, and pushes (or not) elements to another stream.
Let us call the stream the elements are coming from the upstream, and the stream this gatherer pushes elements to the downstream.
Using an Integrator to Map a Stream
Let us start with a simple first example: the mapping of a stream of strings of characters.
Writing a gatherer is about implementing the Gatherer interface, which turns out to be a functional interface. You can implement it with a lambda, and you will see how to do it in a few minutes. But you can also create a gatherer with one of the few factory methods available on this interface.
For a simple gatherer, you can use the Gatherer.of() method, which takes an Integrator as a parameter. This method has overloads that are covered later in this section. To create the gatherer you need, you first have to write this integrator.
The role of an integrator is to consume elements from the upstream, and to push elements to the downstream. These elements could be the same, of the same type, or not.
At this point, you can see that an integrator needs two elements to work with: an element from the upstream, and an object to model the downstream, with a way to push elements to it. It turns out that an integrator works with a third element, which is an internal state that you can define within your gatherer implementation. This state is covered later in this section, so let us leave it aside for now.
The following code shows you how you can define an integrator, a gatherer, and how you can use it in a stream. This gatherer does not do anything, but it shows you how you can put all these elements together.
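The original listing is not shown here; a minimal reconstruction, consistent with the output below (the class name is mine), could look like this:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class IdentityGathererExample {

    static List<String> result() {
        // the integrator pushes each element from the upstream, unchanged, to the downstream
        Gatherer.Integrator<Void, String, String> integrator =
                (state, element, downstream) -> downstream.push(element);
        Gatherer<String, Void, String> gatherer = Gatherer.of(integrator);
        return Stream.of("one", "two", "three")
                .gather(gatherer)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```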
Running this code produces the following.
result = [one, two, three]
The first line of this code is the declaration of the integrator. It is an implementation of the abstract method defined on the Integrator interface.
The example you wrote defines an integrator of type <Void, String, String>. The first type is the type of the internal state that we are not using here. The second type is the type of the upstream. The third type is the type of the downstream.
Note that this method returns a boolean. Fortunately, the downstream.push() method also returns a boolean, which is why this example compiles and runs. This boolean is very important, and its exact role is covered later in this section.
You can now write a mapper that does some mapping (instead of the identity function). Let us put this gatherer in a method, so that you can pass your mapper as an argument.
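A possible sketch of such a method (the helper name mapping is mine, and the listing is a reconstruction consistent with the output below):

```java
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class MappingGathererExample {

    // hypothetical helper; wraps the mapping logic in a reusable gatherer
    static <T, R> Gatherer<T, Void, R> mapping(Function<? super T, ? extends R> mapper) {
        Gatherer.Integrator<Void, T, R> integrator =
                (state, element, downstream) -> downstream.push(mapper.apply(element));
        return Gatherer.of(integrator);
    }

    public static void main(String[] args) {
        var result = Stream.of("one", "two", "three")
                .gather(mapping(element -> element.toUpperCase()))
                .toList();
        System.out.println("result = " + result);
    }
}
```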
Running this code produces the following result.
result = [ONE, TWO, THREE]
You can change the mapping function to String::length. Note that, thanks to the use of var, you do not need to change the type of the gatherer you produce.
The result then becomes the following.
result = [3, 3, 5]
Using a Gatherer to Filter a Stream
A gatherer can decide to push an element to a downstream or not. In that case, it can act as a filtering operation. Writing such a gatherer is a slight modification of the mapping gatherer you wrote in the previous section.
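A possible reconstruction of this filtering gatherer (the helper name filtering and the predicate are mine, chosen to match the output below):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FilteringGathererExample {

    // hypothetical helper; pushes only the elements that match the predicate
    static <T> Gatherer<T, Void, T> filtering(Predicate<? super T> predicate) {
        Gatherer.Integrator<Void, T, T> integrator =
                (state, element, downstream) -> {
                    if (predicate.test(element)) {
                        return downstream.push(element);
                    }
                    return true; // suspicious; fixed later in this section
                };
        return Gatherer.of(integrator);
    }

    static List<String> result() {
        return Stream.of("one", "two", "three")
                .gather(filtering(element -> element.length() == 3))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```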
Running this code produces the following.
result = [one, two]
Note that this integrator ends with a return true. This may not be quite right. You will see the correct code you need to put there later in this section.
Using a Gatherer to Flat-Map a Stream
The third basic operation that you may have in mind is, of course, the flat-map operation. Mapping a stream does not change the number of elements this stream processes. Filtering it can reduce this number, even down to 0, whereas flat-mapping can either reduce it or increase it.
Suppose that your flat-map operation is about creating a stream of the letters of a string of characters. You can write it in this way.
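A sketch of this gatherer (the helper name flatMapping is mine; the mapper splits each string into one-letter strings, matching the output below):

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FlatMappingGathererExample {

    // hypothetical helper; pushes every element of the mapped stream to the downstream
    static <T, R> Gatherer<T, Void, R> flatMapping(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        Gatherer.Integrator<Void, T, R> integrator =
                (state, element, downstream) -> {
                    mapper.apply(element).forEach(downstream::push);
                    return true; // suspicious; fixed later in this section
                };
        return Gatherer.of(integrator);
    }

    static List<String> result() {
        return Stream.of("one", "two", "three")
                .gather(flatMapping(
                        element -> element.chars()
                                          .mapToObj(letter -> String.valueOf((char) letter))))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```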
Running this code produces the following.
result = [o, n, e, t, w, o, t, h, r, e, e]
Note that the return true at the end of this integrator looks very suspicious. And indeed, this is not how you should handle the boolean returned by the downstream.push() call. You will see why this code is wrong, and how to fix it, later in this section.
Fusing Mappings, Filterings and Flat-Mappings
So far all the gatherers you saw are examples that would be way too complex to use in production. The equivalent methods from the Stream API are much simpler and will make your code much more readable.
If you are really after performance, fusing these operations into a single gatherer can give you better results than chaining several method calls on the Stream API. The readability of your code may suffer, so this is really a matter of choice, taking into account the context of your application.
Let us fuse the three operations from this section into a single gatherer.
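A possible fused gatherer, reconstructed to match the output below (filtering on length, then flat-mapping to letters, then mapping to upper case):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FusedGathererExample {

    static List<String> result() {
        Gatherer.Integrator<Void, String, String> integrator =
                (state, element, downstream) -> {
                    // filtering: keep only the short strings
                    if (element.length() == 3) {
                        // flat-mapping to letters, and mapping each letter to upper case
                        element.chars()
                               .mapToObj(letter -> String.valueOf((char) letter).toUpperCase())
                               .forEach(downstream::push);
                    }
                    return true; // not the right code; fixed later in this section
                };
        return Stream.of("one", "two", "three")
                .gather(Gatherer.of(integrator))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```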
Running this code produces the following result.
result = [O, N, E, T, W, O]
Once again, take this example with a grain of salt, as the final return true is not the right code. More on that later in this section.
Why could this code run faster? Because it creates fewer objects than the equivalent stream code. Each intermediate method call creates a new Stream object, which represents an overhead. In this code, only two Stream objects are created, which may be an optimization.
In any case, you need to measure whether this kind of code brings you any gain, in a production environment, before choosing to use it.
Managing an Internal Mutable State
The second thing a gatherer can do is manage an internal mutable state. Several intermediate operations of the Stream API use a mutable state to work.
- limit() has to count the number of elements it has processed before interrupting the stream.
- distinct() needs to store the elements it has already seen, in a hash set.
- dropWhile() has to switch an internal state to know if it is open or not.
There is a principle in the Stream API, which is that you should never mutate an external mutable state from within a stream. There are two reasons for that. First, doing so could prevent some optimizations from being applied to streams. And second, it would prevent you from using parallel streams, as it would create race conditions if you are not careful.
Ignoring the First Elements of a Stream
Let us first create a dropWhile()-like behavior with a gatherer.
You may have seen that we ignored the first parameter of the integrators we wrote in the previous section. Well, now is the time to take it into account. A gatherer that manages an internal mutable state needs to know this current state to decide what to do. So the integrator takes this state as its first parameter. Since this state is mutable, an integrator can mutate it, and if it does so, the next time it is called, it will see this modification.
But that is not all: you also need a way to create the initial instance of this mutable state.
If what you need is a mutable collection for instance, then you can just use an implementation provided by the JDK. It does not have to be thread safe, so ArrayList can do. If what you need is a counter, or a boolean, then you need to create your own mutable wrapper on this counter. Using an atomic variable would be overkill, as you do not need any thread safety. In this case, having just an integrator is not enough: you also need a supplier that the Gatherer implementation can call to create this initial mutable container. In the Gatherer API, this supplier is called an initializer.
Let us create a gatherer that can open the gate to let the elements flow on a predicate.
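A possible reconstruction of this gatherer (the helper name dropping, the Box class, and the input values are mine, chosen to match the output below; elements are dropped while the predicate matches, then the gate opens):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DropWhileGathererExample {

    // hypothetical helper; drops elements while the predicate matches, then lets everything flow
    static <T> Gatherer<T, ?, T> dropping(Predicate<? super T> predicate) {
        // a mutable box around the state of the gate
        class Box {
            boolean open = false;
        }
        return Gatherer.ofSequential(
                Box::new,
                (box, element, downstream) -> {
                    if (!box.open && !predicate.test(element)) {
                        box.open = true; // the gate opens, and stays open
                    }
                    if (box.open) {
                        downstream.push(element);
                    }
                    return true; // not the right thing to do; fixed later in this section
                });
    }

    static List<Integer> result() {
        return Stream.of(1, 2, 3, 4, 5, 4, 3, 2, 1)
                .gather(dropping(i -> i < 4))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```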
Running the previous code prints the following.
result = [4, 5, 4, 3, 2, 1]
The first element you can see in this dropWhile() method is the definition of your mutable box. It is a simple local class that you can declare within your method. In a later step, you will see how you can make it an anonymous class. Then you can write a Supplier to create an instance of this Box class. This supplier is called by the implementation of your gatherer, just once.
The implementation of your integrator is different, and receives the instance of Box that has been created. Since it is mutable, you can do whatever you need to do with it. Here, we just switch the boolean it wraps when the predicate becomes true, and in that case push the elements to the downstream. Note that this integrator always returns true, which is actually not the right thing to do. We will fix this later.
The biggest difference is that you now need to call the Gatherer.ofSequential() factory method instead of the simple Gatherer.of() factory method. Using ofSequential() instead of of() tells the API that this gatherer cannot be used in parallel. You will learn more about parallel gatherers later in this section, and why you cannot use this gatherer in parallel.
You can write this gatherer in a different way, using anonymous classes, and leveraging non-denotable types.
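A sketch of this anonymous-class variant (a reconstruction; the input values are mine, matching the output below). The compiler infers the type of the box parameter, so the field of the anonymous class remains accessible:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DropWhileAnonymousExample {

    static List<Integer> result() {
        return Stream.of(1, 2, 3, 4, 5, 4, 3, 2, 1)
                .gather(Gatherer.ofSequential(
                        // the initializer returns an instance of an anonymous class
                        () -> new Object() {
                            boolean open = false;
                        },
                        // the type of box is inferred; the open field is accessible
                        (box, element, downstream) -> {
                            if (!box.open && element >= 4) {
                                box.open = true;
                            }
                            if (box.open) {
                                downstream.push(element);
                            }
                            return true; // fixed later in this section
                        }))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```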
Running the previous code gives you the same as previously.
result = [4, 5, 4, 3, 2, 1]
This code works because an anonymous class is compiled as a non-denotable type, which is preserved as long as you do not assign the instance to a variable with an explicit type. Here, the type of the box argument of the integrator is inferred by the compiler, this type is preserved, and you can access the field of your anonymous class.
Removing the Duplicates from a Stream
Let me take you through a second example, which consists in implementing a distinct() like behavior.
Every time you consume an element, you need to check whether it has already been seen. If it has, you should not push it to the downstream. You can use a regular HashSet as your mutable state, with no need to wrap it, nor to make it thread-safe, as it will be used in a single thread.
One of the nice features of the Set.add() method is that it returns true if the element was added, meaning it was not already in the set, and false otherwise. So you can use this feature here.
Let us create this gatherer.
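A possible reconstruction (the input values are mine, chosen to match the output below):

```java
import java.util.HashSet;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DistinctGathererExample {

    static List<Integer> result() {
        return Stream.of(1, 2, 2, 3, 3, 3, 4, 5, 5)
                .gather(Gatherer.ofSequential(
                        () -> new HashSet<Integer>(),
                        (seen, element, downstream) -> {
                            if (seen.add(element)) { // true if the element was not seen before
                                downstream.push(element);
                            }
                            return true; // not the right code; fixed later in this section
                        }))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```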
Running the previous example prints the following.
result = [1, 2, 3, 4, 5]
Once again, the return true at the end of the integrator is not the right code; you will fix it later.
Interrupting a Stream
Interrupting a stream is something that can be done by several intermediate operations, including limit() and takeWhile().
Suppose that you have a list with one million elements, and you call the following code.
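The call in question could look like this (a reconstruction; the content of ints and the mapping function are assumptions):

```java
import java.util.List;
import java.util.stream.IntStream;

public class LimitExample {
    public static void main(String[] args) {
        // ints stands for a list with one million elements
        List<Integer> ints = IntStream.rangeClosed(1, 1_000_000).boxed().toList();
        var result = ints.stream()
                .map(i -> i * 3)
                .limit(3)
                .toList();
        System.out.println("result = " + result); // prints result = [3, 6, 9]
    }
}
```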
It would be very inefficient to process all the elements of ints, which is why this limit() call has the capacity to tell its upstream that it is not going to process any more elements. This upstream is the stream returned by the map() call. The implementation of this operation should handle the interruption and propagate it to its own upstream. That upstream pulls elements from the list, so it should stop doing that.
The Gatherer API supports this behavior. It is able to check if its downstream still accepts more elements, and to transmit this information to its upstream. If this gatherer decides that it should interrupt the stream, it can also pass this information to its upstream.
You can check if your downstream still accepts elements in two ways.
- You can call downstream.isRejecting(), which returns true if the downstream does not accept any more elements.
- You can also check the value returned by downstream.push(). If it is false, then your downstream does not accept any more elements.
Limiting the Number of Elements Pushed to the Downstream
Let us implement a limit() operation. Implementing such a feature requires a mutable counter, and an integrator that will both increment this counter, and check if the limit is reached. You can write the following code.
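A possible implementation (the helper name limiting, the Counter class, and the input values are mine, chosen to match the output below):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class LimitGathererExample {

    // hypothetical helper; pushes at most limit elements to the downstream
    static <T> Gatherer<T, ?, T> limiting(long limit) {
        class Counter {
            long count = 0L;
        }
        return Gatherer.ofSequential(
                Counter::new,
                (counter, element, downstream) -> {
                    // first, check if the downstream still accepts elements
                    if (downstream.isRejecting()) {
                        return false;
                    }
                    if (counter.count < limit) {
                        counter.count++;
                        // transmit the boolean returned by push() to the upstream
                        return downstream.push(element);
                    }
                    // enough elements have been pushed: interrupt the stream
                    return false;
                });
    }

    static List<Integer> result() {
        return Stream.of(1, 2, 3, 4, 5)
                .gather(limiting(3L))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```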
This time there is no blind return true anymore. You can see that this code correctly processes what the downstream is telling it.
Running this code produces the following.
result = [1, 2, 3]
The first thing the integrator does is check whether the downstream is still accepting elements. If it is not, there is nothing left to do, and it can just return false to notify its upstream. Note that a downstream should never switch its state from rejecting back to non-rejecting.
Then, as long as the counter has not reached the limit, the integrator pushes one more element, gets the value returned by the downstream.push() call, and immediately transmits it to its upstream.
If the integrator reaches its last return statement, it means that it has pushed enough elements to its downstream and is done with it. So it returns false, no matter what.
Fixing the Distinct Gatherer
With this in mind, you can now fix the previous examples that were left with some return true as placeholders. Here is the correct version of the Distinct example.
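A possible corrected version (the helper name distinct and the input values are mine, matching the output below):

```java
import java.util.HashSet;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DistinctFixedExample {

    static <T> Gatherer<T, ?, T> distinct() {
        return Gatherer.ofSequential(
                () -> new HashSet<T>(),
                (seen, element, downstream) -> {
                    if (downstream.isRejecting()) {
                        return false; // nothing left to do
                    }
                    if (seen.add(element)) {
                        // transmit the value returned by push() to the upstream
                        return downstream.push(element);
                    }
                    return true; // duplicate skipped, but more elements may be pushed later
                });
    }

    static List<Integer> result() {
        return Stream.of(1, 2, 2, 3, 3, 3, 4, 5, 5)
                .gather(distinct())
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```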
Running this example produces the following.
result = [1, 2, 3, 4, 5]
It follows the principles of the Limit gatherer.
- Check if your downstream is accepting any more elements. If not, then return false, and do not do anything.
- If you decide to push something to the downstream, return the value that downstream.push() gives you.
- If you decide not to push anything, but may push something in the future, then return true. If not, then return false.
In any case, you need to keep in mind that once you have returned false, you should not return true again. As the downstream of your own upstream, you need to follow the rule that a downstream can never switch from the non-accepting to the accepting state.
By the way, if everything is working properly, once it has returned false, your integrator should not be called anymore.
Pushing to a Rejecting Downstream
What happens if you ignore what your downstream is telling you, and keep pushing elements to it? Well, nothing, actually. The elements you push are silently ignored. No exception is raised. It is just a waste of resources, and an overhead you could avoid. In some cases this overhead may be very costly.
Creating Greedy Integrators
You have two factory methods on the Integrator interface.
- Integrator.of(), that takes a plain Gatherer.Integrator as a parameter.
- Integrator.ofGreedy(), that takes a Gatherer.Integrator.Greedy as a parameter.
There are two things that you need to keep in mind:
- Gatherer.Integrator.Greedy is an extension of Gatherer.Integrator.
- Both interfaces can be implemented with the same lambda expressions.
So you can write the following. These two integrators are strictly equivalent.
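A sketch of this first pair (a reconstruction; the identity lambda is mine):

```java
import java.util.stream.Gatherer;

public class IntegratorFactoriesExample {

    // the same lambda, passed to both factory methods
    static final Gatherer.Integrator<Void, String, String> PLAIN =
            Gatherer.Integrator.of((state, element, downstream) -> downstream.push(element));

    static final Gatherer.Integrator.Greedy<Void, String, String> GREEDY =
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element));
}
```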
And you can also write the following. These two integrators are also strictly equivalent.
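A sketch of this second pair, where the same lambda is assigned to both interface types directly (a reconstruction):

```java
import java.util.stream.Gatherer;

public class IntegratorLambdasExample {

    // the same lambda, assigned to the plain type and to the greedy type
    static final Gatherer.Integrator<Void, String, String> PLAIN =
            (state, element, downstream) -> downstream.push(element);

    static final Gatherer.Integrator.Greedy<Void, String, String> GREEDY =
            (state, element, downstream) -> downstream.push(element);
}
```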
Moreover, these four integrators work the same. From a behavior point of view, there is no difference between them.
Defining an integrator as greedy means that this integrator never chooses to interrupt the stream on its own. It always transmits the rejecting state of the downstream it pushes elements to. Internally, the Gatherer API can take that into account and activate some optimizations when executing your gatherer. If you do not follow the contract of a greedy integrator, your code may fail. If you have a plain integrator that has a greedy behavior, your code will not fail, but you may miss some optimization opportunities.
So in a first step, you could choose to ignore this greedy behavior, and stick to plain integrators. Once you have set up your data processing pipeline you can then revisit each step and make the corresponding integrators greedy.
Adding a Finisher
There are cases where your gatherer needs to wait for all the elements of its upstream to start pushing elements to its downstream. This is the case if you need to create a gatherer that sorts the elements of your upstream. You cannot start producing elements until you have seen them all.
Such a gatherer needs to add all the elements it sees to an internal buffer, and then, when no more elements are pushed to its integrator, push the buffered elements to the downstream. So far you have not seen any way to do that: nothing tells your gatherer that no more elements are coming from the upstream. The upstream pushes elements to the integrator of your gatherer, and when there are no more elements, nothing happens.
This is why the Gatherer API gives you another element, called a finisher, that is called by the implementation when no more elements are to be pushed to your integrator.
This finisher can be seen as a simplified integrator.
- It takes the current state of your gatherer, if you defined one by providing an initializer.
- It does not take any current element of the stream, precisely because when it is called, there are no more elements.
- It takes the downstream so that you can push elements to it.
- It does not return anything, since after this finisher is called, nothing can happen anyway.
So a finisher takes two parameters: your state and the downstream, and does not return anything. It is modeled by a BiConsumer<State, Downstream>.
Creating a Finisher with no State
Let us write a simple finisher, in a case where your gatherer does not define any internal mutable state.
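A possible reconstruction, consistent with the output below:

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FinisherExample {

    static List<String> result() {
        Gatherer.Integrator<Void, String, String> integrator =
                (state, element, downstream) -> downstream.push(element.toUpperCase());
        // the finisher is a BiConsumer; the Void state is ignored
        BiConsumer<Void, Gatherer.Downstream<? super String>> finisher =
                (state, downstream) -> downstream.push("DONE");
        return Stream.of("one", "two", "three")
                .gather(Gatherer.of(integrator, finisher))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```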
Here we define a mapping gatherer and, for some reason, we need to add "DONE" at the end of the stream. The finisher you have is a BiConsumer, but since this gatherer does not define any state, its first parameter is of type Void, and is ignored.
Running the previous code produces the following.
result = [ONE, TWO, THREE, DONE]
Creating a Sorting Gatherer with a Finisher
Sorting a stream requires adding all the elements to a buffer before starting to push them to the downstream. This buffer is the internal mutable state of your gatherer. The role of the integrator is to store all the elements in this buffer. Pushing the elements is then done by the finisher.
You can then write the following code.
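A possible reconstruction (the helper name sorting, the comparator, and the input values are mine, chosen to match the output below; note that a TreeSet silently drops elements that compare equal, so the comparator must distinguish every element you want to keep):

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class SortingGathererExample {

    static <T> Gatherer<T, ?, T> sorting(Comparator<? super T> comparator) {
        return Gatherer.ofSequential(
                () -> new TreeSet<T>(comparator), // the buffer keeps the elements sorted
                (buffer, element, downstream) -> {
                    if (downstream.isRejecting()) {
                        return false; // no need to buffer anything
                    }
                    buffer.add(element); // no element is pushed at this point
                    return true;
                },
                // the finisher stops pushing as soon as the downstream rejects elements
                (buffer, downstream) -> buffer.stream()
                        .takeWhile(element -> !downstream.isRejecting())
                        .forEach(downstream::push));
    }

    static List<String> result() {
        return Stream.of("one", "two", "three", "four", "five", "six", "seven")
                .gather(sorting(
                        Comparator.comparing(String::length)
                                  .thenComparing(Comparator.naturalOrder())))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```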
Running this code prints the following.
result = [one, six, two, five, four, seven, three]
Several points are worth noting in this gatherer.
- The elements are added to a TreeSet, so that they are kept sorted.
- The integrator checks if the downstream is accepting elements. If it is not, there is no need to buffer anything. Since this integrator does not push any element itself, a rejecting downstream probably means that the stream was built with limit(0L) or some other weird thing like that.
- The integrator does not push any element to its downstream.
- The finisher has to check if the downstream is still accepting elements after each call to downstream.push(). Using takeWhile() proves very useful here.
Parallel Gatherers
Executing Gatherers in Parallel Streams
One of the amazing features of the Stream API is that you can distribute your computations among all the cores of your CPU by just calling parallel() on an existing stream.
Gatherers support parallel streams in two ways.
When you create a gatherer with one of the Gatherer.of() factory methods, you create a gatherer that can be called in parallel.
On the other hand, when you use Gatherer.ofSequential(), you create a gatherer that does not support parallelization.
What does it mean to be called in parallel, or to not support parallelization? In the Stream API, if you decide to make your stream a parallel stream, then all the operations of your stream are executed in parallel, in different threads. In a nutshell: a parallel stream splits your source of data into several chunks, and each chunk is processed in its own thread, on the different cores of your CPU. Once everything has been processed, you end up with partial results that need to be merged into one. So each intermediate operation is executed in parallel, without having to synchronize with what is happening in other threads. There are exceptions to that, as some intermediate operations do need to synchronize. This is the case for limit(), takeWhile(), distinct(), sorted(), and some others.
This does not hold for the Gatherer API. If you use a gatherer in a parallel stream, two things can happen.
- Your gatherer does support parallelism. In that case, everything is executed in parallel, as you expect.
- Your gatherer does not support parallelism; it is called a sequential gatherer. You created it using one of the Gatherer.ofSequential() methods, and such gatherers can only be used sequentially. In that case, all the operations before your call to gather() are executed in parallel, just as in a regular parallel stream; then your gatherer is executed in a single thread and pushes elements to a single downstream; and the rest of the stream is again executed in parallel.
So even if you have a gatherer that does not support parallelism for some reason, you can still benefit from the performance brought to you by parallel streams. This feature is unique to the Gatherer API.
Creating Parallel Gatherers
You have three patterns to choose from to create a parallel gatherer.
The first two are equivalent to the ones you have in sequential gatherers.
- Gatherer.of(integrator). You already saw this pattern earlier in this section. Your gatherer does not rely on any internal mutable state, and it does not need any finisher. There is a sequential version of this method: Gatherer.ofSequential(integrator).
- Gatherer.of(integrator, finisher). This gatherer does not need any mutable state, and it has a finisher. This is also a pattern that you saw earlier in this section. There is also a sequential version of this method: Gatherer.ofSequential(integrator, finisher).
The third pattern declares a mutable state. In the Gatherer API, this mutable state is not shared among the different threads of your parallel stream. So every time the Stream API runs a gatherer in a thread, it creates a new instance of this mutable state for this gatherer to work with. On the one hand, it makes your life easier, because you do not need to bother with a thread-safe mutable state, since it is never shared.
But on the other hand, you need a way to merge these different instances together, into a single one, once you are done with your computation. It makes the corresponding factory method a little more complex, as you need to provide a fourth parameter, called a combiner, that can combine two instances of your mutable state.
So there are now four parameters for this factory method.
- The initializer, which is a Supplier.
- The integrator, which takes the current mutable state, the element you need to integrate, and the downstream. This method returns a boolean to tell whether it is done accepting elements or not.
- The combiner, which takes two instances of the mutable state and returns one. It is modeled by a BinaryOperator. It can return one of the two instances, or create a new one. This combiner can be called any number of times, depending on the number of cores you have on your CPU.
- The finisher, which is the BiConsumer that we already talked about. This finisher is called only once, at the very end of your computation.
If for some reason, you need an internal mutable state, but you cannot create a combiner, then the Stream API cannot run your gatherer in parallel, since it would not be able to combine the instances of this mutable state together. You would then need to use a sequential gatherer.
Let us make our sorting gatherer a parallel gatherer.
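A possible parallel version (a reconstruction; the combiner merges the buffers built in different threads):

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ParallelSortingExample {

    static <T> Gatherer<T, ?, T> sorting(Comparator<? super T> comparator) {
        return Gatherer.of(
                () -> new TreeSet<T>(comparator),
                (buffer, element, downstream) -> {
                    if (downstream.isRejecting()) {
                        return false;
                    }
                    buffer.add(element);
                    return true;
                },
                // the combiner merges two buffers created in two different threads
                (left, right) -> {
                    left.addAll(right);
                    return left;
                },
                (buffer, downstream) -> buffer.stream()
                        .takeWhile(element -> !downstream.isRejecting())
                        .forEach(downstream::push));
    }

    static List<String> result() {
        return Stream.of("one", "two", "three", "four", "five", "six", "seven")
                .parallel()
                .gather(sorting(
                        Comparator.comparing(String::length)
                                  .thenComparing(Comparator.naturalOrder())))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```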
Running the previous code prints the following.
result = [one, six, two, five, four, seven, three]
Chaining Gatherers
The Gatherer interface exposes an andThen() method. You can chain existing gatherers with this method to create new ones.
You can see chaining in action on some of the gatherers you wrote previously in this section.
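A possible reconstruction, chaining a filtering gatherer and a mapping gatherer (helper names, predicate, and input values are mine, chosen to match the output below):

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ChainingGatherersExample {

    static <T> Gatherer<T, Void, T> filtering(Predicate<? super T> predicate) {
        return Gatherer.of((state, element, downstream) -> {
            if (downstream.isRejecting()) {
                return false;
            }
            if (predicate.test(element)) {
                return downstream.push(element);
            }
            return true;
        });
    }

    static <T, R> Gatherer<T, Void, R> mapping(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((state, element, downstream) ->
                downstream.push(mapper.apply(element)));
    }

    static List<String> result() {
        return Stream.of("one", "two", "three", "four", "five", "six", "seven")
                .gather(filtering((String s) -> s.length() >= 4)
                        .andThen(mapping(String::toUpperCase)))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```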
Running the previous example prints the following.
result = [THREE, FOUR, FIVE, SEVEN]
As you can imagine, there are constraints on the gatherers you want to chain: the type of the elements produced by the first needs to match the type of the elements accepted by the second.
Creating Gatherers with the Gatherers Factory Class
The last element of the Gatherer API you need to be aware of is the Gatherers factory class. It contains five factory methods that create ready-to-use gatherers for your application.
Folding
The fold() factory method creates a gatherer that implements a reduce-like operation, applied from left to right, useful for cases where you cannot write a combiner.
Folding looks like the reduction of a stream. But the reduction operation requires your operator to have several properties, among them associativity, and in some cases, an identity element. When your reduction operation does not have an identity element the corresponding reduce() method returns an optional, that is empty in case you were reducing an empty stream.
Folding does not have these restrictions. A fold just applies an operator (in this case a BiFunction) to your elements to produce a result. It also takes a Supplier that provides the initial value of the process.
Note that this gatherer implements your folding as an intermediate operation. So you end up with a stream that can only produce a single element. If you need to get this element, you need to call findFirst().orElseThrow() on it. If you need to process your data further, then maybe using a collector is a good solution.
You can execute the following code to see this gatherer in action.
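A possible reconstruction (the folding function, which inserts each element before the closing brace, is my guess to reproduce the output shown below):

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FoldExample {

    static String result() {
        return Stream.of(1, 2, 3, 4)
                .gather(Gatherers.fold(
                        () -> "{}", // the initial value
                        // inserts each element just before the closing brace
                        (string, element) ->
                                string.substring(0, string.length() - 1) + element + "}"))
                .findFirst()
                .orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```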
Running the previous code prints the following.
result = {1234}
Scanning
The scan() factory method creates a gatherer that works with two elements.
- A supplier, that creates an initial value.
- And a
BiFunction.
Each time the upstream pushes a new element to this gatherer, it applies the BiFunction to the previous element and this next element from the upstream, and then pushes the result to the downstream.
You can build an example similar to the fold example. Even if the two gatherers fold and scan look similar, there is one major difference: the fold gatherer only pushes its last element to the downstream, so it acts as a reducer within an intermediate operation, whereas the scan gatherer pushes all the elements as it processes them.
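A possible reconstruction, using the same initial value and BiFunction as the fold example (both are my guesses to reproduce the output shown below):

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ScanExample {

    static List<String> result() {
        return Stream.of(1, 2, 3, 4)
                .gather(Gatherers.scan(
                        () -> "{}",
                        (string, element) ->
                                string.substring(0, string.length() - 1) + element + "}"))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```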
Running the previous code prints the following.
result = [{1}, {12}, {123}, {1234}]
Mapping Concurrently
The mapConcurrent() factory method creates a gatherer that can map your data concurrently. The gatherer it produces is not a parallel gatherer, as it is internally created with the Gatherer.ofSequential() factory method.
This gatherer consumes all the elements it can, and maps them using the Function you pass as an argument. Each mapping is executed in a virtual thread, launched specifically for the mapping of that element. The number of active virtual threads at a given time is controlled by the maxConcurrency argument: the gatherer manages an internal Semaphore to make sure that the number of threads does not go over this limit.
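A short sketch of how it can be used (the input values and concurrency level are mine; the encounter order of the elements is preserved):

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class MapConcurrentExample {

    static List<String> result() {
        return Stream.of("one", "two", "three")
                // at most 4 virtual threads run the mapping at the same time
                .gather(Gatherers.mapConcurrent(4, String::toUpperCase))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```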
Windowing Gatherers
The last two factory methods create gatherers that work with windows. A window is an interval of indexes on your stream. Calling a windowing gatherer on a stream that is not ORDERED does not make sense, as several runs on the same data could lead to different, inconsistent results.
There are many strategies to create such windows. The API gives you two.
- The first one, called fixed window, gives you a stream of disjoint windows of indexes: [0,1,2], [3,4,5], [6,7,8], etc. No element is repeated from one window to the next. The last window can be incomplete, as the number of elements the upstream produces may not be exactly divisible by the size of the window you need.
- The second one, called sliding window, gives you a stream where the first index of each window is incremented by 1: [0,1,2], [1,2,3], [2,3,4], etc. All the windows of this stream have the same length.
Fixed Window
The windowFixed(windowSize) factory method creates a gatherer that stores the elements pushed by the upstream in non-modifiable lists of size windowSize. The index of the first element of each list is incremented by windowSize, so a given element from the upstream is stored in only one non-modifiable list. Each list is pushed to the downstream of this gatherer when it is ready. The last list contains all the remaining elements pushed by the upstream. If you are lucky, it will contain exactly windowSize elements, but your code should not rely on that.
Let us write the following example to see this gatherer in action.
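A possible reconstruction, consistent with the output below:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowFixedExample {

    static List<List<String>> result() {
        return Stream.of("one", "two", "three", "four", "five")
                .gather(Gatherers.windowFixed(2)) // disjoint windows of size 2
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```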
Running the previous code prints the following. As you can see, the last window is shorter than the others, as there are only five elements produced by this stream.
result = [[one, two], [three, four], [five]]
Sliding Window
The windowSliding(windowSize) factory method creates a gatherer that stores the elements pushed by the upstream in non-modifiable lists of size windowSize. The index of the first element of each list is incremented by 1, so a given element from the upstream is stored in up to windowSize non-modifiable lists. This rule does not hold for the first and last elements of the upstream. Each list is pushed to the downstream of this gatherer when it is ready. The last list contains the same number of elements as all the other lists, so the last element pushed by the upstream is present only in the last non-modifiable list pushed to the downstream.
You can write a first, basic example to see this gatherer in action.
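A possible reconstruction, consistent with the output below:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class WindowSlidingExample {

    static List<List<Integer>> result() {
        return IntStream.rangeClosed(1, 9).boxed()
                .gather(Gatherers.windowSliding(3)) // windows of size 3, sliding by 1
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```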
Running the previous example prints the following.
result = [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8], [7, 8, 9]]
Note that for this gatherer, all the windows produced have the same size, as long as the upstream produces at least as many elements as the window size you chose.
As the elements of the gathered stream are themselves lists, you can process them further using the Stream API. Let us do that by composing two gatherers to compute the average of each window.
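A possible composition (a reconstruction; the averaging gatherer is mine, chained after windowSliding() with andThen()):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class WindowAveragingExample {

    static List<Double> result() {
        // maps each window to the average of its elements
        Gatherer<List<Integer>, Void, Double> averaging =
                Gatherer.of((state, window, downstream) ->
                        downstream.push(
                                window.stream()
                                      .mapToInt(Integer::intValue)
                                      .average()
                                      .orElseThrow()));
        return IntStream.rangeClosed(1, 9).boxed()
                .gather(Gatherers.<Integer>windowSliding(3).andThen(averaging))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println("result = " + result());
    }
}
```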
Running the previous code prints the following.
result = [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
Last update: September 14, 2021