diff --git a/README.md b/README.md
index c1d5848..2750a82 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-# Spark Boilerplate
-This is a boilerplate project for Apache Spark. The related blog post can be found at [https://www.barrelsofdata.com/spark-boilerplate-using-scala](https://www.barrelsofdata.com/spark-boilerplate-using-scala)
+# Spark Word Count with Unit Tests
+This is a project detailing how to write a word count program in Apache Spark along with unit test cases. The related blog post can be found at [https://www.barrelsofdata.com/spark-word-count-with-unit-tests](https://www.barrelsofdata.com/spark-word-count-with-unit-tests)
 
 ## Build instructions
 From the root of the project execute the below commands
@@ -22,5 +22,5 @@ From the root of the project execute the below commands
 
 ## Run
 ```shell script
-spark-submit --master yarn --deploy-mode cluster build/libs/spark-boilerplate-1.0.jar
+spark-submit --master yarn --deploy-mode cluster build/libs/spark-wordcount-1.0.jar hdfs://path/to/input/file.txt hdfs://path/to/output/directory
 ```
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index d63964b..82b7c9f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1 +1 @@
-rootProject.name = 'spark-boilerplate'
+rootProject.name = 'spark-wordcount'
diff --git a/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala b/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
index 3b323b2..16321ca 100644
--- a/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
+++ b/src/main/scala/com/barrelsofdata/sparkexamples/Driver.scala
@@ -1,21 +1,59 @@
 package com.barrelsofdata.sparkexamples
 
 import org.apache.log4j.Logger
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
+/** Entry point of the word count job: validates the arguments and runs the pipeline. */
 object Driver {
 
-  val JOB_NAME: String = "Boilerplate"
+  val JOB_NAME: String = "Word Count"
   val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
 
+  /**
+   * Counts how often each word occurs in a text file and writes the result as CSV.
+   *
+   * @param spark          session used to read the input and write the output
+   * @param inputFilePath  path of the text file to read
+   * @param outputFilePath path the CSV result (with a header row) is written to
+   */
+  def run(spark: SparkSession, inputFilePath: String, outputFilePath: String): Unit = {
+    import spark.implicits._
+
+    val data: Dataset[String] = spark.read.textFile(inputFilePath)
+
+    // Normalise every line, split it into words and make sure no empty token is counted
+    val words: Dataset[String] = data
+      .map(WordCount.cleanData)
+      .flatMap(WordCount.tokenize)
+      .filter(_.nonEmpty)
+
+    val wordFrequencies: DataFrame = words
+      .map(WordCount.keyValueGenerator)
+      .rdd.reduceByKey(_ + _)
+      .toDF("word", "frequency")
+
+    wordFrequencies.write.option("header","true").csv(outputFilePath)
+    LOG.info(s"Result successfully written to $outputFilePath")
+  }
+
+  /**
+   * Exits with status 1 unless exactly two arguments are given; otherwise runs the word count.
+   *
+   * @param args the input file path followed by the output directory
+   */
   def main(args: Array[String]): Unit = {
-
+    if(args.length != 2) {
+      println("Invalid usage")
+      println("Usage: spark-submit --master <master-url> spark-wordcount-1.0.jar /path/to/input/file.txt /path/to/output/directory")
+      LOG.error(s"Invalid number of arguments, arguments given: [${args.mkString(",")}]")
+      System.exit(1)
+    }
     val spark: SparkSession = SparkSession.builder().appName(JOB_NAME).getOrCreate()
 
-    spark.sql("SELECT 'hello' AS col1").show()
-    LOG.info("Dummy info message")
-    LOG.warn("Dummy warn message")
-    LOG.error("Dummy error message")
+    // Stop the session whether or not the job succeeds
+    try run(spark, args(0), args(1))
+    finally spark.stop()
+
   }
 
 }
diff --git a/src/main/scala/com/barrelsofdata/sparkexamples/WordCount.scala b/src/main/scala/com/barrelsofdata/sparkexamples/WordCount.scala
new file mode 100644
index 0000000..67c1f17
--- /dev/null
+++ b/src/main/scala/com/barrelsofdata/sparkexamples/WordCount.scala
@@ -0,0 +1,50 @@
+package com.barrelsofdata.sparkexamples
+
+import java.util.Locale
+
+import org.apache.log4j.Logger
+
+/** String helpers of the word count job; none of them touches Spark. */
+object WordCount extends Serializable {
+
+  val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
+
+  /**
+   * Normalises a line: lower-cases it, replaces commas and full stops with spaces, removes every
+   * character other than a-z, 0-9, whitespace and '-', and collapses runs of whitespace.
+   *
+   * @param line raw line of text
+   * @return the cleaned line, without leading or trailing whitespace
+   */
+  def cleanData(line: String): String = {
+    // Locale.ROOT keeps the case mapping independent of the JVM default locale; under a Turkish
+    // locale "I" lower-cases to a dotless i, which the a-z filter below would silently delete
+    line
+      .toLowerCase(Locale.ROOT)
+      .replaceAll("[,.]"," ")
+      .replaceAll("[^a-z0-9\\s-]","")
+      .replaceAll("\\s+"," ")
+      .trim
+  }
+
+  /**
+   * Splits a line into words on runs of whitespace.
+   *
+   * @param line line to split
+   * @return the words in order of appearance; never contains empty strings, so a blank line yields Nil
+   */
+  def tokenize(line: String): List[String] = {
+    line.split("\\s+").filter(_.nonEmpty).toList
+  }
+
+  /**
+   * Pairs a word with a count of one so that counts can be summed per word.
+   *
+   * @param word the word to count
+   * @return the tuple (word, 1)
+   */
+  def keyValueGenerator(word: String): (String, Int) = {
+    (word, 1)
+  }
+
+}
diff --git a/src/test/resources/dummyText.txt
b/src/test/resources/dummyText.txt new file mode 100644 index 0000000..a9a7743 --- /dev/null +++ b/src/test/resources/dummyText.txt @@ -0,0 +1,18 @@ + + +Saying morning good seed days let image replenish, together signs great divided unto said to great shall divide fourth beginning given, cattle. Gathering which saw every female us light divided land them be multiply divide is day their there stars green to third sixth fish fruit. Doesn't. Together. Wherein itself greater face behold grass to our night saying tree she'd. The were together you signs to man winged winged gathering Upon gathering. Bearing him which beast years. Rule male he from were him so, fruit fowl fly also second upon spirit upon it years living, their. Thing after itself you'll life meat hath forth void fill. Likeness moved gathering beginning form saying they're, grass. Won't is, great blessed his place divide beast Open you moving Over. He creepeth. Whose, lights gathering. + +Creeping over likeness waters herb creepeth beginning lesser behold make to without so from his rule our itself deep moving be called dominion divided very wherein great divided they're two fowl place beast open them. Heaven place make female bring. Let kind him You'll our creepeth. You're it had saying. The life own male wherein fourth. Isn't darkness can't so hath. Upon man may fill fifth. Rule our, rule was land image creepeth tree light subdue light you're set brought likeness creepeth them. + +Fly face them given he the form from third. Gathered midst our beast which had image you're creepeth Image, wherein open man male one in. Moved fifth years had wherein very their. Sea grass fill. Don't set don't to tree whales our day. Replenish above, behold, them fill. Gathering you, good brought whose given won't he you'll grass, won't. Subdue third and female said you'll let first him blessed fly. They're make set third moving beast multiply subdue beast. Isn't rule. Heaven. 
Forth fly yielding were female Likeness our, winged light kind abundantly years male it fifth so evening be fish him. He from made herb it were face man shall seas every had made female over. + +Place. Their multiply seasons it very you'll. He two subdue signs abundantly. God don't replenish very under second may. Bring. He together is together seed of. Seed you're. Abundantly moveth. Fruit. Void the beast. Fruit, place all, our darkness you're living set said created creepeth air. Land green dry gathered. Won't is signs fruit. Shall multiply moveth lesser. Isn't, created. In for blessed from. Moving. Day blessed earth moveth void, let midst signs itself place all fruitful called male, night seas won't, them multiply creature made. Night first without fruit fourth. Third have heaven two set, replenish green moveth hath brought itself divide midst green fly land greater. + +Second greater female blessed all whose. Gathering meat first you're fowl. Morning thing you lesser behold them wherein heaven made every may sea fly thing one tree waters moveth hath fill deep is was stars beast sea firmament male morning the. Subdue created from winged in i day their image great called spirit darkness beast. She'd you man set beast without behold fowl created creature, he over lights, green itself god won't living said earth seasons. Us one make place deep firmament there don't. Lights seas sixth won't make place. Unto day doesn't. Unto they're, a fill, had. Under don't fly. Created fruitful male to you're in wherein day was seasons whales very sea subdue meat have he shall said give moved creeping won't firmament day lights isn't itself night have upon subdue meat after. Lights form, you're creature you lights great day morning. Can't. Great fowl great called fourth isn't. + +Third won't also over Open green greater fill herb don't seas air called. Which tree. Beast firmament third creepeth let. Give. There they're tree also. 
Whose signs tree also make replenish fish so tree days all earth created so all. Brought The stars gathering lights over that tree fruit it gathered abundantly i bring years fly, air great every. And grass likeness gathering replenish kind said after, moving moveth seed bearing male i thing blessed earth. Waters day, after place upon. Over Light fish blessed divided given darkness given they're which Upon male land bearing. For. Years greater forth so air stars. Female wherein from. Every lesser don't i thing fill third a subdue appear. Every said Without waters subdue made Deep our. Rule years seed there. + +Face. Multiply fruitful whales. Image of, grass behold over green saying winged midst. The great winged abundantly you're fruit. Fruit gathered moving. Seasons be first, lesser fourth signs have a appear divide set gathered yielding bring make tree. Gathering yielding. He the appear moved upon. Their behold us, fish moveth. Shall open, grass. Female third one years can't male and itself. Every set. Multiply lesser she'd. Have can't moveth own. For bearing day living were deep great subdue after fruitful. Behold Replenish was multiply winged signs green us there over wherein kind fruitful creeping saying, fly us all upon air created greater his after you stars grass after green divided. Fruit divided yielding lights seed. Beast seas. Years let years one bearing creepeth good man together fowl behold tree very bearing she'd heaven bearing one every land given signs whose seed have saying there third creature over living for land after gathered our green created night evening. + +Male, there midst rule let life. Moveth had fruitful him night beginning from wherein Life bring very spirit male. Upon bearing, firmament night let life you're don't days fill don't cattle saw gathering their night darkness. Without first make god. 
Fifth blessed for Forth created together seed beast land they're itself fourth male it great Seed made cattle life living divide kind isn't had gathered brought the in seed, you're meat after dry moving. In bring and him first have appear a open a blessed fowl our seed life called dry have had. And which their gathered, given his wherein it void fruitful seasons were may divide. Of saying she'd called to. Of sea. Dominion of also fish open without That their were and waters winged. + diff --git a/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala b/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala deleted file mode 100644 index 4fe948a..0000000 --- a/src/test/scala/com/barrelsofdata/sparkexamples/DriverTest.scala +++ /dev/null @@ -1,41 +0,0 @@ -package com.barrelsofdata.sparkexamples - -import java.util.Properties - -import org.apache.log4j.{LogManager, Logger, PropertyConfigurator} -import org.apache.spark.sql.SparkSession -import org.scalatest.BeforeAndAfterAll -import org.scalatest.funsuite.AnyFunSuite - -class DriverTest extends AnyFunSuite with BeforeAndAfterAll { - - val JOB_NAME: String = "Driver Test Job" - val LOGGER_PROPERTIES: String = "log4j-test.properties" - val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName) - - var spark: SparkSession = _ - - def setupLogger(): Unit = { - val properties = new Properties - properties.load(getClass.getClassLoader.getResource(LOGGER_PROPERTIES).openStream()) - LogManager.resetConfiguration() - PropertyConfigurator.configure(properties) - } - - override def beforeAll: Unit = { - setupLogger() - LOG.info("Setting up spark session") - spark = SparkSession.builder().appName(JOB_NAME).master("local[*]").getOrCreate() - } - - test("Check if spark session is working") { - LOG.info("Testing spark job") - assertResult("hello")(spark.sql("SELECT 'hello'").collect().head.get(0)) - } - - override def afterAll: Unit = { - LOG.info("Closing spark session") - spark.close() - } - -} \ No 
newline at end of file
diff --git a/src/test/scala/com/barrelsofdata/sparkexamples/WordCountTest.scala b/src/test/scala/com/barrelsofdata/sparkexamples/WordCountTest.scala
new file mode 100644
index 0000000..fe032c1
--- /dev/null
+++ b/src/test/scala/com/barrelsofdata/sparkexamples/WordCountTest.scala
@@ -0,0 +1,77 @@
+package com.barrelsofdata.sparkexamples
+
+import java.nio.file.Paths
+import java.util.Properties
+
+import org.apache.log4j.{LogManager, Logger, PropertyConfigurator}
+import org.apache.spark.sql.{Dataset, SQLContext, SQLImplicits, SparkSession}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.should.Matchers
+
+/** Tests the WordCount helpers and checks the number of words extracted from dummyText.txt. */
+class WordCountTest extends AnyFunSuite with BeforeAndAfterAll with Matchers {
+
+  val JOB_NAME: String = "Word Count Test Job"
+  val LOGGER_PROPERTIES: String = "log4j-test.properties"
+  val LOG: Logger = Logger.getLogger(this.getClass.getCanonicalName)
+
+  // Resolve through the URI so that percent-encoded characters (e.g. a space in the checkout
+  // path) are decoded; URL.getPath would hand the still-encoded form to Spark
+  val TEST_INPUT: String = Paths.get(this.getClass.getClassLoader.getResource("dummyText.txt").toURI).toString
+
+  var spark: SparkSession = _
+
+  /** Replaces the active log4j configuration with the one from the test properties file. */
+  def setupLogger(): Unit = {
+    val properties = new Properties
+    val stream = getClass.getClassLoader.getResource(LOGGER_PROPERTIES).openStream()
+    // Properties.load leaves the stream open, so it has to be closed here
+    try properties.load(stream)
+    finally stream.close()
+    LogManager.resetConfiguration()
+    PropertyConfigurator.configure(properties)
+  }
+
+  // Provides encoders for the tests; _sqlContext is a def, so spark is read on each use
+  private object implicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = spark.sqlContext
+  }
+
+  override def beforeAll(): Unit = {
+    setupLogger()
+    LOG.info("Setting up spark session")
+    spark = SparkSession.builder().appName(JOB_NAME).master("local[*]").getOrCreate()
+  }
+
+  test("Test data cleaning") {
+    assertResult("this is text with 123 from spark-wordcount")(WordCount.cleanData("$This is text, with 123, from Spark-WordCount."))
+  }
+
+  test("Test tokenizer") {
+    List("tokenized","this","is") should contain theSameElementsAs WordCount.tokenize("this is tokenized")
+  }
+
+  test("Test key value generator") {
+    assertResult(("test", 1))(WordCount.keyValueGenerator("test"))
+  }
+
+  test("Verify no data is dropped") {
+    import implicits._
+
+    val data: Dataset[String] = spark.read.textFile(TEST_INPUT)
+
+    val words: Dataset[String] = data
+      .map(WordCount.cleanData)
+      .flatMap(WordCount.tokenize)
+      .filter(_.nonEmpty)
+    assertResult(1000L)(words.count())
+  }
+
+  override def afterAll(): Unit = {
+    LOG.info("Closing spark session")
+    // spark is still null if the session was never created in beforeAll
+    Option(spark).foreach(_.close())
+  }
+
+}