4

I use PySpark with Spark 2.4 in standalone mode on Linux to aggregate incoming data and write it into a database, using a Jupyter notebook (currently for testing) with the following stripped-down content:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("foo").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:1234").option("subscribe", "bar").load()
df2 = df.withWatermark("timestamp", "1 second").groupBy(F.window("timestamp", "5 second")).agg(F.min("timestamp").alias("timestamp_window_min"))

def write_into_sink(df, epoch_id):
    df.write.jdbc(table="foo_agg", mode="append", [...])

query_write_sink = df2.writeStream.foreachBatch(write_into_sink).trigger(processingTime="1 second").start()

After running for 2 hours, Spark's tmp directory contains dozens of directories under tmp/temporary-[...]/state/0/ with a lot of small crc and delta files that add up to 6 GB of disk space during the run. So my problem is that I cannot run the script for more than a few hours before the disk is full; how could I run it for longer, like days or even months? If I close/kill the Python kernel, the directories are purged.

I already followed Apache Spark does not delete temporary directories and adjusted conf/spark-env.sh to SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true", but it still does not help after a restart, since it only deals with the files after a Spark run, not during one. I also tried SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true spark.worker.cleanup.appDataTtl=120" in the same file, again with no effect.

So, do you have an idea how to get rid of the tmp files of spark during the run?
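(One direction worth checking — this is an assumption to verify against your Spark version, not something the question confirms: when no checkpointLocation is given, Spark keeps the streaming state under a temporary-* checkpoint directory that is only removed on shutdown. Pointing the query at an explicit checkpoint path and lowering how many old state files Spark retains might look like the following fragment; the path is a placeholder.)

    # writeStream option (hypothetical path):
    #   .option("checkpointLocation", "/data/checkpoints/foo_agg")
    # Spark SQL setting bounding how many batches of state files are kept
    # (default is 100; lowering it reduces retained delta files):
    spark.sql.streaming.minBatchesToRetain    10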

4
  • did you try the spark.worker.cleanup.appDataTtl setting mentioned in the link in your question? Set it to a few hours and check
    – Neo-coder
    Commented Apr 24, 2020 at 8:53
  • I cannot run it for hours, so I set it to 120 [s] and ran it for about 1 hour
    – tardis
    Commented Apr 24, 2020 at 10:37
  • you can write cleanup code inside your Spark listeners, say every 30 or 60 minutes
    – s.polam
    Commented Apr 27, 2020 at 16:59
  • @Srinivas : How do I do that? Can you please provide an example as an answer?
    – tardis
    Commented Apr 28, 2020 at 7:24
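(PySpark 2.4 does not expose streaming query listeners to Python, so one way to act on the suggestion above is a periodic cleanup thread in the driver process. The following is a minimal sketch under that assumption — function names, the directory path, and the intervals are all placeholders, not Spark API:)

    import os
    import shutil
    import threading
    import time

    def cleanup_old_state(root, max_age_seconds):
        """Delete entries directly under `root` whose mtime is older than max_age_seconds."""
        now = time.time()
        for name in os.listdir(root):
            path = os.path.join(root, name)
            if now - os.path.getmtime(path) > max_age_seconds:
                if os.path.isdir(path):
                    shutil.rmtree(path, ignore_errors=True)
                else:
                    os.remove(path)

    def start_periodic_cleanup(root, interval_seconds=1800, max_age_seconds=1800):
        """Run cleanup_old_state every interval_seconds in a daemon thread."""
        def loop():
            while True:
                time.sleep(interval_seconds)
                cleanup_old_state(root, max_age_seconds)
        t = threading.Thread(target=loop, daemon=True)
        t.start()
        return t

(Started once after query_write_sink.start(), e.g. start_periodic_cleanup("/path_to_spark_directory"), the thread dies with the driver. Whether deleting state files out from under a running query is safe is exactly the concern raised in the comments below, so this is best pointed only at directories you have confirmed are stale.)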

1 Answer

-1

You could have a cron entry cleaning up the relevant directory (for example, deleting files older than 30 minutes every 30 minutes), along the following lines:

*/30 * * * * find /path_to_spark_directory/* -mmin +30 -exec rm -rf {} \;
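(Before pointing the cron job at the real directory, the same find expression can be exercised against a scratch directory — the paths and file names below are stand-ins for the actual Spark layout:)

    # Self-contained dry run of the cleanup the cron entry performs;
    # a scratch directory stands in for the real Spark tmp path.
    DEMO_DIR=$(mktemp -d)
    touch -d "2 hours ago" "$DEMO_DIR/old.delta"   # simulates a stale state file
    touch "$DEMO_DIR/fresh.delta"                  # simulates a current state file
    # Remove entries older than 30 minutes, as the cron entry would:
    find "$DEMO_DIR" -mindepth 1 -mmin +30 -exec rm -rf {} +
    ls "$DEMO_DIR"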
2
  • I also thought about that workaround. But how do I know that I do not remove important stuff? Can I be sure that the temp directories always get a similar name? There must be a solution within Spark...
    – tardis
    Commented Apr 23, 2020 at 13:19
  • I suspect that the 'work' directory is the culprit for lack of disk space. I typically have a cron entry for SPARK_HOME/work/* which works fine even for long running Spark jobs (24 to 48 hours). Refer Spark Docs: spark.apache.org/docs/latest/…
    – Srikant
    Commented Apr 23, 2020 at 13:37
