commit 6ce4437766e3cc0ea782015bd75d39f843dd3341
Author: karthik
Date:   Sat Jul 22 20:12:54 2023 +0530

    Add sensor pipeline

diff --git a/.gitignore b/.gitignore
new file mode 100755
index 0000000..0005e1b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+# Compiled classes
+*.class
+# Gradle files
+.gradle
+# IntelliJ IDEA files
+.idea
+# Build files
+build
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100755
index 0000000..b309798
--- /dev/null
+++ b/README.md
@@ -0,0 +1,49 @@
+# Apache Beam Examples
+This project holds all the examples of Apache Beam that are detailed on my blog at [https://barrelsofdata.com](https://barrelsofdata.com)
+
+## Build instructions
+This is a multi-module project and uses version catalogs. The dependencies can be viewed at [gradle/libs.versions.toml](./gradle/libs.versions.toml)
+
+To build the project, execute the following commands from the project root
+- To clear all compiled classes, build and log directories
+```shell script
+./gradlew clean
+```
+- To run unit tests
+```shell script
+./gradlew test
+```
+- To run integration tests
+```shell script
+./gradlew integrationTest
+```
+- To build jar
+```shell script
+./gradlew build
+```
+
+## Run
+Ensure you have Kafka running ([kafka tutorial](https://barrelsofdata.com/apache-kafka-setup)) and a PostgreSQL database running.
+
+- Create table
+```sql
+CREATE TABLE <table_name> (id VARCHAR(255), ts TIMESTAMP, computed DOUBLE PRECISION);
+```
+- Start streaming job on direct runner
+```shell script
+java -jar streaming/build/libs/streaming.jar \
+    --runner=DirectRunner \
+    --kafkaBrokers=<comma_separated_list_of_host:port> \
+    --kafkaTopic=<topic_name> \
+    --kafkaConsumerGroupId=<consumer_group_id> \
+    --sqlDriver=org.postgresql.Driver \
+    --jdbcUrl=jdbc:postgresql://<host>:<port>/<database> \
+    --table=<table_name> \
+    --username=<database_username> \
+    --password=<database_password> \
+    --resetToEarliest
+```
+- You can feed simulated data to the kafka topic with the key being a string (used as the unique id of a sensor) and the value being a JSON string of the following schema
+```json
+{"ts":<timestamp_in_epoch_milliseconds>,"value":<decimal_value>}
+```
\ No newline at end of file
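Note: the README above leaves the producer side to the reader. A minimal sketch using Kafka's stock console producer — the broker address, topic name and sample payload are placeholders, not values defined by this commit:

```shell script
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic <topic_name> \
  --property parse.key=true \
  --property key.separator='|'
# then type one event per line, as <sensor_id>|<json>, e.g.
# sensor-1|{"ts":1698489173051,"value":50.0}
```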
diff --git a/build.gradle.kts b/build.gradle.kts
new file mode 100755
index 0000000..a9966bc
--- /dev/null
+++ b/build.gradle.kts
@@ -0,0 +1,6 @@
+plugins {
+    java
+}
+
+project.group = "com.barrelsofdata"
+project.version = "1.0.0"
\ No newline at end of file
diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts
new file mode 100755
index 0000000..75e3c81
--- /dev/null
+++ b/buildSrc/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+    `kotlin-dsl`
+}
+
+repositories {
+    gradlePluginPortal()
+}
+
+dependencies {
+    implementation(files(libs.javaClass.superclass.protectionDomain.codeSource.location))
+}
\ No newline at end of file
diff --git a/buildSrc/settings.gradle.kts b/buildSrc/settings.gradle.kts
new file mode 100755
index 0000000..858ff6d
--- /dev/null
+++ b/buildSrc/settings.gradle.kts
@@ -0,0 +1,17 @@
+pluginManagement {
+    repositories {
+        mavenCentral()
+        gradlePluginPortal()
+    }
+}
+
+dependencyResolutionManagement {
+    repositories {
+        mavenCentral()
+    }
+    versionCatalogs {
+        create("libs") {
+            from(files("../gradle/libs.versions.toml"))
+        }
+    }
+}
\ No newline at end of file
diff --git a/buildSrc/src/main/kotlin/com/barrelsofdata/project-conventions.gradle.kts b/buildSrc/src/main/kotlin/com/barrelsofdata/project-conventions.gradle.kts
new file mode 100755
index 0000000..5550b9c
--- /dev/null
+++ b/buildSrc/src/main/kotlin/com/barrelsofdata/project-conventions.gradle.kts
@@ -0,0 +1,62 @@
+package com.barrelsofdata
+
+import org.gradle.accessors.dm.LibrariesForLibs
+
+plugins {
+    java
+}
+
+val libs = the<LibrariesForLibs>()
+
+configurations {
+    implementation {
+        resolutionStrategy.failOnVersionConflict()
+    }
+}
+
+val integrationTest by sourceSets.creating
+configurations[integrationTest.implementationConfigurationName].extendsFrom(configurations.testImplementation.get())
+configurations[integrationTest.runtimeOnlyConfigurationName].extendsFrom(configurations.testRuntimeOnly.get())
+
+tasks.test {
+    useJUnitPlatform {
+        filter {
+            includeTestsMatching("*Test")
+            excludeTestsMatching("*IT")
+        }
+    }
+}
+
+val integrationTestTask = tasks.register<Test>("integrationTest") {
+    description = "Runs integration tests"
+    group = "verification"
+    useJUnitPlatform {
+        filter {
+            includeTestsMatching("*IT")
+            excludeTestsMatching("*Test")
+        }
+    }
+    testClassesDirs = integrationTest.output.classesDirs
+    classpath = configurations[integrationTest.runtimeClasspathConfigurationName] + integrationTest.output
+}
+
+tasks.check {
+    dependsOn(integrationTestTask)
+}
+
+tasks.withType<Test>().configureEach {
+    maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).coerceAtLeast(1)
+}
+
+java {
+    toolchain {
+        languageVersion = JavaLanguageVersion.of(libs.versions.java.get())
+        vendor = JvmVendorSpec.ADOPTIUM
+    }
+}
+
+tasks.withType<Jar> {
+    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+    doFirst {
+        from(configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) })
+    }
+}
\ No newline at end of file
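Note: `val integrationTest by sourceSets.creating` registers a source set named `integrationTest`, so by Gradle convention the integration tests live in a matching directory tree next to the unit tests. A sketch of the expected layout (directory names follow Gradle defaults, not anything declared in this commit):

```
streaming/
├── src/main/java/             # production code
├── src/test/java/             # unit tests (*Test), run by ./gradlew test
└── src/integrationTest/java/  # integration tests (*IT), run by ./gradlew integrationTest
```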
"spring-kafka-test" } + +[bundles] +beam-java = ["beam-core", "beam-direct-runner"] \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100755 index 0000000..033e24c Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100755 index 0000000..ae4c681 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..620a176 --- /dev/null +++ b/gradlew @@ -0,0 +1,248 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. 
+# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. 
+ # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100755 index 0000000..6689b85 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,92 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. 
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle.kts b/settings.gradle.kts new file mode 100755 index 0000000..265a65e --- /dev/null +++ b/settings.gradle.kts @@ -0,0 +1,19 @@ +pluginManagement { + repositories { + mavenCentral() + gradlePluginPortal() + } +} + +dependencyResolutionManagement { + repositories { + mavenCentral() + maven { + url = uri("https://packages.confluent.io/maven/") + } + } +} + +rootProject.name = "apache-beam-examples" + +include(":streaming") \ No newline at end of file diff --git a/streaming/build.gradle.kts b/streaming/build.gradle.kts new file mode 100755 index 0000000..3cb962e --- /dev/null +++ b/streaming/build.gradle.kts @@ -0,0 +1,18 @@ +plugins { + id("com.barrelsofdata.project-conventions") +} + +dependencies { + implementation(libs.bundles.beam.java) + implementation(libs.beam.jdbc.io) + implementation(libs.beam.kafka.io) + implementation(libs.jackson.core) + implementation(libs.kafka.clients) + implementation(libs.postgres.driver) + + testImplementation(libs.jupiter) + testImplementation(libs.jupiter.migration.support) // Apache beam still uses jUnit 4 + testImplementation(libs.h2) + testImplementation(libs.hamcrest) + testImplementation(libs.spring.kafka.test) +} \ No newline at end of file diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java new file mode 100755 index 0000000..2382524 --- /dev/null +++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java @@ -0,0 +1,20 @@ +package com.barrelsofdata.examples.beam.streaming.sensor; + +import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions; +import com.barrelsofdata.examples.beam.streaming.sensor.pipeline.SensorAnalytics; +import com.barrelsofdata.examples.beam.streaming.sensor.util.PipelineOptionsBuilder; +import org.apache.beam.sdk.Pipeline; + +/** + * The main application class that triggers the pipeline run + */ +public class Application { + + 
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java
new file mode 100755
index 0000000..2382524
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/Application.java
@@ -0,0 +1,20 @@
+package com.barrelsofdata.examples.beam.streaming.sensor;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions;
+import com.barrelsofdata.examples.beam.streaming.sensor.pipeline.SensorAnalytics;
+import com.barrelsofdata.examples.beam.streaming.sensor.util.PipelineOptionsBuilder;
+import org.apache.beam.sdk.Pipeline;
+
+/**
+ * The main application class that triggers the pipeline run
+ */
+public class Application {
+
+    public static void main(String[] args) {
+        SensorPipelineOptions pipelineOptions
+                = PipelineOptionsBuilder.from(args, SensorPipelineOptions.class);
+
+        Pipeline pipeline = Pipeline.create(pipelineOptions);
+        SensorAnalytics.from(pipeline, pipelineOptions).run();
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/SensorPipelineOptions.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/SensorPipelineOptions.java
new file mode 100755
index 0000000..ebf8988
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/SensorPipelineOptions.java
@@ -0,0 +1,58 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.config;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+public interface SensorPipelineOptions extends PipelineOptions {
+
+    @Description("Comma separated list of kafka brokers (host:port)")
+    @Validation.Required
+    String getKafkaBrokers();
+    void setKafkaBrokers(String kafkaBrokers);
+
+    @Description("Kafka topic to read from")
+    @Validation.Required
+    String getKafkaTopic();
+    void setKafkaTopic(String kafkaTopic);
+
+    @Description("Kafka consumer group id to read from")
+    @Validation.Required
+    String getKafkaConsumerGroupId();
+    void setKafkaConsumerGroupId(String kafkaConsumerGroupId);
+
+    @Description("Set kafka auto offset reset to earliest (default: latest)")
+    @Default.Boolean(false)
+    boolean isResetToEarliest();
+    void setResetToEarliest(boolean resetToEarliest);
+
+    @Description("Kafka max read duration in minutes (Only used for testing)")
+    Integer getKafkaMaxReadDurationMinutes();
+    void setKafkaMaxReadDurationMinutes(Integer kafkaMaxReadDurationMinutes);
+
+    @Description("Sql driver to be used for jdbc")
+    @Validation.Required
+    String getSqlDriver();
+    void setSqlDriver(String sqlDriver);
+
+    @Description("Target jdbc url to connect to")
+    @Validation.Required
+    String getJdbcUrl();
+    void setJdbcUrl(String jdbcUrl);
+
+    @Description("Target table to write to")
+    @Validation.Required
+    String getTable();
+    void setTable(String table);
+
+    @Description("Username to access the jdbc data target")
+    @Validation.Required
+    String getUsername();
+    void setUsername(String username);
+
+    @Description("Password of the provided user")
+    @Validation.Required
+    String getPassword();
+    void setPassword(String password);
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/Steps.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/Steps.java
new file mode 100755
index 0000000..1e7f8ce
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/config/Steps.java
@@ -0,0 +1,26 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.config;
+
+public enum Steps {
+    SOURCE("Source"),
+    READ_KAFKA("Read kafka"),
+    PARSE("Parse"),
+    PARSE_EVENT("Parse events"),
+    ATTACH_TIMESTAMP("Attach event timestamps"),
+    PROCESS_DATA("Process data"),
+    EXTRACT_KEY("Extract key"),
+    ADD_WINDOWS("Add windows"),
+    COMPUTE_AVERAGE("Compute average"),
+    REMOVE_KEY("Remove key"),
+    TARGET("Target"),
+    JDBC_WRITE("Write to JDBC");
+
+    private final String step;
+
+    private Steps(String step) {
+        this.step = step;
+    }
+
+    public String getStep() {
+        return step;
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/Average.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/Average.java
new file mode 100755
index 0000000..34a5f55
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/Average.java
@@ -0,0 +1,55 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.function;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.apache.beam.sdk.transforms.Combine;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.StreamSupport;
+
+/**
+ * Combiner function that emits a {@link ComputedEvent} by computing averages of values
+ * from a group of {@link RawEvent}
+ */
+public class Average extends Combine.CombineFn<RawEvent, ArrayList<RawEvent>, ComputedEvent> {
+    @Override
+    public ArrayList<RawEvent> createAccumulator() {
+        // Initializes an accumulator at worker level
+        return new ArrayList<>();
+    }
+
+    @Override
+    public ArrayList<RawEvent> addInput(ArrayList<RawEvent> accumulator, RawEvent input) {
+        // Adds data into accumulator, at worker level
+        accumulator.add(input);
+        return accumulator;
+    }
+
+    @Override
+    public ArrayList<RawEvent> mergeAccumulators(Iterable<ArrayList<RawEvent>> accumulators) {
+        // Merging accumulators from all workers into one
+        List<RawEvent> flattened = StreamSupport.stream(accumulators.spliterator(), true).flatMap(ArrayList::stream).toList();
+        return new ArrayList<>(flattened);
+    }
+
+    @Override
+    public ComputedEvent extractOutput(ArrayList<RawEvent> accumulator) {
+        // Aggregating and emitting one result
+
+        // Get id and timestamp of the last event to use as timestamp of the new event
+        // Getting the id here makes it easy for us to throw away the grouping keys later
+        Collections.sort(accumulator);
+        RawEvent lastRawEventInWindow = accumulator.get(accumulator.size() - 1);
+        String id = lastRawEventInWindow.id();
+        Long timestamp = lastRawEventInWindow.data().ts();
+
+        Double average = accumulator.stream()
+                .mapToDouble(ev -> ev.data().value())
+                .average()
+                .orElse(-1);
+
+        return new ComputedEvent(id, timestamp, average);
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractKeys.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractKeys.java
new file mode 100755
index 0000000..eabb797
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractKeys.java
@@ -0,0 +1,19 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.function;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+/**
+ * Extracts id as keys from {@link RawEvent}
+ */
+public class ExtractKeys extends SimpleFunction<RawEvent, String> {
+
+    @Override
+    public String apply(RawEvent input) {
+        return input.id();
+    }
+
+    public static ExtractKeys of() {
+        return new ExtractKeys();
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestamp.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestamp.java
new file mode 100755
index 0000000..e60dc18
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestamp.java
@@ -0,0 +1,21 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.function;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.joda.time.Instant;
+
+/**
+ * Extracts timestamp of {@link RawEvent} through the {@link com.barrelsofdata.examples.beam.streaming.sensor.model.Data} object
+ */
+public class ExtractTimestamp extends SimpleFunction<RawEvent, Instant> {
+    private ExtractTimestamp() {}
+
+    @Override
+    public Instant apply(RawEvent input) {
+        return Instant.ofEpochMilli(input.data().ts());
+    }
+
+    public static ExtractTimestamp of() {
+        return new ExtractTimestamp();
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilder.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilder.java
new file mode 100755
index 0000000..30829ca
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilder.java
@@ -0,0 +1,30 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.function;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.WithFailures;
+
+import java.util.Objects;
+
+/**
+ * Builder that builds the {@link FailedEvent}
+ */
+public class FailedEventBuilder<T> extends SimpleFunction<WithFailures.ExceptionElement<T>, FailedEvent> {
+
+    private final String step;
+
+    private FailedEventBuilder(String step) {
+        this.step = step;
+    }
+
+    @Override
+    public FailedEvent apply(WithFailures.ExceptionElement<T> input) {
+        if(input != null)
+            return new FailedEvent(step, Objects.toString(input.element(), null), input.exception());
+        return new FailedEvent(step, null, null);
+    }
+
+    public static <T> FailedEventBuilder<T> of(String step) {
+        return new FailedEventBuilder<>(step);
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEvents.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEvents.java
new file mode 100755
index 0000000..d13d053
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEvents.java
@@ -0,0 +1,44 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.function;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+
+import java.util.Objects;
+
+/**
+ * Parses kafka KV into {@link RawEvent}
+ * Kafka key is used as id
+ * Kafka value is used as {@link Data}
+ * Throws {@link RuntimeException} on failed parsing
+ */
+public class ParseEvents extends SimpleFunction<KV<String, String>, RawEvent> {
+
+    private JsonMapper jsonMapper;
+
+    private ParseEvents() {}
+
+    private ParseEvents(JsonMapper jsonMapper) {
+        this.jsonMapper = jsonMapper;
+    }
+
+    @Override
+    public RawEvent apply(KV<String, String> input) {
+        try {
+            String userId = Objects.requireNonNull(input).getKey();
+            String eventDataJson = input.getValue();
+
+            Data data = jsonMapper.readValue(eventDataJson, Data.class);
+            return new RawEvent(userId, data);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static ParseEvents of(JsonMapper jsonMapper) {
+        return new ParseEvents(jsonMapper);
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/ComputedEvent.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/ComputedEvent.java
new file mode 100755
index 0000000..25bda85
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/ComputedEvent.java
@@ -0,0 +1,5 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.model;
+
+import java.io.Serializable;
+
+public record ComputedEvent(String id, Long ts, Double value) implements Serializable {}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/Data.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/Data.java
new file mode 100755
index 0000000..48965f6
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/Data.java
@@ -0,0 +1,7 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+
+public record Data(@JsonProperty(required = true) Long ts, Double value) implements Serializable {}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/FailedEvent.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/FailedEvent.java
new file mode 100755
index 0000000..19c1b81
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/FailedEvent.java
@@ -0,0 +1,5 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.model;
+
+import java.io.Serializable;
+
+public record FailedEvent(String step, String event, Throwable exception) implements Serializable {}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/RawEvent.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/RawEvent.java
new file mode 100755
index 0000000..9132242
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/model/RawEvent.java
@@ -0,0 +1,10 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.model;
+
+import java.io.Serializable;
+
+public record RawEvent(String id, Data data) implements Comparable<RawEvent>, Serializable {
+    @Override
+    public int compareTo(RawEvent rawEvent) {
+        return Long.compare(this.data().ts(), rawEvent.data().ts()); // avoids the int-cast overflow of subtracting timestamps
+    }
+}
\ No newline at end of file
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/pipeline/SensorAnalytics.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/pipeline/SensorAnalytics.java
new file mode 100755
index 0000000..e8e6ae1
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/pipeline/SensorAnalytics.java
@@ -0,0 +1,64 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.pipeline;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions;
+import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.target.JDBCTarget;
+import com.barrelsofdata.examples.beam.streaming.sensor.transform.ComputeAverage;
+import com.barrelsofdata.examples.beam.streaming.sensor.source.KafkaSource;
+import com.barrelsofdata.examples.beam.streaming.sensor.transform.ParseWithTimestamp;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+
+/**
+ * This class builds the sensor pipeline by applying various transforms
+ */
+public class SensorAnalytics {
+
+    private static final Long WINDOW_DURATION_SEC = 180L; // 3 minutes
+    private static final Long WINDOW_FREQUENCY_SEC = 60L; // Every 1 minute
+    private static final Long ALLOWED_LATENESS_SEC = 120L; // 2 minutes
+
+    private SensorAnalytics() {}
+
+    /**
+     * Builds the pipeline by consuming the passed pipeline options
+     * @param pipeline Pipeline object to apply the transforms to
+     * @param options Configuration to be used for the pipeline
+     */
+    public static Pipeline from(Pipeline pipeline, SensorPipelineOptions options) {
+
+        KafkaSource kafkaSource = new KafkaSource(
+                options.getKafkaBrokers(),
+                options.getKafkaTopic(),
+                options.getKafkaConsumerGroupId(),
+                options.isResetToEarliest());
+
+        // Convert to bounded source - Only for testing
+        Integer kafkaMaxReadDuration = options.getKafkaMaxReadDurationMinutes();
+        if(kafkaMaxReadDuration != null)
+            kafkaSource.withMaxReadTime(Duration.standardMinutes(kafkaMaxReadDuration));
+
+        JDBCTarget jdbcTarget = new JDBCTarget(
+                options.getSqlDriver(),
+                options.getJdbcUrl(),
+                options.getTable(),
+                options.getUsername(),
+                options.getPassword());
+
+        PCollection<KV<String, String>> readFromSource = pipeline.apply(Steps.SOURCE.getStep(),
+                kafkaSource);
+        PCollection<RawEvent> parsed = readFromSource.apply(Steps.PARSE.getStep(),
+                new ParseWithTimestamp());
+        PCollection<ComputedEvent> averaged = parsed.apply(Steps.PROCESS_DATA.getStep(),
+                new ComputeAverage(WINDOW_DURATION_SEC, WINDOW_FREQUENCY_SEC, ALLOWED_LATENESS_SEC));
+        PDone writeToTarget = averaged.apply(Steps.TARGET.getStep(),
+                jdbcTarget);
+
+        return writeToTarget.getPipeline();
+    }
+}
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/source/KafkaSource.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/source/KafkaSource.java
new file mode 100755
index 0000000..8551cf5
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/source/KafkaSource.java
@@ -0,0 +1,62 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.source;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.joda.time.Duration;
+
+import java.util.Map;
+
+/**
+ * PTransform that connects to kafka and reads from the given topic
+ */
+public class KafkaSource extends PTransform<PInput, PCollection<KV<String, String>>> {
+
+    private final String brokers;
+    private final String topic;
+    private final String consumerGroup;
+    private final Boolean resetToEarliest;
+
+    private Duration maxReadTime;
+
+    public KafkaSource(String brokers, String topic, String consumerGroup, Boolean resetToEarliest) {
+        this.brokers = brokers;
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.resetToEarliest = resetToEarliest;
+    }
+
+    /**
+     * Sets the amount of time to keep reading from kafka
+     * Only used in testing, to convert the unbounded source to a bounded source
+     * @param maxReadTime Duration to read data
+     */
+    public void withMaxReadTime(Duration maxReadTime) {
+        this.maxReadTime = maxReadTime;
+    }
+
+    @Override
+    public PCollection<KV<String, String>> expand(PInput input) {
+        Pipeline pipeline = input.getPipeline();
+        KafkaIO.Read<String, String> kafkaIo = KafkaIO.<String, String>read()
+                .withBootstrapServers(brokers)
+                .withTopic(topic)
+                .withKeyDeserializer(StringDeserializer.class)
+                .withValueDeserializer(StringDeserializer.class)
+                .withConsumerConfigUpdates(Map.of(
+                        "group.id", consumerGroup,
+                        "auto.offset.reset", resetToEarliest ? "earliest" : "latest"))
+                .commitOffsetsInFinalize()
+                .withLogAppendTime(); // Do not use timestamp from the event yet
+
+        if(maxReadTime != null)
+            kafkaIo = kafkaIo.withMaxReadTime(maxReadTime);
+
+        return pipeline.apply(Steps.READ_KAFKA.getStep(), kafkaIo.withoutMetadata());
+    }
+}
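Note: the read above stamps each record with Kafka log-append time; the inline comment defers event-time handling to ParseWithTimestamp, which later re-stamps elements from the payload's `ts`. If one instead wanted source-level timestamps derived from the producer-set create time, KafkaIO offers `withCreateTime`; a hedged sketch, not what this commit does, and the 1-minute out-of-orderness bound is an illustrative value:

```java
// Alternative (assumes producers set the record create time):
KafkaIO.Read<String, String> kafkaIo = KafkaIO.<String, String>read()
        .withBootstrapServers(brokers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withCreateTime(Duration.standardMinutes(1)); // max allowed out-of-orderness
```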
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTarget.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTarget.java
new file mode 100755
index 0000000..2c40825
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTarget.java
@@ -0,0 +1,45 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.target;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import java.sql.Timestamp;
+
+/**
+ * PTransform that writes the {@link ComputedEvent} into a target table through a jdbc connection
+ */
+public class JDBCTarget extends PTransform<PCollection<ComputedEvent>, PDone> {
+
+    private final String driver;
+    private final String jdbcUrl;
+    private final String table;
+    private final String username;
+    private final String password;
+
+    public JDBCTarget(String driver, String jdbcUrl, String table, String username, String password) {
+        this.driver = driver;
+        this.jdbcUrl = jdbcUrl;
+        this.table = table;
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public PDone expand(PCollection<ComputedEvent> input) {
+        return input.apply(Steps.JDBC_WRITE.getStep(), JdbcIO.<ComputedEvent>write()
+                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
+                        .create(driver, jdbcUrl)
+                        .withUsername(username)
+                        .withPassword(password))
+                .withStatement("INSERT INTO %s VALUES(?, ?, ?)".formatted(table)) // Use a merge/upsert if you want to avoid duplicates - see the sketch after this file
+                .withPreparedStatementSetter((JdbcIO.PreparedStatementSetter<ComputedEvent>) (element, query) -> {
+                    query.setString(1, element.id());
+                    query.setTimestamp(2, new Timestamp(element.ts()));
+                    query.setDouble(3, element.value());
+                }));
+    }
+}
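Note: the INSERT above appends blindly, and the inline comment hints at merge semantics. Because the upstream windowing accumulates fired panes, a late event causes the same id/window to be emitted again with a corrected average, so plain inserts leave duplicate rows. On PostgreSQL this could be an upsert — a sketch assuming a unique constraint on (id, ts), which this commit does not create:

```sql
-- Assumption: <table_name> gets a unique constraint on (id, ts)
ALTER TABLE <table_name> ADD CONSTRAINT id_ts_unique UNIQUE (id, ts);

INSERT INTO <table_name> (id, ts, computed) VALUES (?, ?, ?)
ON CONFLICT (id, ts) DO UPDATE SET computed = EXCLUDED.computed;
```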
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverage.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverage.java
new file mode 100755
index 0000000..34a4c50
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverage.java
@@ -0,0 +1,58 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.transform;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
+import com.barrelsofdata.examples.beam.streaming.sensor.function.Average;
+import com.barrelsofdata.examples.beam.streaming.sensor.function.ExtractKeys;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * PTransform that applies sliding windows and computes the average from a group of {@link RawEvent}
+ * The id from the event is used for grouping the data
+ * Emits a {@link ComputedEvent} for every window group
+ * The id and timestamp for the emitted event are taken from the last event in the group
+ */
+public class ComputeAverage extends PTransform<PCollection<RawEvent>, PCollection<ComputedEvent>> {
+
+    private final Long windowDurationSeconds;
+    private final Long windowFrequencySeconds;
+    private final Long allowedLatenessSeconds;
+    private final Window<RawEvent> window;
+
+    public ComputeAverage(Long windowDurationSeconds, Long windowFrequencySeconds, Long allowedLatenessSeconds) {
+        this.windowDurationSeconds = windowDurationSeconds;
+        this.windowFrequencySeconds = windowFrequencySeconds;
+        this.allowedLatenessSeconds = allowedLatenessSeconds;
+        this.window = configureWindow();
+    }
+
+    @Override
+    public PCollection<ComputedEvent> expand(PCollection<RawEvent> input) {
+
+        PCollection<RawEvent> windowed = input.apply(Steps.ADD_WINDOWS.getStep(), window);
+        return windowed
+                .apply(Steps.EXTRACT_KEY.getStep(), WithKeys.of(new ExtractKeys()))
+                .apply(Steps.COMPUTE_AVERAGE.getStep(), Combine.perKey(new Average()))
+                .apply(Steps.REMOVE_KEY.getStep(), Values.create());
+    }
+
+    private Window<RawEvent> configureWindow() {
+        return Window.<RawEvent>into(SlidingWindows
+                        .of(Duration.standardSeconds(windowDurationSeconds))
+                        .every(Duration.standardSeconds(windowFrequencySeconds)))
+                .withAllowedLateness(Duration.standardSeconds(allowedLatenessSeconds))
+                .accumulatingFiredPanes()
+                .triggering(AfterWatermark.pastEndOfWindow()
+                        .withLateFirings(AfterPane.elementCountAtLeast(1)));
+    }
+}
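Note, to make the window arithmetic above concrete: with a 180 s window sliding every 60 s, each event belongs to three overlapping windows — an event stamped 09:02:30 lands in [09:00, 09:03), [09:01, 09:04) and [09:02, 09:05). The on-time pane fires when the watermark passes each window's end; an event arriving up to 120 s late re-fires the affected windows, and because panes accumulate, the re-fired pane carries a corrected average over all events seen so far — which is why the JDBC comment above suggests merge semantics downstream.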
diff --git a/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestamp.java b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestamp.java
new file mode 100755
index 0000000..f9e5595
--- /dev/null
+++ b/streaming/src/main/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestamp.java
@@ -0,0 +1,41 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.transform;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.function.ExtractTimestamp;
+import com.barrelsofdata.examples.beam.streaming.sensor.function.ParseEvents;
+import com.barrelsofdata.examples.beam.streaming.sensor.function.FailedEventBuilder;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.WithFailures;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * PTransform that converts the key, value received from kafka into a {@link RawEvent} object
+ * and attaches the timestamp from the event {@link com.barrelsofdata.examples.beam.streaming.sensor.model.Data}
+ */
+public class ParseWithTimestamp extends PTransform<PCollection<KV<String, String>>, PCollection<RawEvent>> {
+
+    private final JsonMapper jsonMapper = new JsonMapper();
+
+    @Override
+    public PCollection<RawEvent> expand(PCollection<KV<String, String>> input) {
+        WithFailures.Result<PCollection<RawEvent>, FailedEvent> parsedWithExceptions = input.apply(Steps.PARSE_EVENT.getStep(), MapElements
+                .via(ParseEvents.of(jsonMapper))
+                .exceptionsInto(TypeDescriptor.of(FailedEvent.class))
+                .exceptionsVia(FailedEventBuilder.of(Steps.PARSE_EVENT.getStep())));
+
+        PCollection<RawEvent> parsed = parsedWithExceptions.output();
+
+        // Probably send the below to a dead letter kafka topic (see the sketch after this file)
+        // PCollection<FailedEvent> failed = parsedWithExceptions.failures();
+
+        return parsed.apply(Steps.ATTACH_TIMESTAMP.getStep(),
+                WithTimestamps.of(ExtractTimestamp.of()));
+    }
+}
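Note: the commented-out `failures()` collection above is currently dropped; the comment suggests a dead-letter topic. A minimal sketch of that wiring — the topic name, broker string, and the string encoding of FailedEvent are all assumptions, not part of this commit:

```java
// Hypothetical dead-letter branch (would additionally need KafkaIO,
// TypeDescriptors, StringSerializer and Objects imports in this file):
PCollection<FailedEvent> failed = parsedWithExceptions.failures();
failed
    .apply("Failed event to KV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(f -> KV.of(f.step(), Objects.toString(f.event(), ""))))
    .apply("Write to dead letter topic", KafkaIO.<String, String>write()
        .withBootstrapServers("<kafka_brokers>")  // assumption: taken from pipeline options
        .withTopic("<dead_letter_topic>")         // assumption: hypothetical topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));
```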
> half)) + .values().stream().map(ArrayList::new).toList()); + + Average average = new Average(); + ArrayList merged = average.mergeAccumulators(workerSplits); + ComputedEvent computed = average.extractOutput(merged); + + assertEquals(id, computed.id(), "Id in ComputedEvent is wrong"); + assertEquals(new Instant(startTimestamp).getMillis() + (timestampStepSeconds * 1000 * --count), computed.ts(), "Timestamp of ComputedEvent is wrong"); + assertEquals(expectedAverage, computed.value(), "Average computed is incorrect"); + } + + @ParameterizedTest + @CsvSource(value = { + "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.5|20", + "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.0|25" + }, delimiter = '|') + void testThatAccumulatorMergeIsWorking(String id, String startTimestamp, Long timestampStepSeconds, Double startValue, Double valueSteps, int count) { + ArrayList rawEvents = RawEventsGenerator.generate(id, startTimestamp, timestampStepSeconds, startValue, valueSteps, count); + int half = rawEvents.size() / 2; + ArrayList> workerSplits = new ArrayList<>(rawEvents.stream() + .collect(Collectors.partitioningBy( + s -> rawEvents.indexOf(s) > half)) + .values().stream().map(ArrayList::new).toList()); + + Average average = new Average(); + ArrayList merged = average.mergeAccumulators(workerSplits); + assertEquals(count, merged.size(), "Merged accumulator size does not match with input list"); + assertTrue(rawEvents.containsAll(merged) && merged.containsAll(rawEvents), "Merged accumulator missing elements from input list"); + } + + @ParameterizedTest + @CsvSource(value = { + "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.5|0", + "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.0|25" + }, delimiter = '|') + void testThatElementsAreAddedToAccumulators(String id, String startTimestamp, Long timestampStepSeconds, Double startValue, Double valueSteps, int count) { + ArrayList rawEvents = RawEventsGenerator.generate(id, startTimestamp, timestampStepSeconds, startValue, valueSteps, count); + RawEvent eventToAdd = RawEventsGenerator.generateOne(); + + Average average = new Average(); + ArrayList merged = average.addInput(rawEvents, eventToAdd); + assertEquals(count + 1, merged.size(), "Element was not added to accumulator"); + assertTrue(merged.contains(eventToAdd), "Element was not found in the accumulator"); + } + +} \ No newline at end of file diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestampTest.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestampTest.java new file mode 100755 index 0000000..5abde82 --- /dev/null +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ExtractTimestampTest.java @@ -0,0 +1,22 @@ +package com.barrelsofdata.examples.beam.streaming.sensor.function; + +import com.barrelsofdata.examples.beam.streaming.sensor.model.Data; +import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent; +import org.joda.time.Instant; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class ExtractTimestampTest { + @Test + void testTimestampExtraction() { + Long ts = 1698489173051L; + Data testData = new Data(ts, 50.0); + RawEvent testRawEvent = new RawEvent("testUser1", testData); + + ExtractTimestamp extractTimestamp = ExtractTimestamp.of(); + Instant extractedTime = extractTimestamp.apply(testRawEvent); + + assertEquals(ts, extractedTime.getMillis()); + } +} \ No newline at end of file diff --git 
a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilderTest.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilderTest.java new file mode 100755 index 0000000..d2a0d79 --- /dev/null +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/FailedEventBuilderTest.java @@ -0,0 +1,40 @@ +package com.barrelsofdata.examples.beam.streaming.sensor.function; + +import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent; +import org.apache.beam.sdk.transforms.WithFailures; +import org.apache.beam.sdk.values.KV; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class FailedEventBuilderTest { + @Test + void testPipelineException() { + String step = "Test step"; + String exceptionMessage = "Test exception"; + + KV testInput = KV.of("testUser1", """ + {"ts":1698489173051,speed":50}"""); + WithFailures.ExceptionElement> testExceptionElement = new WithFailures.ExceptionElement<>() { + @Override + public KV element() { + return testInput; + } + @Override + public @UnknownKeyFor @NonNull @Initialized Exception exception() { + return new RuntimeException(exceptionMessage); + } + }; + + FailedEventBuilder> failedEventBuilder = FailedEventBuilder.of(step); + FailedEvent failedEvent = failedEventBuilder.apply(testExceptionElement); + + assertEquals(testInput.toString(), failedEvent.event()); + assertEquals(step, failedEvent.step()); + assertEquals(exceptionMessage, failedEvent.exception().getMessage()); + } + +} \ No newline at end of file diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEventsTest.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEventsTest.java new file mode 100755 index 0000000..70b56f0 --- /dev/null +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/ParseEventsTest.java @@ -0,0 +1,38 @@ +package com.barrelsofdata.examples.beam.streaming.sensor.function; + +import com.barrelsofdata.examples.beam.streaming.sensor.model.Data; +import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent; +import com.fasterxml.jackson.databind.json.JsonMapper; +import org.apache.beam.sdk.values.KV; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class ParseEventsTest { + + @Test + void testValidEvent() { + String id = "testId1"; + Long ts = 1698489173051L; + Double value = 50.0; + KV testInput = KV.of(id, """ + {"ts":%s,"value":%s}""".formatted(ts, value)); + ParseEvents parseEvents = ParseEvents.of(new JsonMapper()); + RawEvent parsed = parseEvents.apply(testInput); + Data parsedData = parsed.data(); + + assertEquals(id, parsed.id()); + assertEquals(ts, parsedData.ts()); + assertEquals(value, parsedData.value()); + } + + @Test + void testInvalidEvent() { + KV testInput = KV.of("testUser1", """ + {"ts":1698489173051,value":50}"""); + ParseEvents parseEvents = ParseEvents.of(new JsonMapper()); + + assertThrows(RuntimeException.class, () -> parseEvents.apply(testInput)); + } + +} \ No newline at end of file diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/RawExtractKeysTest.java 
b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/RawExtractKeysTest.java new file mode 100755 index 0000000..62ee03e --- /dev/null +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/function/RawExtractKeysTest.java @@ -0,0 +1,21 @@ +package com.barrelsofdata.examples.beam.streaming.sensor.function; + +import com.barrelsofdata.examples.beam.streaming.sensor.model.Data; +import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class RawExtractKeysTest { + @Test + void testValid() { + String id = "testId1"; + Data testData = new Data(1698489173051L, 10.0); + RawEvent testRawEvent = new RawEvent(id, testData); + + ExtractKeys extractKeys = ExtractKeys.of(); + String extracted = extractKeys.apply(testRawEvent); + + assertEquals(id, extracted); + } +} \ No newline at end of file diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTargetTest.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTargetTest.java new file mode 100755 index 0000000..3f1396d --- /dev/null +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/target/JDBCTargetTest.java @@ -0,0 +1,71 @@ +package com.barrelsofdata.examples.beam.streaming.sensor.target; + +import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.sql.*; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class JDBCTargetTest { + + private final static String driver = "org.h2.Driver"; + private final static String jdbcUrl = "jdbc:h2:mem:testDb"; + private final static String table = "testTable"; + private final static String username = ""; + private final static String password = ""; + private static Connection con; + private static Statement statement; + + @BeforeAll + static void initDb() throws SQLException { + con = DriverManager.getConnection(jdbcUrl, username, password); + statement = con.createStatement(); + statement.executeUpdate(""" + CREATE TABLE %s (id VARCHAR(255), ts TIMESTAMP, computed DOUBLE)""".formatted(table)); + } + + @AfterAll + static void teardown() throws SQLException { + con.close(); + } + + @Test + void testExpectedDataWasWrittenToTable() throws SQLException { + String id1 = "testId1"; + String id2 = "testId2"; + Long ts = Instant.now().toEpochMilli(); + Double value = 10.0; + List events = List.of( + new ComputedEvent(id1, ts, value), + new ComputedEvent(id2, ts, value)); + + JDBCTarget jdbcTarget = new JDBCTarget(driver, jdbcUrl, table, username, password); + + TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + pipeline + .apply(Create.of(events)) + .setCoder(SerializableCoder.of(ComputedEvent.class)) + .apply(jdbcTarget); + + pipeline.run() + .waitUntilFinish(); + + ResultSet results = statement.executeQuery("SELECT * FROM %s".formatted(table)); + List dbResults = new ArrayList<>(); + while(results.next()) + dbResults.add(new ComputedEvent(results.getString("id"), results.getTimestamp("ts").getTime(), results.getDouble("computed"))); + + 
diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/RawEventsGenerator.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/RawEventsGenerator.java
new file mode 100755
index 0000000..d3e53d5
--- /dev/null
+++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/RawEventsGenerator.java
@@ -0,0 +1,27 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.testutils;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.stream.IntStream;
+
+public class RawEventsGenerator {
+    private RawEventsGenerator() {}
+
+    public static RawEvent generateOne() {
+        return generate("testId", "2023-03-01T10:00:00.000Z", 0L, 10.0, 0.0, 1).get(0);
+    }
+
+    public static ArrayList<RawEvent> generate(String id, String timestampStart, Long timestampStepsSeconds, Double valueStart, Double valueSteps, int count) {
+        Instant ts = new Instant(timestampStart);
+
+        return new ArrayList<>(IntStream.range(0, count).boxed()
+                .map(i -> {
+                    Data data = new Data(ts.getMillis() + (timestampStepsSeconds * 1000 * i), valueStart + (valueSteps * i));
+                    return new RawEvent(id, data);
+                })
+                .toList());
+    }
+}
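For readability, here is what a call to the generator above produces; the id and literals are illustrative, but the behavior follows directly from the code:

```java
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import java.util.List;

class RawEventsGeneratorUsage {
    static List<RawEvent> example() {
        // Three events for "sensor-1", 20 seconds apart, with values 1.0, 2.0, 3.0
        List<RawEvent> events = RawEventsGenerator.generate(
                "sensor-1", "2023-03-01T10:00:00.000Z", 20L, 1.0, 1.0, 3);
        // events.get(2).data().ts() is 2023-03-01T10:00:40.000Z as epoch millis,
        // and events.get(2).data().value() is 3.0
        return events;
    }
}
```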
diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java
new file mode 100755
index 0000000..e2e9049
--- /dev/null
+++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java
@@ -0,0 +1,82 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.testutils.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+public class EmbeddedKafka {
+    private static final int NUMBER_OF_BROKERS = 1;
+    private final EmbeddedKafkaBroker embeddedKafkaBroker;
+    private final Producer kafkaProducer;
+
+    public static EmbeddedKafka withDefaults() {
+        return new EmbeddedKafka(NUMBER_OF_BROKERS);
+    }
+
+    public EmbeddedKafka(int numBrokers) {
+        validate(numBrokers);
+        embeddedKafkaBroker = new EmbeddedKafkaBroker(numBrokers);
+        embeddedKafkaBroker.brokerProperty("log.dir", "build/embedded-kafka/logs");
+        kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString());
+    }
+
+    public void start() {
+        embeddedKafkaBroker.afterPropertiesSet();
+    }
+
+    public void addTopics(String... topics) {
+        embeddedKafkaBroker.addTopics(topics);
+    }
+
+    public void send(String topic, String key, String message) {
+        kafkaProducer.send(topic, key, message);
+    }
+
+    public void sendFile(String topic, String filepath, String delimiter) throws IOException {
+        // try-with-resources so the reader is closed even if a line fails to parse
+        try (BufferedReader br = new BufferedReader(new FileReader(filepath))) {
+            br.lines().forEach(line -> {
+                String[] pairs = line.split(delimiter);
+                assert pairs.length > 1;
+                send(topic, pairs[0], pairs[1]);
+            });
+        }
+    }
+
+    public void stop() {
+        embeddedKafkaBroker.destroy();
+    }
+
+    public String brokers() {
+        return embeddedKafkaBroker.getBrokersAsString();
+    }
+
+    private void validate(int numBrokers) {
+        if (numBrokers < 1)
+            throw new IllegalArgumentException("Number of brokers should be at least 1");
+    }
+
+    private static class Producer {
+
+        private final KafkaProducer<String, String> kafkaProducer;
+
+        public Producer(String bootstrapServers) {
+            Properties properties = new Properties();
+            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+            kafkaProducer = new KafkaProducer<>(properties);
+        }
+
+        public void send(String topic, String key, String value) {
+            var record = new ProducerRecord<>(topic, key, value);
+            kafkaProducer.send(record);
+            kafkaProducer.flush();
+        }
+    }
+}
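A typical lifecycle of this helper in an integration test, using only the methods defined above (the topic name and payload are illustrative):

```java
class EmbeddedKafkaUsage {
    static void example() throws Exception {
        EmbeddedKafka kafka = EmbeddedKafka.withDefaults();
        kafka.start();                             // boots the in-memory broker
        kafka.addTopics("sensor-events");          // create the topic under test
        kafka.send("sensor-events", "sensor-1", "{\"ts\":1698489173051,\"value\":50.0}");
        String bootstrapServers = kafka.brokers(); // hand this to the pipeline's Kafka source
        // ... run the pipeline under test and assert on its output ...
        kafka.stop();
    }
}
```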
diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverageIT.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverageIT.java
new file mode 100755
index 0000000..a1a5122
--- /dev/null
+++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverageIT.java
@@ -0,0 +1,142 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.transform;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import com.barrelsofdata.examples.beam.streaming.sensor.testutils.RawEventsGenerator;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+class ComputeAverageIT {
+
+    @Test
+    void lateEventShouldFireCorrectedAverage() {
+        Long windowDurationSeconds = 180L; // 3-minute sliding window
+        Long windowFrequencySeconds = 60L; // Slides every minute
+        Long windowAllowedLatenessSeconds = 120L; // Late data may arrive up to 2 minutes late
+
+        DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
+        Long timeStepSeconds = 20L; // Sensor emits an event every 20 seconds, i.e., 3 events per minute
+        Double valueStart = 1.0; // Starts from 1.0
+        Double valueSteps = 1.0; // Every subsequent event value increments by 1.0
+        int eventsInTest = 15; // We simulate 5 minutes, thus 5 * 3 events
+
+        String id = "testId1";
+        List<TimestampedValue<RawEvent>> testEvents = RawEventsGenerator.generate(
+                id, startTime.toString(
+                        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
+                valueStart, valueSteps, eventsInTest)
+                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
+                .toList();
+
+        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
+                .addElements(testEvents.get(0), testEvents.get(1), testEvents.get(2)) // Events for the first minute: 9:00:00, 9:00:20, 9:00:40
+                // Windows are end-exclusive, so a 9:01:00 element would not be part of the first window
+                .addElements(testEvents.get(3), testEvents.get(4), testEvents.get(5)) // Events for 9:01:00 to 9:01:40
+                .addElements(testEvents.get(6), testEvents.get(7), testEvents.get(8)) // Events for 9:02:00 to 9:02:40
+                .advanceWatermarkTo(startTime.plusMinutes(3).toInstant()) // Advance to 9:03:00 - emits 2 incomplete windows and 1 complete window at this point
+                .addElements(testEvents.get(9), testEvents.get(11)) // Events for 9:03:00 to 9:03:40, skipping the 9:03:20 event
+                .advanceWatermarkTo(startTime.plusMinutes(4).toInstant()) // Advance to 9:04:00 - emits the 4th window, but we missed one event
+                .addElements(testEvents.get(12), testEvents.get(13), testEvents.get(14)) // Events for 9:04:00 to 9:04:40
+                .advanceWatermarkTo(startTime.plusMinutes(6).toInstant()) // Advance to 9:06:00 - closes the late-arrival period for window 4, so its average will not be corrected
+                // But the missed event is also part of windows 5 and 6, as we are using sliding windows
+                .addElements(testEvents.get(10)) // Late event arrives at 9:06:00, still within the late-arrival period for windows 5 and 6, with an event timestamp of 9:03:20
+                .advanceWatermarkToInfinity(); // All remaining windows are emitted at this point
+
+        List<ComputedEvent> expectedEvents = List.of(
+                new ComputedEvent(id, testEvents.get(2).getValue().data().ts(), 2.0), // 6 / 3 - incomplete window ending at the first minute 9:01, with events from 8:58 - 9:01
+                new ComputedEvent(id, testEvents.get(5).getValue().data().ts(), 3.5), // 21 / 6 - incomplete window, with events from 8:59 - 9:02
+                new ComputedEvent(id, testEvents.get(8).getValue().data().ts(), 5.0), // 45 / 9 - complete window, with events from 9:00 - 9:03
+                new ComputedEvent(id, testEvents.get(11).getValue().data().ts(), 7.625), // 61 / 8 - window with events from 9:01 - 9:04, but with 1 event missing
+                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 88 / 8 - window with events from 9:02 - 9:05, but with 1 event missing
+                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 99 / 9 - corrected pane: complete window with events from 9:02 - 9:05
+                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.8), // 64 / 5 - window with events from 9:03 - 9:06, but with 1 event missing - note we stopped sending data after 9:04:40, so there are only 5 elements in the window
+                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.5), // 75 / 6 - corrected pane: complete window with events from 9:03 - 9:06
+                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 14.0)); // 42 / 3 - complete window with events from 9:04 - 9:07
+
+        ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
+        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+        PCollection<ComputedEvent> computed = pipeline
+                .apply("Create input stream", simulatedStream)
+                .apply(average);
+
+        // Check that all the expected events are present in the PCollection
+        PAssert.that(computed)
+                .containsInAnyOrder(expectedEvents);
+        pipeline.run();
+    }
+
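The pane sequence asserted above (a first firing, then a corrected average once the late 9:03:20 element arrives) implies sliding windows with accumulating panes and late firings. A sketch of windowing that would behave this way, as an assumption about what `ComputeAverage` configures internally rather than its actual source:

```java
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowingSketch {
    // 180s windows sliding every 60s; 120s allowed lateness; accumulating panes
    // so each late element re-fires the window with a corrected average
    static PCollection<RawEvent> window(PCollection<RawEvent> events) {
        return events.apply(Window.<RawEvent>into(
                        SlidingWindows.of(Duration.standardSeconds(180))
                                .every(Duration.standardSeconds(60)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardSeconds(120))
                .accumulatingFiredPanes());
    }
}
```

After windowing, the transform presumably keys elements by sensor id and averages per key, e.g. with `Mean.perKey()`.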
+    @Test
+    void testAveragesAreComputedForAllIds() {
+        Long windowDurationSeconds = 180L; // 3-minute sliding window
+        Long windowFrequencySeconds = 60L; // Slides every minute
+        Long windowAllowedLatenessSeconds = 120L; // Late data may arrive up to 2 minutes late
+
+        DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
+        Long timeStepSeconds = 20L; // Sensor emits an event every 20 seconds, i.e., 3 events per minute
+        Double valueStart = 1.0; // Starts from 1.0
+        Double valueStepsUser1 = 1.0; // Every subsequent event value increments by 1.0
+        Double valueStepsUser2 = 0.0; // Every subsequent event value stays the same
+        int eventsInTest = 9; // We simulate 3 minutes, thus 3 * 3 events
+
+        String idUser1 = "testId1";
+        String idUser2 = "testId2";
+        List<TimestampedValue<RawEvent>> testEventsUser1 = RawEventsGenerator.generate(
+                idUser1, startTime.toString(
+                        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
+                valueStart, valueStepsUser1, eventsInTest)
+                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
+                .toList();
+        List<TimestampedValue<RawEvent>> testEventsUser2 = RawEventsGenerator.generate(
+                idUser2, startTime.toString(
+                        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
+                valueStart, valueStepsUser2, eventsInTest)
+                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
+                .toList();
+
+        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
+                .addElements(testEventsUser1.get(0), testEventsUser1.get(1), testEventsUser1.get(2))
+                .addElements(testEventsUser1.get(3), testEventsUser1.get(4), testEventsUser1.get(5))
+                .addElements(testEventsUser1.get(6), testEventsUser1.get(7), testEventsUser1.get(8))
+                .addElements(testEventsUser2.get(0), testEventsUser2.get(1), testEventsUser2.get(2))
+                .addElements(testEventsUser2.get(3), testEventsUser2.get(4), testEventsUser2.get(5))
+                .addElements(testEventsUser2.get(6), testEventsUser2.get(7), testEventsUser2.get(8))
+                .advanceWatermarkToInfinity(); // All windows are emitted at this point - 4 incomplete windows and 1 complete window per id
+
+        List<ComputedEvent> expectedEvents = List.of(
+                new ComputedEvent(idUser1, testEventsUser1.get(2).getValue().data().ts(), 2.0),
+                new ComputedEvent(idUser1, testEventsUser1.get(5).getValue().data().ts(), 3.5),
+                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 5.0),
+                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 6.5),
+                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 8.0),
+                new ComputedEvent(idUser2, testEventsUser2.get(2).getValue().data().ts(), 1.0),
+                new ComputedEvent(idUser2, testEventsUser2.get(5).getValue().data().ts(), 1.0),
+                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
+                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
+                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0));
+
+        ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
+        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+        PCollection<ComputedEvent> computed = pipeline
+                .apply("Create input stream", simulatedStream)
+                .apply(average);
+
+        // Check that all the expected events are present in the PCollection
+        PAssert.that(computed)
+                .containsInAnyOrder(expectedEvents);
+        pipeline.run();
+    }
+}
\ No newline at end of file
diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestampIT.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestampIT.java
new file mode 100755
index 0000000..9276872
--- /dev/null
+++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ParseWithTimestampIT.java
@@ -0,0 +1,42 @@
+package com.barrelsofdata.examples.beam.streaming.sensor.transform;
+
+import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.jupiter.api.Test;
+
+class ParseWithTimestampIT {
+
+    @Test
+    void testThatDataIsParsedAndTimestampAttached() {
+        Long ts = 1698489173051L;
+        KV<String, String> inputEvent = KV.of("testId1", """
+            {"ts":%s,"value":10.0}""".formatted(ts));
+
+        ParseWithTimestamp parseWithTimestamp = new ParseWithTimestamp();
+
+        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+        PCollection<Long> parsed = pipeline
+                .apply(Create.of(inputEvent))
+                .apply(parseWithTimestamp)
+                .apply(ParDo.of(new BeamTimestampGrabber()));
+
+        PAssert.that(parsed).containsInAnyOrder(ts);
+        pipeline.run();
+    }
+
+    // Extracts each element's Beam timestamp so the test can assert on it
+    private static class BeamTimestampGrabber extends DoFn<RawEvent, Long> {
+        @ProcessElement
+        public void process(ProcessContext context, @Timestamp Instant timestamp) {
+            context.output(timestamp.getMillis());
+        }
+    }
+
+}
\ No newline at end of file
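The test above pins down the contract of `ParseWithTimestamp`: elements leave the transform carrying the payload's `ts` as their Beam event timestamp. A sketch of a composite satisfying that contract, built with Beam's `WithTimestamps` (an assumption, not this commit's source; a real implementation might instead assign timestamps inside a `DoFn` via `outputWithTimestamp`):

```java
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

class ParseWithTimestampSketch {
    static PCollection<RawEvent> parse(PCollection<KV<String, String>> input) {
        return input
                .apply("Parse", MapElements.via(ParseEvents.of(new JsonMapper())))
                // Move each element onto the event time embedded in its payload
                .apply("Attach event time", WithTimestamps.of(
                        (RawEvent event) -> Instant.ofEpochMilli(event.data().ts())));
    }
}
```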
"--resetToEarliest" : "" + }; + SensorPipelineOptions options = PipelineOptionsBuilder.from(args, SensorPipelineOptions.class); + + assertEquals(runner, options.getRunner().getSimpleName()); + assertEquals(kafkaBrokers, options.getKafkaBrokers()); + assertEquals(kafkaTopic, options.getKafkaTopic()); + assertEquals(kafkaConsumerGroupId, options.getKafkaConsumerGroupId()); + assertEquals(sqlDriver, options.getSqlDriver()); + assertEquals(jdbcUrl, options.getJdbcUrl()); + assertEquals(table, options.getTable()); + assertEquals(username, options.getUsername()); + assertEquals(password, options.getPassword()); + } + +} \ No newline at end of file