20

I'm creating a data sample from some dataframe df with

rdd = df.limit(10000).rdd

This operation takes quite some time (why, actually? can't it short-circuit after 10000 rows?), so I assume I now have a new RDD.

However, when I now work on rdd, I get different rows every time I access it, as if it resamples over and over again. Caching the RDD helps a bit, but surely that's not safe?

What is the reason behind it?

Update: Here is a reproduction on Spark 1.5.2

from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row:row.i).reduce(add))

The output is

499500
19955500
49651500

I'm surprised that .rdd doesn't pin down the data.

EDIT: To show that it gets trickier than the re-execution issue, here is a single action which produces incorrect results on Spark 2.0.0.2.5.0

from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

Basically, whenever you use limit, your results might be wrong. I don't mean "just one of many samples", but really incorrect (since in this case the result should always be 12345).

2
  • How do you create df?
    – zero323
    Commented May 11, 2016 at 18:11
  • df is read directly from an HDFS parquet file.
    – Gere
    Commented May 14, 2016 at 8:19

3 Answers

12

Because Spark is distributed, in general it's not safe to assume deterministic results. Your example is taking the "first" 10,000 rows of a DataFrame. Here, there's ambiguity (and hence non-determinism) in what "first" means. That will depend on the internals of Spark. For example, it could be the first partition that responds to the driver. That partition could change with networking, data locality, etc.

Even once you cache the data, I still wouldn't rely on getting the same data back every time, though I certainly would expect it to be more consistent than reading from disk.
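
For instance, a minimal sketch of the caching approach (using the df from the question); since cached blocks can be evicted and recomputed, this is still not a hard guarantee:

sample = df.limit(10000).cache()
sample.count()    # materialize the cache so later actions reuse these rows
rdd = sample.rdd  # repeated actions now tend to see the same rows,
                  # unless cached blocks are evicted and recomputed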

7
  • Sure, between different runs I will get different results the first time I read it. But here I assign it to rdd1, and apparently this gets recalculated. This basically implies that everything can be potentially wrong once you use limit somewhere.
    – Gere
    Commented Jan 10, 2017 at 6:17
  • You're right that rdd1 will be recalculated if it's not cached. My point is that your only guarantee with the limit(N) function is that you will get at most N elements from the RDD. There are no guarantees about which elements you'll get or, as you're noticing, that those elements will be the same. You can't base the logic of your program on a guarantee of consistency. If you really need to get the same elements every time, you'll have to use something else, like a filter or a global sort, which do have consistency guarantees (see the sketch after these comments).
    – santon
    Commented Jan 10, 2017 at 23:20
  • That's clear. But here is the thing: I can make a single call which gives incorrect results. I just have to use rdd1 in two parts of the DAG and make a final call at the end. Maybe I should update the question to show the implications.
    – Gere
    Commented Jan 11, 2017 at 7:15
  • 2
    Can you be more specific about "make a single call"? You can define the RDD once (e.g. rdd = df.limit(10000).rdd). But Spark evaluates lazily, so it's not until you call something like rdd.first() that the computation takes place. At that point, Spark more or less interprets this as "give me the first element out of 10000 random rows from df". When you call rdd.first() later, you kick off another computation, which can give different results than the first time.
    – santon
    Commented Jan 11, 2017 at 19:09
  • 2
    I think there is still a misunderstanding. Don't think of an RDD as ever actually being materialized. That is, don't think of rdd1 as 12,345 integers. Think of it as a description of a computation that guarantees a list of at most 12,345 integers, but there is no specificity of what those integers are. It doesn't matter when or how or how often you reference the RDD, it's just a description of a computation. When you reference rdd1 multiple times, you're asking for the output of the computation twice with no guarantees of consistency.
    – santon
    Commented Jan 12, 2017 at 22:42
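
For concreteness, here is a hedged sketch of those alternatives, reusing the Row RDD from the question (the cutoff 12345 matches the limit used there):

from pyspark.sql import Row

df = sc.parallelize([Row(i=i) for i in range(1000000)], 200).toDF()

# A filter selects rows by value rather than by arrival order,
# so repeated evaluations return exactly the same rows.
stable = df.filter(df.i < 12345).rdd.map(lambda x: (x, x))
stable.join(stable).count()  # 12345, unlike the plain limit() in the question

# A global sort on a unique key similarly fixes which rows the limit returns:
sorted_sample = df.orderBy('i').limit(12345)
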
8

Spark is lazy, so each action you take recalculates the data returned by limit(). If the underlying data is split across multiple partitions, then every time you evaluate it, limit might be pulling from a different partition (e.g. if your data is stored across 10 Parquet files, the first limit call might pull from file 1, the second from file 7, and so on).
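
One way to observe this (a sketch, assuming spark_partition_id is available in your Spark version and reusing the df from the question) is to tag each row with its source partition before the limit:

from pyspark.sql import functions as F

tagged = df.withColumn('part', F.spark_partition_id())
# Each evaluation of limit() is free to draw rows from different partitions,
# so the set of partition ids may differ between these two actions:
tagged.limit(1000).select('part').distinct().show()
tagged.limit(1000).select('part').distinct().show()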

7

From the Spark docs:

The LIMIT clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic.

So you need to sort the rows beforehand if you want the call to .limit() to be deterministic. But there is a catch! If you sort by a column that doesn't have unique values for every row, the so-called "tied" rows (rows with the same sorting key value) will not be deterministically ordered, so .limit() might still be nondeterministic.

You have two options to work around this:

  • Make sure you include a unique row id in the sorting call (a fuller sketch follows after this list).
    For example df.orderBy('someCol', 'rowId').limit(n).
    You can define the rowId like so:
    df = df.withColumn('rowId', func.monotonically_increasing_id())
  • If you only need a deterministic result within a single run, you can simply cache the result of limit (df.limit(n).cache()) so that at least the rows returned by that limit do not change across subsequent action calls, which would otherwise recompute the limit and change the results.
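
A minimal, self-contained sketch of option 1 (someCol and n are placeholders, as in the bullets above):

from pyspark.sql import functions as func

# Add a unique row id, then sort on the business column plus that id,
# so ties are broken deterministically before taking the limit.
df = df.withColumn('rowId', func.monotonically_increasing_id())
sample = df.orderBy('someCol', 'rowId').limit(n)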
