[![Tests](https://barrelsofdata.com/api/v1/git/action/status/fetch/barrelsofdata/spark-state-store-data-deduplication/tests)](https://git.barrelsofdata.com/barrelsofdata/spark-state-store-data-deduplication/actions?workflow=workflow.yaml)
[![Build](https://barrelsofdata.com/api/v1/git/action/status/fetch/barrelsofdata/spark-state-store-data-deduplication/build)](https://git.barrelsofdata.com/barrelsofdata/spark-state-store-data-deduplication/actions?workflow=workflow.yaml)
# Spark Structured Streaming Data Deduplication using State Store
This project shows how Spark's built-in HDFS-backed state store can be used to deduplicate data in a stream. The related blog post can be found at [https://barrelsofdata.com/data-deduplication-spark-state-store](https://barrelsofdata.com/data-deduplication-spark-state-store)
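
At a high level, deduplication in Structured Streaming works because Spark keeps the keys it has already seen in its HDFS-backed state store. The snippet below is a minimal sketch of that idea using `dropDuplicates` on a watermarked Kafka stream; the schema, column names, and options are illustrative assumptions, not this project's actual code (see the blog post and source for the real implementation).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Illustrative sketch only; schema, column names and options are assumptions.
val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()
import spark.implicits._

// Shape of the simulated payload: {"ts":1594307307,"usr":"user1","tmp":98}
val schema = StructType(Seq(
  StructField("ts", LongType),
  StructField("usr", StringType),
  StructField("tmp", IntegerType)))

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "streaming-data")
  .load()
  .select(from_json($"value".cast("string"), schema).as("e"))
  .select($"e.ts".cast("timestamp").as("eventTime"), $"e.usr".as("user"), $"e.tmp".as("temperature"))

// dropDuplicates on a watermarked stream keeps each key it has seen in Spark's
// HDFS-backed state store and drops later arrivals of the same key.
val deduplicated = events
  .withWatermark("eventTime", "5 seconds")
  .dropDuplicates("user", "eventTime")
```

Keys are evicted from the state store once the watermark passes them, which keeps the state bounded for long-running streams.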
## Build instructions
From the root of the project, execute the commands below.
- To clear all compiled classes, build and log directories
```shell script
./gradlew clean
```
- To run tests
```shell script
./gradlew test
```
- To build jar
```shell script
./gradlew build
```
## Run
Ensure your local Hadoop cluster is running ([hadoop cluster tutorial](https://barrelsofdata.com/apache-hadoop-pseudo-distributed-mode)) and start two Kafka brokers ([kafka tutorial](https://barrelsofdata.com/apache-kafka-setup)).
- Create a Kafka topic
```shell script
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic streaming-data
```
- Start the streaming job (a sketch of the argument handling appears after this list)
```shell script
spark-submit --master yarn --deploy-mode cluster build/libs/spark-state-store-data-deduplication-1.0.0.jar <KAFKA_BROKER> <KAFKA_TOPIC> <FULL_OUTPUT_PATH> <DEDUPLICATED_OUTPUT_PATH> <WINDOW_SIZE_SECONDS>
Example: spark-submit --master yarn --deploy-mode client build/libs/spark-state-store-data-deduplication-1.0.0.jar localhost:9092 streaming-data fullOutput deduplicatedOutput 5
```
- You can feed simulated data to the Kafka topic
  - Open a new terminal and run the shell script located at src/test/resources/dataProducer.sh
  - It produces two instances of the following JSON structure every second: {"ts":1594307307,"usr":"user1","tmp":98}
```shell script
cd src/test/resources
./dataProducer.sh localhost:9092 streaming-data
```
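
The streaming job takes five positional arguments: the Kafka broker, the Kafka topic, an output path for all records, an output path for deduplicated records, and the deduplication window in seconds. Below is a hypothetical sketch of how a driver might read them; the class name and validation are assumptions for illustration, not this project's actual entry point.

```scala
// Hypothetical entry point for illustration only; the real class, argument
// handling and validation live in the project source.
object DeduplicationJobSketch {
  def main(args: Array[String]): Unit = {
    require(args.length == 5,
      "Usage: <KAFKA_BROKER> <KAFKA_TOPIC> <FULL_OUTPUT_PATH> <DEDUPLICATED_OUTPUT_PATH> <WINDOW_SIZE_SECONDS>")
    val Array(kafkaBroker, kafkaTopic, fullOutputPath, dedupOutputPath, window) = args
    val windowSizeSeconds = window.toInt // deduplication window, e.g. 5
    // ... read from kafkaBroker/kafkaTopic, write all records to fullOutputPath
    // and deduplicated records to dedupOutputPath ...
  }
}
```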
### View Results
Open a spark-shell and use the following code; change the paths to where your outputs are stored.
```scala
spark.read.parquet("fullOutput").orderBy("user","eventTime").show(truncate = false)
spark.read.parquet("deduplicatedOutput").orderBy("user","eventTime").show(truncate = false)
```