Initial commit
This commit is contained in:
parent
62c65fa963
commit
04dcb71155
20
README.md
20
README.md
@ -1,5 +1,5 @@
|
|||||||
# Spark Boilerplate
|
# Spark Structured Streaming Word Count
|
||||||
This is a boilerplate project for Apache Spark. The related blog post can be found at [https://www.barrelsofdata.com/spark-boilerplate-using-scala](https://www.barrelsofdata.com/spark-boilerplate-using-scala)
|
This is a project detailing how to write a streaming word count program in Apache Spark using Structured Streaming. The related blog post can be found at [https://www.barrelsofdata.com/spark-structured-streaming-word-count](https://www.barrelsofdata.com/spark-structured-streaming-word-count)
|
||||||
|
|
||||||
## Build instructions
|
## Build instructions
|
||||||
From the root of the project execute the below commands
|
From the root of the project execute the below commands
|
||||||
@ -21,6 +21,20 @@ From the root of the project execute the below commands
|
|||||||
```
|
```
|
||||||
|
|
||||||
## Run
|
## Run
|
||||||
|
Ensure your local hadoop cluster is running ([hadoop cluster tutorial](https://www.barrelsofdata.com/apache-hadoop-pseudo-distributed-mode)) and start two kafka brokers ([kafka tutorial](https://www.barrelsofdata.com/apache-kafka-setup)).
|
||||||
|
- Create kafka topic
|
||||||
```shell script
|
```shell script
|
||||||
spark-submit --master yarn --deploy-mode cluster build/libs/spark-boilerplate-1.0.jar
|
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic streaming-data
|
||||||
|
```
|
||||||
|
- Start streaming job
|
||||||
|
```shell script
|
||||||
|
spark-submit --master yarn --deploy-mode cluster build/libs/spark-structured-streaming-wordcount-1.0.jar <KAFKA_BROKER> <KAFKA_TOPIC>
|
||||||
|
Example: spark-submit --master yarn --deploy-mode client build/libs/spark-structured-streaming-wordcount-1.0.jar localhost:9092 streaming-data
|
||||||
|
```
|
||||||
|
- You can feed simulated data to the kafka topic
|
||||||
|
- Open new terminal and run the shell script located at src/test/resources/dataProducer.sh
|
||||||
|
- Produces the following json structure every 1 second: {"ts":1594307307,"str":"This is an example string"}
|
||||||
|
```shell script
|
||||||
|
cd src/test/resources
|
||||||
|
./dataProducer.sh localhost:9092 streaming-data
|
||||||
```
|
```
|
@ -15,6 +15,7 @@ dependencies {
|
|||||||
|
|
||||||
compileOnly group: "org.apache.spark", name: "spark-core_${scalaMajorVersion}", version: "${apacheSparkVersion}"
|
compileOnly group: "org.apache.spark", name: "spark-core_${scalaMajorVersion}", version: "${apacheSparkVersion}"
|
||||||
compileOnly group: "org.apache.spark", name: "spark-sql_${scalaMajorVersion}", version: "${apacheSparkVersion}"
|
compileOnly group: "org.apache.spark", name: "spark-sql_${scalaMajorVersion}", version: "${apacheSparkVersion}"
|
||||||
|
implementation group: "org.apache.spark", name: "spark-sql-kafka-0-10_${scalaMajorVersion}", version: "${apacheSparkVersion}"
|
||||||
|
|
||||||
testImplementation group: "org.scalatest", name: "scalatest_${scalaMajorVersion}", version: "${scalaTestVersion}"
|
testImplementation group: "org.scalatest", name: "scalatest_${scalaMajorVersion}", version: "${scalaTestVersion}"
|
||||||
}
|
}
|
||||||
|
@ -1 +1 @@
|
|||||||
rootProject.name = 'spark-boilerplate'
|
rootProject.name = 'spark-structured-streaming-wordcount'
|
||||||
|
@ -1,21 +1,63 @@
|
|||||||
package com.barrelsofdata.sparkexamples
|
package com.barrelsofdata.sparkexamples
|
||||||
|
|
||||||
import org.apache.log4j.Logger
|
import org.apache.log4j.Logger
|
||||||
import org.apache.spark.sql.SparkSession
|
import org.apache.spark.sql.functions.{col, desc, from_json}
|
||||||
|
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
|
||||||
|
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
|
||||||
|
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
|
||||||
|
|
||||||
object Driver {
|
object Driver {
|
||||||
|
|
||||||
val JOB_NAME: String = "Boilerplate"
|
val JOB_NAME: String = "Streaming Word Count"
|
||||||
val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
|
val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
|
||||||
|
|
||||||
|
def run(spark: SparkSession, kafkaBroker: String, kafkaTopic: String): Unit = {
|
||||||
|
import spark.implicits._
|
||||||
|
spark.sparkContext.setLogLevel("ERROR")
|
||||||
|
|
||||||
|
val inputSchema: StructType = new StructType().add("ts", TimestampType).add("str",StringType)
|
||||||
|
|
||||||
|
val df:DataFrame = spark.readStream
|
||||||
|
.format("kafka")
|
||||||
|
.option("kafka.bootstrap.servers",kafkaBroker)
|
||||||
|
.option("subscribe",kafkaTopic)
|
||||||
|
.load()
|
||||||
|
|
||||||
|
val data: Dataset[String] = df
|
||||||
|
.select(col("value").cast("STRING"))
|
||||||
|
.select(from_json(col("value"), inputSchema).as("jsonConverted"))
|
||||||
|
.select("jsonConverted.str").as[String]
|
||||||
|
|
||||||
|
val words: Dataset[String] = data
|
||||||
|
.map(WordCount.cleanData)
|
||||||
|
.flatMap(WordCount.tokenize)
|
||||||
|
.filter(_.nonEmpty)
|
||||||
|
|
||||||
|
val wordFrequencies: DataFrame = words
|
||||||
|
.groupBy(col("value")).count()
|
||||||
|
.toDF("word", "frequency")
|
||||||
|
|
||||||
|
val query: StreamingQuery = wordFrequencies
|
||||||
|
.orderBy(desc("frequency"))
|
||||||
|
.writeStream
|
||||||
|
.format("console")
|
||||||
|
.outputMode(OutputMode.Complete())
|
||||||
|
.start()
|
||||||
|
|
||||||
|
query.awaitTermination()
|
||||||
|
}
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
if(args.length != 2) {
|
||||||
|
println("Invalid usage")
|
||||||
|
println("Usage: spark-submit --master <local|yarn> spark-structured-streaming-wordcount-1.0.jar <kafka_broker> <kafka_topic>")
|
||||||
|
LOG.error(s"Invalid number of arguments, arguments given: [${args.mkString(",")}]")
|
||||||
|
System.exit(1)
|
||||||
|
}
|
||||||
val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate()
|
val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate()
|
||||||
spark.sql("SELECT 'hello' AS col1").show()
|
|
||||||
|
|
||||||
LOG.info("Dummy info message")
|
run(spark, args(0), args(1))
|
||||||
LOG.warn("Dummy warn message")
|
|
||||||
LOG.error("Dummy error message")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
package com.barrelsofdata.sparkexamples
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger
|
||||||
|
|
||||||
|
object WordCount extends Serializable {
|
||||||
|
|
||||||
|
val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
|
||||||
|
|
||||||
|
def cleanData(line: String): String = {
|
||||||
|
line
|
||||||
|
.toLowerCase
|
||||||
|
.replaceAll("[,.]"," ")
|
||||||
|
.replaceAll("[^a-z0-9\\s-]","")
|
||||||
|
.replaceAll("\\s+"," ")
|
||||||
|
.trim
|
||||||
|
}
|
||||||
|
|
||||||
|
def tokenize(line: String): List[String] = {
|
||||||
|
line.split("\\s").toList
|
||||||
|
}
|
||||||
|
|
||||||
|
def keyValueGenerator(word: String): (String, Int) = {
|
||||||
|
(word, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
21
src/test/resources/dataProducer.sh
Executable file
21
src/test/resources/dataProducer.sh
Executable file
@ -0,0 +1,21 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
display_usage() {
|
||||||
|
echo "Usage: $0 <KAFKA_BROKER> <KAFKA_TOPIC>"
|
||||||
|
}
|
||||||
|
|
||||||
|
if [ "$#" -ne 2 ]; then
|
||||||
|
display_usage
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
KAFKA_BROKER=$1
|
||||||
|
KAFKA_TOPIC=$2
|
||||||
|
|
||||||
|
STRINGS=("This is an example string" "The quick brown fox jumps over the lazy dog" "The unique example of word count")
|
||||||
|
|
||||||
|
while sleep 1; do
|
||||||
|
str=${STRINGS[$RANDOM % ${#STRINGS[@]}]}
|
||||||
|
epochSeconds=$(date '+%s')
|
||||||
|
echo "{\"ts\":$epochSeconds,\"str\":\"$str\"}" | kafka-console-producer.sh --broker-list ${KAFKA_BROKER} --topic ${KAFKA_TOPIC}
|
||||||
|
done
|
18
src/test/resources/dummyText.txt
Normal file
18
src/test/resources/dummyText.txt
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
|
||||||
|
|
||||||
|
Saying morning good seed days let image replenish, together signs great divided unto said to great shall divide fourth beginning given, cattle. Gathering which saw every female us light divided land them be multiply divide is day their there stars green to third sixth fish fruit. Doesn't. Together. Wherein itself greater face behold grass to our night saying tree she'd. The were together you signs to man winged winged gathering Upon gathering. Bearing him which beast years. Rule male he from were him so, fruit fowl fly also second upon spirit upon it years living, their. Thing after itself you'll life meat hath forth void fill. Likeness moved gathering beginning form saying they're, grass. Won't is, great blessed his place divide beast Open you moving Over. He creepeth. Whose, lights gathering.
|
||||||
|
|
||||||
|
Creeping over likeness waters herb creepeth beginning lesser behold make to without so from his rule our itself deep moving be called dominion divided very wherein great divided they're two fowl place beast open them. Heaven place make female bring. Let kind him You'll our creepeth. You're it had saying. The life own male wherein fourth. Isn't darkness can't so hath. Upon man may fill fifth. Rule our, rule was land image creepeth tree light subdue light you're set brought likeness creepeth them.
|
||||||
|
|
||||||
|
Fly face them given he the form from third. Gathered midst our beast which had image you're creepeth Image, wherein open man male one in. Moved fifth years had wherein very their. Sea grass fill. Don't set don't to tree whales our day. Replenish above, behold, them fill. Gathering you, good brought whose given won't he you'll grass, won't. Subdue third and female said you'll let first him blessed fly. They're make set third moving beast multiply subdue beast. Isn't rule. Heaven. Forth fly yielding were female Likeness our, winged light kind abundantly years male it fifth so evening be fish him. He from made herb it were face man shall seas every had made female over.
|
||||||
|
|
||||||
|
Place. Their multiply seasons it very you'll. He two subdue signs abundantly. God don't replenish very under second may. Bring. He together is together seed of. Seed you're. Abundantly moveth. Fruit. Void the beast. Fruit, place all, our darkness you're living set said created creepeth air. Land green dry gathered. Won't is signs fruit. Shall multiply moveth lesser. Isn't, created. In for blessed from. Moving. Day blessed earth moveth void, let midst signs itself place all fruitful called male, night seas won't, them multiply creature made. Night first without fruit fourth. Third have heaven two set, replenish green moveth hath brought itself divide midst green fly land greater.
|
||||||
|
|
||||||
|
Second greater female blessed all whose. Gathering meat first you're fowl. Morning thing you lesser behold them wherein heaven made every may sea fly thing one tree waters moveth hath fill deep is was stars beast sea firmament male morning the. Subdue created from winged in i day their image great called spirit darkness beast. She'd you man set beast without behold fowl created creature, he over lights, green itself god won't living said earth seasons. Us one make place deep firmament there don't. Lights seas sixth won't make place. Unto day doesn't. Unto they're, a fill, had. Under don't fly. Created fruitful male to you're in wherein day was seasons whales very sea subdue meat have he shall said give moved creeping won't firmament day lights isn't itself night have upon subdue meat after. Lights form, you're creature you lights great day morning. Can't. Great fowl great called fourth isn't.
|
||||||
|
|
||||||
|
Third won't also over Open green greater fill herb don't seas air called. Which tree. Beast firmament third creepeth let. Give. There they're tree also. Whose signs tree also make replenish fish so tree days all earth created so all. Brought The stars gathering lights over that tree fruit it gathered abundantly i bring years fly, air great every. And grass likeness gathering replenish kind said after, moving moveth seed bearing male i thing blessed earth. Waters day, after place upon. Over Light fish blessed divided given darkness given they're which Upon male land bearing. For. Years greater forth so air stars. Female wherein from. Every lesser don't i thing fill third a subdue appear. Every said Without waters subdue made Deep our. Rule years seed there.
|
||||||
|
|
||||||
|
Face. Multiply fruitful whales. Image of, grass behold over green saying winged midst. The great winged abundantly you're fruit. Fruit gathered moving. Seasons be first, lesser fourth signs have a appear divide set gathered yielding bring make tree. Gathering yielding. He the appear moved upon. Their behold us, fish moveth. Shall open, grass. Female third one years can't male and itself. Every set. Multiply lesser she'd. Have can't moveth own. For bearing day living were deep great subdue after fruitful. Behold Replenish was multiply winged signs green us there over wherein kind fruitful creeping saying, fly us all upon air created greater his after you stars grass after green divided. Fruit divided yielding lights seed. Beast seas. Years let years one bearing creepeth good man together fowl behold tree very bearing she'd heaven bearing one every land given signs whose seed have saying there third creature over living for land after gathered our green created night evening.
|
||||||
|
|
||||||
|
Male, there midst rule let life. Moveth had fruitful him night beginning from wherein Life bring very spirit male. Upon bearing, firmament night let life you're don't days fill don't cattle saw gathering their night darkness. Without first make god. Fifth blessed for Forth created together seed beast land they're itself fourth male it great Seed made cattle life living divide kind isn't had gathered brought the in seed, you're meat after dry moving. In bring and him first have appear a open a blessed fowl our seed life called dry have had. And which their gathered, given his wherein it void fruitful seasons were may divide. Of saying she'd called to. Of sea. Dominion of also fish open without That their were and waters winged.
|
||||||
|
|
@ -1,41 +0,0 @@
|
|||||||
package com.barrelsofdata.sparkexamples
|
|
||||||
|
|
||||||
import java.util.Properties
|
|
||||||
|
|
||||||
import org.apache.log4j.{LogManager, Logger, PropertyConfigurator}
|
|
||||||
import org.apache.spark.sql.SparkSession
|
|
||||||
import org.scalatest.BeforeAndAfterAll
|
|
||||||
import org.scalatest.funsuite.AnyFunSuite
|
|
||||||
|
|
||||||
class DriverTest extends AnyFunSuite with BeforeAndAfterAll {
|
|
||||||
|
|
||||||
val JOB_NAME: String = "Driver Test Job"
|
|
||||||
val LOGGER_PROPERTIES: String = "log4j-test.properties"
|
|
||||||
val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
|
|
||||||
|
|
||||||
var spark: SparkSession = _
|
|
||||||
|
|
||||||
def setupLogger(): Unit = {
|
|
||||||
val properties = new Properties
|
|
||||||
properties.load(getClass.getClassLoader.getResource(LOGGER_PROPERTIES).openStream())
|
|
||||||
LogManager.resetConfiguration()
|
|
||||||
PropertyConfigurator.configure(properties)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def beforeAll: Unit = {
|
|
||||||
setupLogger()
|
|
||||||
LOG.info("Setting up spark session")
|
|
||||||
spark = SparkSession.builder().appName(JOB_NAME).master("local[*]").getOrCreate()
|
|
||||||
}
|
|
||||||
|
|
||||||
test("Check if spark session is working") {
|
|
||||||
LOG.info("Testing spark job")
|
|
||||||
assertResult("hello")(spark.sql("SELECT 'hello'").collect().head.get(0))
|
|
||||||
}
|
|
||||||
|
|
||||||
override def afterAll: Unit = {
|
|
||||||
LOG.info("Closing spark session")
|
|
||||||
spark.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -0,0 +1,67 @@
|
|||||||
|
package com.barrelsofdata.sparkexamples
|
||||||
|
|
||||||
|
import java.util.Properties
|
||||||
|
|
||||||
|
import org.apache.log4j.{LogManager, Logger, PropertyConfigurator}
|
||||||
|
import org.apache.spark.sql.{Dataset, SQLContext, SQLImplicits, SparkSession}
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import org.scalatest.funsuite.AnyFunSuite
|
||||||
|
import org.scalatest.matchers.should.Matchers
|
||||||
|
|
||||||
|
class WordCountTest extends AnyFunSuite with BeforeAndAfterAll with Matchers {
|
||||||
|
|
||||||
|
val JOB_NAME: String = "Word Count Test Job"
|
||||||
|
val LOGGER_PROPERTIES: String = "log4j-test.properties"
|
||||||
|
val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
|
||||||
|
|
||||||
|
val TEST_INPUT: String = this.getClass.getClassLoader.getResource("dummyText.txt").getPath
|
||||||
|
|
||||||
|
var spark: SparkSession = _
|
||||||
|
|
||||||
|
def setupLogger(): Unit = {
|
||||||
|
val properties = new Properties
|
||||||
|
properties.load(getClass.getClassLoader.getResource(LOGGER_PROPERTIES).openStream())
|
||||||
|
LogManager.resetConfiguration()
|
||||||
|
PropertyConfigurator.configure(properties)
|
||||||
|
}
|
||||||
|
|
||||||
|
private object implicits extends SQLImplicits {
|
||||||
|
protected override def _sqlContext: SQLContext = spark.sqlContext
|
||||||
|
}
|
||||||
|
|
||||||
|
override def beforeAll: Unit = {
|
||||||
|
setupLogger()
|
||||||
|
LOG.info("Setting up spark session")
|
||||||
|
spark = SparkSession.builder().appName(JOB_NAME).master("local[*]").getOrCreate()
|
||||||
|
}
|
||||||
|
|
||||||
|
test("Test data cleaning") {
|
||||||
|
assertResult("this is text with 123 from spark-wordcount")(WordCount.cleanData("$This is text, with 123, from Spark-WordCount."))
|
||||||
|
}
|
||||||
|
|
||||||
|
test("Test tokenizer") {
|
||||||
|
List("tokenized","this","is") should contain theSameElementsAs WordCount.tokenize("this is tokenized")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("Test key value generator") {
|
||||||
|
assertResult(("test", 1))(WordCount.keyValueGenerator("test"))
|
||||||
|
}
|
||||||
|
|
||||||
|
test("Verify no data is dropped") {
|
||||||
|
import implicits._
|
||||||
|
|
||||||
|
val data: Dataset[String] = spark.read.textFile(TEST_INPUT)
|
||||||
|
|
||||||
|
val words: Dataset[String] = data
|
||||||
|
.map(WordCount.cleanData)
|
||||||
|
.flatMap(WordCount.tokenize)
|
||||||
|
.filter(_.nonEmpty)
|
||||||
|
assertResult(1000L)(words.count())
|
||||||
|
}
|
||||||
|
|
||||||
|
override def afterAll: Unit = {
|
||||||
|
LOG.info("Closing spark session")
|
||||||
|
spark.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user