Initial commit

2020-07-19 10:19:55 -04:00
parent 9f66e1036c
commit c701d472df
9 changed files with 146 additions and 68 deletions

Driver.scala

@@ -1,21 +1,72 @@
package com.barrelsofdata.sparkexamples
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object Driver {
  val JOB_NAME: String = "Structured Streaming Data Deduplication"
  val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
  def run(spark: SparkSession, kafkaBroker: String, kafkaTopic: String,
          fullOutputPath: String, deduplicatedOutputPath: String, windowSeconds: String): Unit = {
    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")
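    // Incoming Kafka JSON payload: ts = event timestamp, usr = user id, tmp = temperature reading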
    val inputSchema: StructType = new StructType()
      .add("ts", TimestampType)
      .add("usr", StringType)
      .add("tmp", IntegerType)
    val df: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBroker)
      .option("subscribe", kafkaTopic)
      .load()
    val data: Dataset[UserData] = df
      .select(col("value").cast("STRING"))
      .select(from_json(col("value"), inputSchema).as("jsonConverted"))
      .select(col("jsonConverted.usr").as("user"), col("jsonConverted.tmp").as("temperature"), col("jsonConverted.ts").as("eventTime"))
      .as[UserData]
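    // Emit a record only when the user's temperature differs from the value held in state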
    val deduplicated: Dataset[UserData] = data
      .groupByKey(_.user)
      .flatMapGroupsWithState[UserData, UserData](OutputMode.Append(), GroupStateTimeout.NoTimeout)(StateOperations.deduplicate)
    val deduplicatedQuery: StreamingQuery = deduplicated
      .writeStream
      .format("parquet")
      .option("path", deduplicatedOutputPath)
      .trigger(Trigger.ProcessingTime(s"$windowSeconds seconds"))
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", s"${deduplicatedOutputPath}_checkpoint")
      .start()
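    // Also persist the full, non-deduplicated stream to its own path and checkpoint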
    val fullQuery: StreamingQuery = data
      .writeStream
      .format("parquet")
      .option("path", fullOutputPath)
      .trigger(Trigger.ProcessingTime(s"$windowSeconds seconds"))
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", s"${fullOutputPath}_checkpoint")
      .start()
    // Both queries run concurrently; block until either of them terminates
    spark.streams.awaitAnyTermination()
  }
  def main(args: Array[String]): Unit = {
    if (args.length != 5) {
      println("Invalid usage")
      println("Usage: spark-submit --master <local|yarn> spark-structured-streaming-deduplication-1.0.jar <kafka_broker> <kafka_topic> <full_output_path> <deduplicated_output_path> <window_seconds>")
      LOG.error(s"Invalid number of arguments, arguments given: [${args.mkString(",")}]")
      System.exit(1)
    }
    val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate()
    run(spark, args(0), args(1), args(2), args(3), args(4))
  }
}
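For local experimentation, the job's deduplication path can be exercised without Kafka by feeding records through Spark's MemoryStream test source. The sketch below is not part of this commit: the object name LocalDedupSketch and the sample readings are invented for illustration, and it assumes the UserData and StateOperations definitions from this commit are on the classpath.

import java.sql.Timestamp
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}

object LocalDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[2]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    // In-memory source standing in for the Kafka topic
    val source = MemoryStream[UserData]
    source.addData(
      UserData("user1", 21, Timestamp.valueOf("2020-07-19 10:00:00")), // kept: no prior state
      UserData("user1", 21, Timestamp.valueOf("2020-07-19 10:00:05")), // dropped: same temperature
      UserData("user1", 22, Timestamp.valueOf("2020-07-19 10:00:10"))  // kept: temperature changed
    )

    val query = source.toDS()
      .groupByKey(_.user)
      .flatMapGroupsWithState[UserData, UserData](OutputMode.Append(), GroupStateTimeout.NoTimeout)(StateOperations.deduplicate)
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .trigger(Trigger.Once())
      .start()

    query.awaitTermination()
    spark.stop()
  }
}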

StateOperations.scala

@@ -0,0 +1,32 @@
package com.barrelsofdata.sparkexamples
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState
import scala.math.Ordering
object StateOperations {
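  // Orders java.sql.Timestamp values, required by the sortBy(_.eventTime) call below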
  implicit def ordered: Ordering[Timestamp] = (x: Timestamp, y: Timestamp) => x compareTo y

  def deduplicate(user: String,
                  currentBatchData: Iterator[UserData],
                  state: GroupState[UserData]): Iterator[UserData] = {
    var currentState: Option[UserData] = state.getOption
    // Process the batch in event-time order so state always ends on the latest reading
    val sortedBatch = currentBatchData.toSeq.sortBy(_.eventTime)
    val results = for {
      userData <- sortedBatch
      if currentState.isEmpty || currentState.get.temperature != userData.temperature
    } yield {
      currentState = Some(userData)
      userData
    }
    // Update conditionally: avoids a NoSuchElementException if currentState is ever still empty here
    currentState.foreach(state.update)
    results.iterator
  }
}
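The for-comprehension keeps a reading only when it differs from the previous state, then advances the state to that reading. A minimal sketch of the same keep-if-changed filter on a plain sequence (sample values invented for illustration):

val readings: Seq[Int] = Seq(21, 21, 22, 22, 23)
var last: Option[Int] = None
val kept: Seq[Int] = for {
  r <- readings
  if last.isEmpty || last.get != r
} yield {
  last = Some(r)
  r
}
// kept == Seq(21, 22, 23): consecutive repeats are dropped, changes pass through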

UserData.scala

@@ -0,0 +1,5 @@
package com.barrelsofdata.sparkexamples
import java.sql.Timestamp
case class UserData(user: String, temperature: Int, eventTime: Timestamp)
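For reference, a Kafka message value shaped like the following (a made-up example matching the inputSchema defined in Driver) maps onto this case class, with usr becoming user, tmp becoming temperature, and ts becoming eventTime:

{"ts": "2020-07-19 10:00:00", "usr": "user1", "tmp": 21}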