I use Pyspark with Spark 2.4 in the standalone mode on Linux for aggregating incoming data and writing these into a database using a Jupyter notebook (currently for testing) with the following stripped content:
spark = SparkSession.builder.appName("foo").getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:1234").option("subscribe", "bar").load()
df2 = df.withWatermark("timestamp", "1 second").groupby(F.window('timestamp', "5 second")).agg(F.min("timestamp").alias('timestamp_window_min'))
def write_into_sink(df, epoch_id):
    df.write.jdbc(table="foo_agg", mode="append", [...])
query_write_sink = df2.writeStream.foreachBatch(write_into_sink).trigger(processingTime = "1 seconds").start()
After running for about two hours, there are dozens of directories under Spark's tmp directory in tmp/temporary-[...]/state/0/, each containing a lot of small .crc and .delta files that add up to 6 GB of disk space during the run. So my problem is that I cannot run the script for more than a few hours before the disk is full; how could I keep it running for longer, like days or even months? If I close/kill the Python kernel, the directories are purged.
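To quantify the growth, I sum the file sizes under the state directory with a small helper (dir_size_bytes is just my own monitoring function, and the path in the comment stands for the actual generated temporary-[...] name):

```python
import os

def dir_size_bytes(root):
    """Total size in bytes of all files below root."""
    total = 0
    for dirpath, _dirs, files in os.walk(root):
        for name in files:
            path = os.path.join(dirpath, name)
            try:
                total += os.path.getsize(path)
            except OSError:
                # a file may vanish while Spark compacts/cleans state
                pass
    return total

# e.g. dir_size_bytes("tmp/temporary-[...]/state/0/") grows steadily during the run
```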
I already followed Apache Spark does not delete temporary directories and adjusted conf/spark-env.sh
to SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true",
but it still does not help after a restart, since that setting only cleans up files after a Spark run finishes, not during it. I also tried SPARK_WORKER_OPTS="spark.worker.cleanup.enabled=true spark.worker.cleanup.appDataTtl=120"
in the same file, with the same lack of effect.
So, do you have an idea how to get rid of Spark's temporary files during the run?
Try spark.worker.cleanup.appDataTtl,
mentioned in the link from your question: set it to a few hours and check whether the files get cleaned up.
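A minimal sketch of how this could look in conf/spark-env.sh; note that the worker cleanup settings are JVM system properties, so each one needs a -D prefix (the interval and TTL values here are only illustrative assumptions, tune them to your run length):

```shell
# conf/spark-env.sh (standalone mode)
# Cleanup settings are passed to the worker JVM as system properties.
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=7200"
```

Restart the worker after changing the file so the new options take effect.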