
I want to use Spark Structured Streaming (Spark 2.4.x) to stream from a Kafka source to MariaDB with Python (PySpark).

I want to use the streamed Spark DataFrame, not a static or Pandas DataFrame.

It seems that one has to use foreach or foreachBatch, since there is no built-in database sink for streamed DataFrames according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks.

Here is my attempt:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
  .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mytopic") \
  .load() \
  .selectExpr("Timestamp", "cast (value as string) as json") \
  .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
  .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)
    pass
query = df2.writeStream.foreach(process_row).start()

I get an error:

AttributeError: write

Why?

3 Answers

tl;dr Replace foreach with foreachBatch.


Quoting the official documentation:

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

In other words, your writeStream.foreach(process_row) calls process_row on a single row of data. A row has no write attribute (and so no write.jdbc), hence the AttributeError.

Think of the row as a piece of data that you can save anywhere you want using any API you want.
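
As a rough sketch of that idea: in PySpark, foreach also accepts an object with open, process and close methods, so each row can be written with an ordinary database client. The example below uses Python's built-in sqlite3 purely as a stand-in for MariaDB. The RowWriter class, the database path and the table layout are assumptions for illustration and not part of the question's setup.

```python
import sqlite3


class RowWriter:
    """Per-row sink for writeStream.foreach (hypothetical helper)."""

    def __init__(self, db_path):
        # Keep only plain data here so the object can be shipped to executors.
        self.db_path = db_path
        self.conn = None

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch: open the connection here.
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS mytopic (Signal TEXT, Value REAL)"
        )
        return True  # True means: go ahead and process the rows

    def process(self, row):
        # row is a pyspark.sql.Row; its fields can be read by name.
        self.conn.execute(
            "INSERT INTO mytopic (Signal, Value) VALUES (?, ?)",
            (row["Signal"], row["Value"]),
        )

    def close(self, error):
        # Commit only if the partition was processed without an error.
        if self.conn is not None:
            if error is None:
                self.conn.commit()
            self.conn.close()
            self.conn = None


# Assumed usage with the question's streaming DataFrame df2:
# query = df2.writeStream.foreach(RowWriter("/tmp/stream.db")).start()
```

Writing row by row like this is usually slower than a batched JDBC write, which is one reason to prefer foreachBatch when Spark's own writers are available.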

If you really need support from Spark (and want to use write.jdbc), you should use foreachBatch instead.


  • Does foreach or foreachBatch slow down the application? Commented Dec 30, 2020 at 7:48
  • No further (micro)batches are going to be started until the current one has finished. So, be as quick in foreach and foreachBatch as possible. Commented Dec 30, 2020 at 9:21
  • @JacekLaskowski - Is watermarking supported when we use foreachBatch? Commented Mar 28, 2023 at 23:03
  • @KaranAlang Please ask a separate question 🙏 Commented Mar 31, 2023 at 10:47

With the support of Jacek, I could fix my example:

def process_row(df, epoch_id):
    # df is the static DataFrame of the current micro-batch; write it, not the streaming df2
    df.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)

query = df2.writeStream.foreachBatch(process_row).start()

The function must also accept epoch_id as its second parameter. Otherwise you get errors in the Spark log file that are not shown in the Jupyter notebook.
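
Since errors raised inside the batch function may end up only in the Spark log, one way to make them easier to spot is to wrap the function so that each failure is logged together with its epoch_id. This is only a sketch: log_batch_errors and the logger name are hypothetical, and whether the message actually appears in the notebook depends on how logging is configured there.

```python
import functools
import logging

logger = logging.getLogger("stream_sink")  # hypothetical logger name


def log_batch_errors(batch_fn):
    """Wrap a foreachBatch function so failures are logged with their epoch_id."""

    @functools.wraps(batch_fn)
    def wrapper(batch_df, epoch_id):
        try:
            return batch_fn(batch_df, epoch_id)
        except Exception:
            # Log the full traceback, then re-raise so the query still fails.
            logger.exception("micro-batch %s failed", epoch_id)
            raise

    return wrapper


# Assumed usage with the function and DataFrame from this answer:
# query = df2.writeStream.foreachBatch(log_batch_errors(process_row)).start()
```

The wrapper keeps the two-parameter signature, so it can be passed to foreachBatch in place of the original function.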

Here is working code for Spark Structured Streaming (3.2.1), from Kafka to Postgres:

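from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# stg, url, pg_username and pg_password are not shown in this answer;
# they must be defined at module level before this point.
spark = SparkSession.builder.appName(stg).getOrCreate()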
jdbcDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<>") \
    .option("subscribe", "<>") \
    .option("startingOffsets", "earliest") \
    .load()



jdbcDF = jdbcDF.withColumn("value",col("value").cast(StringType()))

df = jdbcDF


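def foreach_batch_function(df, epoch_id):
    # Debug output for each micro-batch
    df.printSchema()
    df.show()
    # "overwrite" with truncate=True empties the table on every micro-batch,
    # so it only ever holds the latest batch; use "append" to accumulate rows.
    df.write \
        .format("jdbc") \
        .mode("overwrite") \
        .option("driver", "org.postgresql.Driver") \
        .option("url", url) \
        .option("dbtable", stg) \
        .option("user", pg_username) \
        .option("password", pg_password) \
        .option("truncate", True) \
        .save()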

writing_sink = df.writeStream \
    .trigger(processingTime='12 seconds') \
    .foreachBatch(foreach_batch_function) \
    .start()
    

writing_sink.awaitTermination()

spark.stop()
Required JAR libraries:

commons-pool2-2.11.1.jar, kafka-clients-3.2.1.jar, postgresql-42.5.0.jar, spark-sql-kafka-0-10_2.12-3.2.1.jar, spark-token-provider-kafka-0-10_2.12-3.2.1.jar

  • Can you please elaborate on where the variables url, stg, pg_username and pg_password used inside foreach_batch_function() are defined? I am asking because the parameters of this function are fixed as (DataFrame, Long). Thanks.
    – geekyj
    Commented Feb 16, 2023 at 4:43
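
Regarding the comment above: the answer does not show these variables, so presumably they are ordinary module-level variables that the function reads when it runs. If you would rather pass such settings explicitly, one option is functools.partial, which binds the extra arguments and leaves a callable that takes only (batch_df, epoch_id). The names write_batch and sink_fn and all connection values below are placeholders, not taken from the answer.

```python
from functools import partial


def write_batch(batch_df, epoch_id, url, table, user, password):
    """foreachBatch body with its connection settings passed in explicitly."""
    batch_df.write.jdbc(
        url=url,
        table=table,
        mode="append",
        properties={
            "user": user,
            "password": password,
            "driver": "org.postgresql.Driver",
        },
    )


# Bind the extra arguments by keyword; the result takes (batch_df, epoch_id).
# All values here are placeholders.
sink_fn = partial(
    write_batch,
    url="jdbc:postgresql://localhost/database",
    table="mytable",
    user="writer",
    password="secret",
)

# Assumed usage: df.writeStream.foreachBatch(sink_fn).start()
```

A closure or a small class with a __call__ method would work just as well; partial simply keeps the settings visible at the call site.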
