Questions tagged [spark-structured-streaming]
Spark Structured Streaming allows processing live data streams using DataFrame and Dataset APIs.
2,497 questions
1 vote · 0 answers · 31 views
Join after groupby in Spark structured streaming
When I run the following code:
Dataset<Row> aggStreamA = df
.withWatermark("dateTime", "2 days")
.groupBy(
window(col("dateTime"), ...
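The `window(col("dateTime"), ...)` call above buckets each event by its event time; since the call is truncated, here is a plain-Python sketch (not Spark API) of how a tumbling window's start is derived from a timestamp and a window duration:

```python
from datetime import datetime, timedelta

def window_start(event_time: datetime, window: timedelta) -> datetime:
    """Floor an event timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    buckets = (event_time - epoch) // window   # whole windows since epoch
    return epoch + buckets * window

# Two events 30 seconds apart land in the same 60-second window:
a = window_start(datetime(2024, 5, 1, 12, 0, 10), timedelta(seconds=60))
b = window_start(datetime(2024, 5, 1, 12, 0, 40), timedelta(seconds=60))
# both floor to 2024-05-01 12:00:00
```

The watermark in the question then bounds how long Spark keeps each such bucket open for late arrivals.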
-1 votes · 0 answers · 22 views
How to do CDC when source is a Delta table? (Detected a data update and this is currently not supported)
I have a source table in Delta format; it's an SCD1 table, which means its data can be updated.
I want to do CDC and write the output into another Delta table.
I used readStream and writeStream and the ...
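The "Detected a data update" error means the Delta streaming source only supports append-only tables by default. One common approach is to enable Delta Change Data Feed on the source and stream the row-level changes instead of the table itself. A configuration sketch, assuming a Delta Lake version with CDF support (all paths are placeholders, and `spark` is an existing session):

```python
# Prerequisite on the source table:
#   ALTER TABLE src SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
changes = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")   # emit inserts/updates/deletes as rows
    .load("/path/to/source_table")
)

(changes
 .writeStream
 .format("delta")
 .option("checkpointLocation", "/path/to/_checkpoint")
 .start("/path/to/target_table"))
```

Each change row carries `_change_type`, `_commit_version`, and `_commit_timestamp` columns you can use to apply the changes downstream.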
0 votes · 0 answers · 8 views
Spark Streaming: Periodic Latency Spike w/ ElasticSearch/OpenSearch Connector using Spark DataSource V2
I developed a Spark Structured Streaming application on Spark 3.1.2.
It reads streaming data and joins it with a static DataFrame, which is refreshed periodically from Delta format.
It sinks to ElasticSearch (or OpenSearch)...
0 votes · 0 answers · 10 views
How to monitor Kafka consumption / lag when working with spark structured streaming?
I have just found out that Spark Structured Streaming does not commit offsets to Kafka but uses its internal checkpoint system, and that there is no way to visualize its consumption lag in a typical Kafka UI
...
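Since the offsets live in the job's checkpoint rather than in Kafka's `__consumer_offsets`, one workaround is to parse the newest file under `checkpoint/offsets/` and compare it against the broker's end offsets yourself. A hedged sketch of the parsing half (the file layout in the docstring matches what Spark writes for a Kafka source, but treat it as an assumption; fetching end offsets, e.g. via `kafka-python`, is left out):

```python
import json
import os

def committed_offsets(checkpoint_dir: str) -> dict:
    """Parse the newest Spark offsets file: line 1 is a version marker
    ("v1"), line 2 is batch metadata, and each following line holds one
    source's offsets as JSON, e.g. {"mytopic": {"0": 42, "1": 17}}."""
    offsets_dir = os.path.join(checkpoint_dir, "offsets")
    batches = [f for f in os.listdir(offsets_dir) if f.isdigit()]
    latest = max(batches, key=int)
    with open(os.path.join(offsets_dir, latest)) as fh:
        lines = fh.read().splitlines()
    result = {}
    for line in lines[2:]:          # skip version marker and batch metadata
        for topic, parts in json.loads(line).items():
            result[topic] = {int(p): off for p, off in parts.items()}
    return result
```

Lag per partition is then the broker's end offset minus the committed offset; the result could be pushed to any dashboard you already use.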
0 votes · 0 answers · 17 views
Spark Structured Streaming does not work on Cluster Mode
I developed a Spark streaming application using Spark Structured Streaming v3.1.2.
When I try to run the Spark application in YARN cluster mode, it returns an error with an exception trying to access HDFS ...
2 votes · 1 answer · 29 views
Graceful Shutdown for PySpark Structured Streaming Job Throws Py4JNetworkError
I am working on a PySpark Structured Streaming job that reads data from a Kafka topic and processes it in real-time. I want to implement a graceful shutdown using signal handling, but I'm encountering ...
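The Py4JNetworkError in this situation typically comes from calling back into the JVM (e.g. `query.stop()`) directly inside the signal handler. A common pattern is to have the handler do nothing but set a flag, and let the main thread do the stopping. A minimal plain-Python sketch of that pattern (the `query.stop()` call is commented out, since it assumes a live StreamingQuery):

```python
import signal
import threading

stop_requested = threading.Event()

def handle_sigterm(signum, frame):
    # Do as little as possible here: just record the request.
    # Calling into Py4J/the JVM from a signal handler is what tends to fail.
    stop_requested.set()

signal.signal(signal.SIGTERM, handle_sigterm)

def run_until_stopped(poll_seconds: float = 1.0) -> None:
    """Main-thread loop: poll the flag, then shut down cleanly."""
    while not stop_requested.is_set():
        stop_requested.wait(poll_seconds)   # stand-in for query.awaitTermination(...)
    # query.stop()  # safe here: we are back on the main thread
```

In a real job the loop would call `query.awaitTermination(timeout)` instead of `wait`, but the division of labor is the same: handler sets the flag, main thread stops the query.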
0 votes · 0 answers · 30 views
Unable to write Spark Streaming data from Kafka
I am trying to write Spark Streaming data fed from Kafka locally as CSV. The data folder shows up blank.
The folder structure is shown in the screenshot.
I am adding the Kafka producer code as follows
...
0 votes · 0 answers · 16 views
spark structured streaming window
I want to process all the data in a 60-second window, but I found data that belongs to the previous window. How can I avoid this?
Here is the code (run in local mode with Spark 3.5.1):
from typing ...
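Data from an earlier window usually shows up because events are assigned to windows by event time, and without a watermark Spark keeps old windows open indefinitely for late arrivals. A plain-Python sketch of what a watermark does, i.e. drop events older than the maximum event time seen so far minus the allowed delay (in PySpark this corresponds to `withWatermark`):

```python
from datetime import datetime, timedelta

def filter_late(events, delay: timedelta):
    """Yield (timestamp, payload) pairs in arrival order, dropping any event
    older than (max event time seen so far) - delay, as a watermark would."""
    max_seen = None
    for ts, payload in events:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        if ts >= max_seen - delay:
            yield ts, payload
        # else: too late relative to the watermark; the event is dropped

events = [
    (datetime(2024, 5, 1, 12, 1, 0), "a"),
    (datetime(2024, 5, 1, 12, 3, 0), "b"),
    (datetime(2024, 5, 1, 12, 0, 0), "late"),   # 3 min behind the max seen
]
kept = list(filter_late(events, timedelta(minutes=2)))
# "late" is dropped: 12:00 is older than 12:03 minus the 2-minute delay
```

With a watermark in place, a window is finalized once the watermark passes its end, so earlier windows no longer accept (or emit) stragglers.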
2 votes · 2 answers · 52 views
Issue with Multiple Spark Structured Streaming Jobs Consuming Same Kafka Topic
I have two separate Python scripts (job1.py and job2.py) that use Spark Structured Streaming to consume data from the Kafka topic test1. Both scripts are configured with the same Kafka consumer group (...
1 vote · 0 answers · 19 views
Propagate information from worker to master
The problem I want to solve is:
The script execution must return an error code whenever a test fails
Each subsequent execution of the script must process only the data difference since the last execution
...
0 votes · 0 answers · 38 views
Spark SQL 3.5.1 How to consume MQTT data in real time, is there any existing library? Do I need a custom data source? How to customize it?
I am using Spark SQL 3.5.1 to consume MQTT live data; the latest version uses Dataset unbounded tables to process live data, but there is no up-to-date tutorial in the documentation. How do I ...
0 votes · 0 answers · 13 views
PySpark streaming through a socket to the console, but getting an error
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("Streaming")
.master("local[*]")
.getOrCreate()
)
...
0 votes · 0 answers · 23 views
Spark Structured Streaming GroupState gets timed out unexpectedly in flatMapGroupsWithState
I am writing a Spark Structured Streaming job and found something unexpected.
Here is the sample code.
/* in main func */
val res = ds
.groupByKey(c => c.PartitionKey)
.flatMapGroupsWithState[...
0 votes · 0 answers · 30 views
Spark Structured streaming facing issue with using exceptAll function
Could someone please help me with this code?
The input parameter df is a Spark Structured Streaming DataFrame.
def apply_duplicacy_check(df, duplicate_check_columns):
if len(duplicate_check_columns) == 0:...
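`exceptAll` is one of the operations not supported on streaming DataFrames; the streaming-safe route is usually `dropDuplicates` after `withWatermark`. As a plain-Python sketch of the split the function appears to attempt (returning first occurrences and duplicates separately, keyed on the given columns; rows here are dicts, and the sample column names are hypothetical):

```python
def apply_duplicacy_check(rows, duplicate_check_columns):
    """Split rows into (first occurrences, duplicates) keyed on the given
    columns; with no key columns, every row counts as unique."""
    if not duplicate_check_columns:
        return list(rows), []
    seen, uniques, duplicates = set(), [], []
    for row in rows:
        key = tuple(row[c] for c in duplicate_check_columns)
        (duplicates if key in seen else uniques).append(row)
        seen.add(key)
    return uniques, duplicates

rows = [{"id": 1, "v": "x"}, {"id": 1, "v": "y"}, {"id": 2, "v": "z"}]
uniques, dups = apply_duplicacy_check(rows, ["id"])
# uniques keeps the first id=1 row and the id=2 row; dups holds the second id=1 row
```

In Spark itself, `df.withWatermark("ts", "10 minutes").dropDuplicates(duplicate_check_columns + ["ts"])` keeps the "uniques" side; the duplicates themselves are not directly recoverable in a single streaming query.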
0 votes · 1 answer · 45 views
Speed up the data save to S3 buckets using spark scala
I am looking for pointers on how to speed up the rate at which data is persisted to S3. I am currently persisting data to S3 buckets based on the example path below
s3://...