How can I write a method for combining a Stream of Consumers into a single Consumer using Consumer.andThen(Consumer)?

My first version was:

<T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

<T> Consumer<T> noOpConsumer() {
    return value -> { /* do nothing */ };
}

This version compiles with both JavaC and Eclipse, but it is too specific: the Stream cannot be a Stream<SpecialConsumer>, and it cannot be used if the Consumers accept a supertype of T rather than exactly T:

Stream<? extends Consumer<? super Foo>> consumers = ... ;
combine(consumers);

That won't compile, rightfully. The improved version would be:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

But neither Eclipse nor JavaC compiles that:
Eclipse (4.7.3a):

The type Consumer does not define andThen(capture#7-of ? extends Consumer<? super T>, capture#7-of ? extends Consumer<? super T>) that is applicable here

JavaC (1.8.0_172):

error: incompatible types: invalid method reference
.reduce(Consumer::andThen)
incompatible types: Consumer<CAP#1> cannot be converted to Consumer<? super CAP#2>
where T is a type-variable:
T extends Object declared in method <T>combine(Stream<? extends Consumer<? super T>>)
where CAP#1,CAP#2 are fresh type-variables:
CAP#1 extends Object super: T from capture of ? super T
CAP#2 extends Object super: T from capture of ? super T

But it should work: every subclass of Consumer can be used as a Consumer, and every Consumer of a supertype of X can consume Xs as well. I tried adding explicit type parameters to each line of the stream version, but that didn't help. If I write it as a traditional loop, however, it compiles:

<T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
    Consumer<T> result = noOpConsumer();
    for (Consumer<? super T> consumer : consumers) {
        result = result.andThen(consumer);
    }
    return result;
}

(Filtering out the null values is left out for conciseness.)

Therefore, my question is: how can I convince JavaC and Eclipse that my code is correct? Or, if it is not correct: why is the loop version correct but not the Stream version?

3 Answers

You are using the one-argument version of Stream.reduce(accumulator), which has the following signature:

Optional<T> reduce(BinaryOperator<T> accumulator);

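The BinaryOperator<T> accumulator takes two arguments of the stream's element type T (not the T of your combine method) and must return that same type, but the element type you have is: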

<? extends Consumer<? super T>>

I suggest using the three-argument version of the Stream.reduce(...) method instead:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);

The BiFunction<U, ? super T, U> accumulator can accept parameters of two different types, has a less restrictive bound and is more suitable for your situation. A possible solution could be:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}

The third argument, BinaryOperator<U> combiner, is only called for parallel streams, but it is still wise to provide a correct implementation of it.

For a better understanding, the above code can also be written out with explicit types:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {

    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}

Now you can write:

Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);
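
To check the behaviour end to end, here is a small self-contained sketch built around the same three-argument reduce. The class name CombineDemo, the helper run and the two logging consumers are made up for illustration and are not part of the question. It combines a Consumer<Object> and a Consumer<CharSequence> (plus a null entry) into a single Consumer<String>:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class CombineDemo {

    // Same three-argument reduce as above; U is inferred as Consumer<T>.
    static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        return consumers.filter(Objects::nonNull)
                        .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
    }

    // Hypothetical helper: feeds one value to the combined consumer
    // and returns what the individual consumers recorded.
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        Consumer<Object> logObject = o -> log.add("object:" + o);
        Consumer<CharSequence> logLength = cs -> log.add("length:" + cs.length());

        // Consumers of different supertypes of String, plus a null that gets filtered out.
        Stream<Consumer<? super String>> consumers =
                Stream.<Consumer<? super String>>of(logObject, null, logLength);

        Consumer<String> combined = combine(consumers);
        combined.accept(input);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("abc")); // prints [object:abc, length:3]
    }
}
```

The stream here is sequential, so only the identity and the accumulator are exercised; the combiner would come into play with a parallel stream.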

You have forgotten a small thing in your method definition. It currently is:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {}

But you're returning Consumer<? super T>, so changing the return type almost makes it work. The method accepts an argument consumers of type Stream<? extends Consumer<? super T>>. That doesn't work as-is, because the stream may contain different subclasses and implementations of Consumer<? super T> (because of the upper-bounded wildcard ? extends). You can overcome this by casting every ? extends Consumer<? super T> in the Stream to a plain Consumer<? super T>, like this:

<T> Consumer<? super T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
        .filter(Objects::nonNull)
        .map(c -> (Consumer<? super T>) c)
        .reduce(Consumer::andThen)
        .orElse(noOpConsumer());
}

This should now work.

  • How does the loop version allow returning Consumer<T>?
    – Thiyagu
    Commented Jun 18, 2018 at 12:07
  • @user7 I am sorry, but I don't understand your question. Are you asking why it is currently possible (the reason), or how to fix it so that it becomes possible?
    – Lino
    Commented Jun 18, 2018 at 12:11
  • The type cast to (Consumer<? super T>) is equivalent to the assignment to the enhanced for loop variable. But the normal for loop version of combine enables us to return just Consumer<T>. I was wondering how
    – Thiyagu
    Commented Jun 18, 2018 at 12:13
  • @user7 I understand now, but is this even desired? Why would you want to return Consumer<T> in the first place? And if you want to return an unbounded Consumer you can simply cast it again with (Consumer<T>), which is unchecked but will work
    – Lino
    Commented Jun 18, 2018 at 12:19

If you have a lot of consumers, applying Consumer.andThen() will create a huge tree of consumer wrappers that is processed recursively to call each original consumer.

It might thus be more efficient to simply build a list of the consumers, and create a simple consumer that iterates over them:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    List<Consumer<? super T>> consumerList = consumers
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    return t -> consumerList.forEach(c -> c.accept(t));
}
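
As a rough illustration of the list-based variant, the sketch below combines a large number of consumers and invokes the result more than once. The class name ListCombineDemo, the helper countCalls and the counting consumer are assumptions made for this example only:

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListCombineDemo {

    // The list-based combine from above: the stream is consumed once, up front.
    static <T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
        List<Consumer<? super T>> consumerList = consumers
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return t -> consumerList.forEach(c -> c.accept(t));
    }

    // Hypothetical helper: combines n counting consumers, calls the result
    // `invocations` times and returns the total number of accept() calls seen.
    static int countCalls(int n, int invocations) {
        int[] calls = {0};
        Consumer<Object> increment = o -> calls[0]++;
        Stream<Consumer<Object>> consumers = Collections.nCopies(n, increment).stream();

        Consumer<String> combined = combine(consumers);
        for (int i = 0; i < invocations; i++) {
            combined.accept("x");
        }
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(countCalls(100_000, 2)); // prints 200000
    }
}
```

Because the consumers are stored in a flat list, each call is a plain iteration rather than a descent through nested andThen wrappers, and the combined consumer can be reused as often as needed.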

Alternatively, if you can guarantee that the resulting consumer will only be called once, and that the Stream will still be valid at that time, you can simply iterate directly over the stream:

return t -> consumers
        .filter(Objects::nonNull)
        .forEach(c -> c.accept(t));
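
The "only called once" restriction can be seen in a minimal sketch like the following, where the names SingleUseDemo, combineLazily and secondCallFails are invented for the example. A Java stream can only be traversed once, so a second call on the same consumer fails with an IllegalStateException:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class SingleUseDemo {

    // Iterates directly over the stream each time the consumer is called.
    static <T> Consumer<T> combineLazily(Stream<? extends Consumer<? super T>> consumers) {
        return t -> consumers
                .filter(Objects::nonNull)
                .forEach(c -> c.accept(t));
    }

    // Hypothetical helper: returns true if the first call succeeds
    // and the second call fails because the stream was already used.
    static boolean secondCallFails() {
        List<String> log = new ArrayList<>();
        Consumer<Object> logger = o -> log.add(String.valueOf(o));
        Stream<Consumer<Object>> stream = Stream.of(logger);

        Consumer<String> combined = combineLazily(stream);
        combined.accept("first"); // consumes the stream
        try {
            combined.accept("second"); // the stream has already been operated upon
            return false;
        } catch (IllegalStateException expected) {
            return log.size() == 1;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondCallFails()); // prints true
    }
}
```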
  • Couldn't c -> c.accept(t) be replaced with Consumer::accept?
    – Alexander
    Commented Jun 18, 2018 at 16:31
  • @Alexander No, forEach expects a consumer, i.e. something that takes 1 parameter and returns nothing. Consumer::accept would require 2 parameters: the Consumer itself (the “this”) and the object to consume. In fact you can easily notice it would not be equivalent because t would become unused with that method reference.
    – Didier L
    Commented Jun 18, 2018 at 17:29
  • Oh yeah, that makes sense
    – Alexander
    Commented Jun 18, 2018 at 17:30
  • @Alexander also note that forEach actually expects a Consumer<Consumer<T>> here
    – Didier L
    Commented Jun 18, 2018 at 17:33