
I'm using PySpark 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.

First, a little context: as far as I understand, PySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which makes them costly in terms of performance. Since I need to apply some Python functions to my data and want to minimize overhead, I had the idea to load at least a manageable chunk of data into the driver and process it as a Pandas DataFrame. However, this would lose the parallelism advantage Spark offers. Then I read that foreachPartition applies a function to all the data within a partition and hence allows parallel processing.

My questions now are:

  1. When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (and is the partition data therefore transferred over the network to my driver)?

  2. Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred one by one to the Python instance), or is the partition data processed at once (meaning, for example, the whole partition is transferred to the instance and handled as a whole by one Python instance)?

Thank you in advance for your input!


Edit:

A working in-driver solution I used before looks like this (taken from an answer here on SO):

for partition in rdd.mapPartitions(lambda partition: [list(partition)]).toLocalIterator():
    # Do stuff on the partition
    pass

As can be read from the docs, rdd.toLocalIterator() provides the necessary functionality:

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.

2 Answers


Luckily, I stumbled upon this great explanation of mapPartitions by Mrinal (answered here).

mapPartitions applies a function to each partition of an RDD. Hence, parallel processing is possible if the partitions are distributed over different nodes. The Python instances necessary for executing the Python functions are created on those nodes. While foreachPartition only applies a function for its side effects (e.g. writing the data to a .csv file) and returns nothing, mapPartitions also returns a new RDD. Therefore, foreachPartition was the wrong choice for me.
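
For illustration, a minimal sketch of that difference (write_partition and lower_partition are made-up names, and the actual write is left as a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"]).repartition(2)

def write_partition(rows):
    # foreachPartition: side effects only (e.g. write each row to some sink);
    # nothing is returned to Spark.
    for row in rows:
        pass  # placeholder for the actual write

def lower_partition(rows):
    # mapPartitions: yields new records, from which Spark builds a new RDD.
    for row in rows:
        yield [word.lower() for word in row["text"]]

df.rdd.foreachPartition(write_partition)              # action, returns None
lowered_rdd = df.rdd.mapPartitions(lower_partition)   # transformation, new RDD

So if the processed data has to come back as an RDD/DataFrame, mapPartitions is the function to use; foreachPartition is only useful for side effects.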

To answer my second question: functions like map or UDFs create a new Python instance and pass data from the DataFrame/RDD row by row, resulting in a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer an entire partition to a Python instance.

Additionally, using generators also reduces the amount of memory necessary for iterating over the transferred partition data (the partition is handed to the function as an iterator object, and each row is then processed by iterating over this object).

An example might look like:

def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

    @partition: iterator-object of partition
    """

    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+

Hope this helps somebody facing similar problems :)

  • This helped me tremendously today. Thank you. – mcsilvio, Mar 28, 2023 at 22:01

PySpark UDFs execute near the executors, i.e. in a separate Python instance per executor that runs side by side with it and passes data back and forth between the Spark engine (Scala/JVM) and the Python interpreter.

The same is true for calls to UDFs inside a foreachPartition.
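
A minimal, hedged sketch of that (the lambda UDF and data below are made up for illustration): every call of the lambda runs in the Python worker next to the executor, not in the JVM.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# The lambda body is executed in the Python worker process of each executor;
# rows are serialized from the JVM to Python and the results are sent back.
to_lower = udf(lambda s: s.lower(), StringType())

df = spark.createDataFrame([("TESTA",), ("TESTB",)], ["text"])
df.select(to_lower("text").alias("text")).show()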

Edit - after looking at the sample code

  1. Using RDDs is not an efficient way of using Spark - you should move to Datasets/DataFrames.
  2. What makes your code sync all the data to the driver is the toLocalIterator() - the partitions are pulled to the driver one after another.
  3. foreachPartition will be similar to glom() (see the sketch below).
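
A small sketch of the glom() analogy in point 3 (the toy data is made up): glom() collapses each partition into a single list, which is roughly the per-partition view that foreachPartition / mapPartitions receive through their iterator argument.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Two partitions, each turned into one list by glom().
rdd = spark.sparkContext.parallelize(range(8), numSlices=2)
print(rdd.glom().collect())   # e.g. [[0, 1, 2, 3], [4, 5, 6, 7]]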
