220

Is there a Java 8 stream operation that limits a (potentially infinite) Stream until the first element fails to match a predicate?

In Java 9 we can use takeWhile as in the example below to print all the numbers less than 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

As there is no such operation in Java 8, what's the best way of implementing it in a general way?

5

19 Answers 19

169

Operations takeWhile and dropWhile have been added to JDK 9. Your example code

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

will behave exactly as you expect it to when compiled and run under JDK 9.

JDK 9 has been released. It is available for download here: JDK 9 Releases.

8
  • 3
    Direct link to the preview docs for JDK9 Stream, with takeWhile/dropWhile: download.java.net/jdk9/docs/api/java/util/stream/Stream.html
    – Miles
    Commented Dec 16, 2015 at 1:11
  • 2
    Is there any reason why they're called takeWhile and dropWhile rather than limitWhile and skipWhile, for consistency with existing API?
    – Lukas Eder
    Commented Jan 7, 2016 at 0:13
  • 11
    @LukasEder takeWhile and dropWhile are pretty widespread, occurring in Scala, Python, Groovy, Ruby, Haskell, and Clojure. The asymmetry with skip and limit is unfortunate. Maybe skip and limit ought to have been called drop and take, but those aren't as intuitive unless you're already familiar with Haskell. Commented Jan 8, 2016 at 2:46
  • 3
    @StuartMarks: I understand that dropXXX and takeXXX are more popular terms but I can personally live with the more SQL-esque limitXXX and skipXXX. I find this new asymmetry much more confusing than the individual choice of terms... :) (btw: Scala also has drop(int) and take(int))
    – Lukas Eder
    Commented Jan 8, 2016 at 9:38
  • 4
    yeah let me just upgrade to Jdk 9 in production. Many devs are still on Jdk8, such a feature should have been included with Streams from the start.
    – wilmol
    Commented Jun 30, 2019 at 22:52
85

Such an operation ought to be possible with a Java 8 Stream, but it can't necessarily be done efficiently -- for example, you can't necessarily parallelize such an operation, as you have to look at elements in order.

The API doesn't provide an easy way to do it, but what's probably the simplest way is to take Stream.iterator(), wrap the Iterator to have a "take-while" implementation, and then go back to a Spliterator and then a Stream. Or -- maybe -- wrap the Spliterator, though it can't really be split anymore in this implementation.

Here's an untested implementation of takeWhile on a Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
4
  • 8
    In theory, parallelizing takeWhile with a stateless predicate is easy. Evaluate the condition in parallel batches (assuming the predicate doesn't throw or have a side-effect if executed a few extra times). The problem is doing it in the context of recursive decomposition (fork/join framework) that Streams use. Really, it's Streams that are horribly inefficient. Commented Dec 25, 2013 at 20:19
  • 98
    Streams would have been a lot better if they weren't so preoccupied with automagical parallelism. Parallelism is needed in only a tiny fraction of places where Streams can be used. Besides, if Oracle cared so much about perfoance, they could have made the JVM JIT autovectorize, and gotten a much bigger performamce boost, without bothering the developers. Now that is automagical parallelism done right. Commented Dec 25, 2013 at 20:24
  • You should update this answer now that Java 9 is released.
    – Radiodef
    Commented Aug 4, 2018 at 23:15
  • 10
    No, @Radiodef. The question asks specifically for a Java 8 solution. Commented Aug 16, 2018 at 19:44
58

allMatch() is a short-circuiting function, so you can use it to stop processing. The main disadvantage is that you have to do your test twice: once to see if you should process it, and again to see whether to keep going.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);
3
  • 5
    This seemed unintuitive to me at first (given the method name), but the docs confirm that Stream.allMatch() is a short-circuiting operation. So this will complete even on an infinite stream like IntStream.iterate(). Of course, in retrospect, this is a sensible optimization. Commented Nov 25, 2015 at 22:39
  • 3
    This is neat, but I don't think it communicates very well that its intent is the body of the peek. If I encountered it next month, I would take a minute to wonder why the programmer before me checked if allMatch and then ignored the answer. Commented Mar 22, 2017 at 19:56
  • 14
    The disadvantage of this solution is that it returns a boolean so you cannot collect the results of the stream as you normally would.
    – neXus
    Commented Oct 6, 2017 at 11:55
37

As a follow-up to @StuartMarks answer. My StreamEx library has the takeWhile operation which is compatible with current JDK-9 implementation. When running under JDK-9 it will just delegate to the JDK implementation (via MethodHandle.invokeExact which is really fast). When running under JDK-8, the "polyfill" implementation will be used. So using my library the problem can be solved like this:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);
2
  • Why have you not implemented it for the StreamEx class?
    – th0masb
    Commented Apr 9, 2018 at 17:04
  • @Someguy I did implement it. Commented Apr 12, 2018 at 3:07
14

takeWhile is one of the functions provided by the protonpack library.

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));
0
12

Update: Java 9 Stream now comes with a takeWhile method.

No needs for hacks or other solutions. Just use that!


I am sure this can be greatly improved upon: (someone could make it thread-safe maybe)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

A hack for sure... Not elegant - but it works ~:D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}
8

You can use java8 + rxjava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));
7

Actually there are 2 ways to do it in Java 8 without any extra libraries or using Java 9.

If you want to print numbers from 2 to 20 on the console you can do this:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

or

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

The output is in both cases:

2
4
6
8
10
12
14
16
18
20

No one mentioned anyMatch yet. This is the reason for this post.

6

This is the source copied from JDK 9 java.util.stream.Stream.takeWhile(Predicate). A little difference in order to work with JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}
4

Here is a version done on ints - as asked in the question.

Usage:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Here's code for StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}
3

Go to get library abacus-common. It provides the exact API you want and more:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Declaration: I'm the developer of AbacusUtil.

1
    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

instead of peak you can use mapToObj to return final object or message

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);
1
  • This should be the accepted answer, if that works consistently
    – Whimusical
    Commented Jan 15, 2021 at 10:16
0

You can't abort a stream except by a short-circuiting terminal operation, which would leave some stream values unprocessed regardless of their value. But if you just want to avoid operations on a stream you can add a transform and filter to the stream:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

That transforms the stream of things to nulls when the things meet some condition, then filters out nulls. If you're willing to indulge in side effects, you could set the condition value to true once some thing is encountered, so all subsequent things are filtered out regardless of their value. But even if not you can save a lot of (if not quite all) processing by filtering values out of the stream that you don't want to process.

2
  • It's lame that some anonymous rater downrated my answer without saying why. So neither I nor any other reader knows what's wrong with my answer. In the absence of their justification I'll consider their criticism invalid, and my answer as posted to be correct.
    – Matthew
    Commented Mar 25, 2017 at 16:53
  • You answer does not solve the OPs problem, which is dealing with infinite streams. It also seems to needlessly complicate things as you can write the condition in the filter() call itself, without needing map(). The question already has an example code, just try to apply your answer to that code and you will see the program will loop forever.
    – SenoCtar
    Commented Apr 4, 2017 at 7:42
0

Even I was having a similar requirement -- invoke the web-service, if it fails, retry it 3 times. If it fails even after these many trials, send an email notification. After googling a lot, anyMatch() came as a saviour. My sample code as follows. In the following example, if webServiceCall method returns true in the first iteration itself, stream does not iterate further as we have called anyMatch(). I believe, this is what you are looking for.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}
0

If you know the exact amount of repititions that will be performed, you can do

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);
1
  • 1
    While this might answer the authors question, it lacks some explaining words and links to documentation. Raw code snippets are not very helpful without some phrases around it. You may also find how to write a good answer very helpful. Please edit your answer.
    – hellow
    Commented Sep 24, 2018 at 6:47
-2

If you have different problem, different solution may be needed but for your current problem, I would simply go with:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);
-2

Might be a bit off topic but this is what we have for List<T> rather than Stream<T>.

First you need to have a take util method. This methods takes first n elements:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

it just works like scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

now it will be fairly simple to write a takeWhile method based on take

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

it works like this:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

this implementation iterate the list partially for a few times but it won't add add O(n^2) operations. Hope that's acceptable.

-3

I have another quick solution by implementing this (which is rly unclean in fact, but you get the idea):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}
1
  • 2
    You're evaluating the whole stream in advance! And if current never .equals(e), you'll get an endless loop. Both even if you subsequently apply e.g. .limit(1). That's far worse than 'unclean'.
    – charlie
    Commented Jun 28, 2016 at 10:31
-3

Here is my attempt using just Java Stream library.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();
1

Not the answer you're looking for? Browse other questions tagged or ask your own question.