diff --git a/README.md b/README.md
index c1d5848..7d6f8c6 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Spark Boilerplate
-This is a boilerplate project for Apache Spark. The related blog post can be found at [https://www.barrelsofdata.com/spark-boilerplate-using-scala](https://www.barrelsofdata.com/spark-boilerplate-using-scala)
+# Spark Structured Streaming Data Deduplication using State Store
+This project shows how Spark's built-in, HDFS-backed state store can be used to deduplicate data in a stream. The related blog post can be found at [https://www.barrelsofdata.com/data-deduplication-spark-state-store](https://www.barrelsofdata.com/data-deduplication-spark-state-store)
 
 ## Build instructions
 From the root of the project execute the below commands
@@ -21,6 +21,28 @@ From the root of the project execute the below commands
 ```
 
 ## Run
+Ensure your local Hadoop cluster is running ([hadoop cluster tutorial](https://www.barrelsofdata.com/apache-hadoop-pseudo-distributed-mode)) and start two Kafka brokers ([kafka tutorial](https://www.barrelsofdata.com/apache-kafka-setup)).
+- Create the Kafka topic
 ```shell script
-spark-submit --master yarn --deploy-mode cluster build/libs/spark-boilerplate-1.0.jar
+kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic streaming-data
+```
+- Start the streaming job
+```shell script
+spark-submit --master yarn --deploy-mode cluster build/libs/spark-state-store-data-deduplication-1.0.jar <kafka_broker> <kafka_topic> <full_output_path> <deduplicated_output_path> <window_seconds>
+Example: spark-submit --master yarn --deploy-mode client build/libs/spark-state-store-data-deduplication-1.0.jar localhost:9092 streaming-data fullOutput deduplicatedOutput 5
+```
+- You can feed simulated data to the Kafka topic
+- Open a new terminal and run the shell script located at src/test/resources/dataProducer.sh
+- It produces two instances of the following JSON structure every second: {"ts":1594307307,"usr":"user1","tmp":98}
+```shell script
+cd src/test/resources
+./dataProducer.sh localhost:9092 streaming-data
+```
+
+### View Results
+Open a spark-shell and use the following code; change the paths to wherever the outputs are stored.
+```scala
+spark.read.parquet("fullOutput").orderBy("user", "eventTime").show(truncate = false)
+
+spark.read.parquet("deduplicatedOutput").orderBy("user", "eventTime").show(truncate = false)
 ```
\ No newline at end of file
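(Aside, not part of the diff: a quick sanity check for the View Results step above, assuming the same `fullOutput` and `deduplicatedOutput` paths. Because the deduplicated stream only ever drops rows, its row count can never exceed that of the full output.)

```scala
// Sanity-check sketch for the same spark-shell session (paths are the placeholders used above):
// the deduplicated dataset is a subset of the full one, so its count is never larger.
val full = spark.read.parquet("fullOutput")
val deduplicated = spark.read.parquet("deduplicatedOutput")
println(s"full rows = ${full.count()}, deduplicated rows = ${deduplicated.count()}")
```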
diff --git a/build.gradle b/build.gradle
index 77a8885..bcbadff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -15,6 +15,7 @@ dependencies {
     compileOnly group: "org.apache.spark", name: "spark-core_${scalaMajorVersion}", version: "${apacheSparkVersion}"
     compileOnly group: "org.apache.spark", name: "spark-sql_${scalaMajorVersion}", version: "${apacheSparkVersion}"
+    implementation group: "org.apache.spark", name: "spark-sql-kafka-0-10_${scalaMajorVersion}", version: "${apacheSparkVersion}"
 
     testImplementation group: "org.scalatest", name: "scalatest_${scalaMajorVersion}", version: "${scalaTestVersion}"
 }
 
diff --git a/settings.gradle b/settings.gradle
index d63964b..d875e2d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1 +1 @@
-rootProject.name = 'spark-boilerplate'
+rootProject.name = 'spark-state-store-data-deduplication'
diff --git a/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala b/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
index 3b323b2..603cb43 100644
--- a/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
+++ b/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
@@ -1,21 +1,72 @@
 package com.barrelsofdata.sparkexamples
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, from_json}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StreamingQuery, Trigger}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 object Driver {
 
-  val JOB_NAME: String = "Boilerplate"
+  val JOB_NAME: String = "Structured Streaming Data Deduplication"
   val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
 
+  def run(spark: SparkSession, kafkaBroker: String, kafkaTopic: String, fullOutputPath: String, deduplicatedOutputPath: String, windowSeconds: String): Unit = {
+    import spark.implicits._
+    spark.sparkContext.setLogLevel("ERROR")
+
+    val inputSchema: StructType = new StructType().add("ts", TimestampType).add("usr", StringType).add("tmp", IntegerType)
+
+    val df: DataFrame = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", kafkaBroker)
+      .option("subscribe", kafkaTopic)
+      .load()
+
+    val data: Dataset[UserData] = df
+      .select(col("value").cast("STRING"))
+      .select(from_json(col("value"), inputSchema).as("jsonConverted"))
+      .select(col("jsonConverted.usr").as("user"), col("jsonConverted.tmp").as("temperature"), col("jsonConverted.ts").as("eventTime"))
+      .as[UserData]
+
+    val deduplicated: Dataset[UserData] = data
+      .groupByKey(_.user)
+      .flatMapGroupsWithState[UserData, UserData](OutputMode.Append(), GroupStateTimeout.NoTimeout)(StateOperations.deduplicate)
+
+    val deduplicatedQuery: StreamingQuery = deduplicated
+      .writeStream
+      .format("parquet")
+      .option("path", deduplicatedOutputPath)
+      .trigger(Trigger.ProcessingTime(s"$windowSeconds seconds"))
+      .outputMode(OutputMode.Append())
+      .option("checkpointLocation", s"${deduplicatedOutputPath}_checkpoint")
+      .start()
+
+    val fullQuery: StreamingQuery = data
+      .writeStream
+      .format("parquet")
+      .option("path", fullOutputPath)
+      .trigger(Trigger.ProcessingTime(s"$windowSeconds seconds"))
+      .outputMode(OutputMode.Append())
+      .option("checkpointLocation", s"${fullOutputPath}_checkpoint")
+      .start()
+
+    deduplicatedQuery.awaitTermination()
+
+  }
+
   def main(args: Array[String]): Unit = {
+    if (args.length != 5) {
+      println("Invalid usage")
+      println("Usage: spark-submit --master <master> spark-state-store-data-deduplication-1.0.jar <kafka_broker> <kafka_topic> <full_output_path> <deduplicated_output_path> <window_seconds>")
+      LOG.error(s"Invalid number of arguments, arguments given: [${args.mkString(",")}]")
+      System.exit(1)
+    }
 
+    val spark: SparkSession = SparkSession.builder().appName(JOB_NAME)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
-    val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate()
 
-    spark.sql("SELECT 'hello' AS col1").show()
+    run(spark, args(0), args(1), args(2), args(3), args(4))
 
-    LOG.info("Dummy info message")
-    LOG.warn("Dummy warn message")
-    LOG.error("Dummy error message")
   }
 }
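(Aside, not part of the diff: a minimal local harness, assuming the classes above are on the classpath, that runs the same groupByKey + flatMapGroupsWithState pipeline without Kafka or a cluster by using Spark's MemoryStream test source. The object name and the sample records are made up for illustration.)

```scala
package com.barrelsofdata.sparkexamples

import java.sql.Timestamp

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

// Sketch only: exercises StateOperations.deduplicate on an in-memory stream.
object LocalDeduplicationSketch {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("dedup-sketch").master("local[*]").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext
    spark.sparkContext.setLogLevel("ERROR")

    val input = MemoryStream[UserData]
    input.addData(
      UserData("user1", 98, new Timestamp(1000L)),
      UserData("user1", 98, new Timestamp(2000L)), // consecutive duplicate, should be dropped
      UserData("user1", 97, new Timestamp(3000L))
    )

    val query = input.toDS()
      .groupByKey(_.user)
      .flatMapGroupsWithState[UserData, UserData](OutputMode.Append(), GroupStateTimeout.NoTimeout)(StateOperations.deduplicate)
      .writeStream
      .format("memory")          // in-memory table, queryable by the query name below
      .queryName("deduplicated")
      .outputMode(OutputMode.Append())
      .start()

    query.processAllAvailable()
    spark.sql("SELECT * FROM deduplicated ORDER BY eventTime").show(truncate = false)

    query.stop()
    spark.stop()
  }
}
```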
s"${fullOutputPath}_checkpoint") + .start() + + deduplicatedQuery.awaitTermination() + + } + def main(args: Array[String]): Unit = { + if(args.length != 5) { + println("Invalid usage") + println("Usage: spark-submit --master spark-structured-streaming-deduplication-1.0.jar ") + LOG.error(s"Invalid number of arguments, arguments given: [${args.mkString(",")}]") + System.exit(1) + } + val spark: SparkSession = SparkSession.builder().appName(JOB_NAME) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate() - val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate() - spark.sql("SELECT 'hello' AS col1").show() + run(spark, args(0), args(1), args(2), args(3), args(4)) - LOG.info("Dummy info message") - LOG.warn("Dummy warn message") - LOG.error("Dummy error message") } } diff --git a/src/main/scala/com/barrelsofdata/sparkexamples/StateOperations.scala b/src/main/scala/com/barrelsofdata/sparkexamples/StateOperations.scala new file mode 100644 index 0000000..88dbfa8 --- /dev/null +++ b/src/main/scala/com/barrelsofdata/sparkexamples/StateOperations.scala @@ -0,0 +1,32 @@ +package com.barrelsofdata.sparkexamples + +import java.sql.Timestamp + +import org.apache.spark.sql.streaming.GroupState + +import scala.math.Ordering + +object StateOperations { + + implicit def ordered: Ordering[Timestamp] = (x: Timestamp, y: Timestamp) => x compareTo y + + def deduplicate(user: String, + currentBatchData: Iterator[UserData], + state: GroupState[UserData]): Iterator[UserData] = { + + var currentState: Option[UserData] = state.getOption + + val sortedBatch = currentBatchData.toSeq.sortBy(_.eventTime) + + val results = for { + userData <- sortedBatch + if currentState.isEmpty || currentState.get.temperature != userData.temperature + } yield { + currentState = Some(userData) + userData + } + state.update(currentState.get) + results.toIterator + } + +} diff --git a/src/main/scala/com/barrelsofdata/sparkexamples/UserData.scala b/src/main/scala/com/barrelsofdata/sparkexamples/UserData.scala new file mode 100644 index 0000000..ad5eb1b --- /dev/null +++ b/src/main/scala/com/barrelsofdata/sparkexamples/UserData.scala @@ -0,0 +1,5 @@ +package com.barrelsofdata.sparkexamples + +import java.sql.Timestamp + +case class UserData (user: String, temperature: Int, eventTime: Timestamp) diff --git a/src/test/resources/dataProducer.sh b/src/test/resources/dataProducer.sh new file mode 100755 index 0000000..88c97a3 --- /dev/null +++ b/src/test/resources/dataProducer.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +display_usage() { + echo "Usage: $0 " +} + +if [ "$#" -ne 2 ]; then + display_usage + exit 1 +fi + +KAFKA_BROKER=$1 +KAFKA_TOPIC=$2 + +TEMP=(96 97 98) +USERS=("user1" "user2" "user3") + +while sleep 1; do + tmp=${TEMP[$RANDOM % ${#TEMP[@]}]} + usr=${USERS[$RANDOM % ${#USERS[@]}]} + epochSeconds=$(date '+%s') + echo "{\"ts\":$epochSeconds,\"usr\":\"${usr}\",\"tmp\":$tmp}" | kafka-console-producer.sh --broker-list ${KAFKA_BROKER} --topic ${KAFKA_TOPIC} + echo "{\"ts\":$epochSeconds,\"usr\":\"${usr}\",\"tmp\":$tmp}" | kafka-console-producer.sh --broker-list ${KAFKA_BROKER} --topic ${KAFKA_TOPIC} +done \ No newline at end of file diff --git a/src/test/resources/log4j-test.properties b/src/test/resources/log4j-test.properties deleted file mode 100644 index bb8c065..0000000 --- a/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,16 +0,0 @@ -log4j.rootLogger=INFO, console -log4j.logger.com.barrelsofdata.sparkexamples=INFO, console 
-log4j.additivity.com.barrelsofdata.sparkexamples=false - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.immediateFlush=true -log4j.appender.console.encoding=UTF-8 - -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.conversionPattern=%d{yyyy/MM/dd HH:mm:ss} %p %c: %m%n - -log4j.logger.org.apache=ERROR -log4j.logger.org.spark_project=ERROR -log4j.logger.org.sparkproject=ERROR -log4j.logger.parquet=ERROR \ No newline at end of file diff --git a/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala b/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala deleted file mode 100644 index 4fe948a..0000000 --- a/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.barrelsofdata.sparkexamples - -import java.util.Properties - -import org.apache.log4j.{LogManager, Logger, PropertyConfigurator} -import org.apache.spark.sql.SparkSession -import org.scalatest.BeforeAndAfterAll -import org.scalatest.funsuite.AnyFunSuite - -class DriverTest extends AnyFunSuite with BeforeAndAfterAll { - - val JOB_NAME: String = "Driver Test Job" - val LOGGER_PROPERTIES: String = "log4j-test.properties" - val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName) - - var spark: SparkSession = _ - - def setupLogger(): Unit = { - val properties = new Properties - properties.load(getClass.getClassLoader.getResource(LOGGER_PROPERTIES).openStream()) - LogManager.resetConfiguration() - PropertyConfigurator.configure(properties) - } - - override def beforeAll: Unit = { - setupLogger() - LOG.info("Setting up spark session") - spark = SparkSession.builder().appName(JOB_NAME).master("local[*]").getOrCreate() - } - - test("Check if spark session is working") { - LOG.info("Testing spark job") - assertResult("hello")(spark.sql("SELECT 'hello'").collect().head.get(0)) - } - - override def afterAll: Unit = { - LOG.info("Closing spark session") - spark.close() - } - -} \ No newline at end of file
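(Aside, not part of the diff: why StateOperations.scala above stores the last emitted record per user in the state store. Consecutive duplicates can span micro-batch boundaries, so comparing only within one batch is not enough. A plain-Scala sketch with made-up readings; `Reading` and `dedupBatch` are illustrative stand-ins for `UserData` and the state-store logic.)

```scala
import java.sql.Timestamp

// Illustrative stand-in for UserData
case class Reading(user: String, temperature: Int, eventTime: Timestamp)

// One micro-batch of deduplication for a single user: keep a reading only when its temperature
// differs from the last kept reading, and return that last kept reading as the new "state".
def dedupBatch(batch: Seq[Reading], state: Option[Reading]): (Seq[Reading], Option[Reading]) =
  batch.sortBy(_.eventTime.getTime).foldLeft((Seq.empty[Reading], state)) {
    case ((kept, last), r) if last.forall(_.temperature != r.temperature) => (kept :+ r, Some(r))
    case ((kept, last), _) => (kept, last) // consecutive duplicate, drop it
  }

val batch1 = Seq(Reading("user1", 98, new Timestamp(1000)), Reading("user1", 98, new Timestamp(2000)))
val batch2 = Seq(Reading("user1", 98, new Timestamp(3000)), Reading("user1", 97, new Timestamp(4000)))

val (kept1, state1) = dedupBatch(batch1, None)   // kept1: only the reading at t=1000
val (kept2, state2) = dedupBatch(batch2, state1) // kept2: only the 97 at t=4000; the 98 at t=3000
                                                 // is dropped because state1 remembers 98 from batch1
```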