Add sensor pipeline
streaming/build.gradle.kts
@@ -0,0 +1,18 @@
plugins {
    id("com.barrelsofdata.project-conventions")
}

dependencies {
    implementation(libs.bundles.beam.java)
    implementation(libs.beam.jdbc.io)
    implementation(libs.beam.kafka.io)
    implementation(libs.jackson.core)
    implementation(libs.kafka.clients)
    implementation(libs.postgres.driver)

    testImplementation(libs.jupiter)
    testImplementation(libs.jupiter.migration.support) // Apache Beam still uses JUnit 4
    testImplementation(libs.h2)
    testImplementation(libs.hamcrest)
    testImplementation(libs.spring.kafka.test)
}
@@ -0,0 +1,20 @@
package com.barrelsofdata.examples.beam.streaming.sensor;

import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions;
import com.barrelsofdata.examples.beam.streaming.sensor.pipeline.SensorAnalytics;
import com.barrelsofdata.examples.beam.streaming.sensor.util.PipelineOptionsBuilder;
import org.apache.beam.sdk.Pipeline;

/**
 * The main application class that triggers the pipeline run
 */
public class Application {

    public static void main(String[] args) {
        SensorPipelineOptions pipelineOptions
                = PipelineOptionsBuilder.from(args, SensorPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(pipelineOptions);
        SensorAnalytics.from(pipeline, pipelineOptions).run();
    }
}
@@ -0,0 +1,58 @@
package com.barrelsofdata.examples.beam.streaming.sensor.config;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

public interface SensorPipelineOptions extends PipelineOptions {

    @Description("Comma separated list of kafka brokers (host:port)")
    @Validation.Required
    String getKafkaBrokers();
    void setKafkaBrokers(String kafkaBrokers);

    @Description("Kafka topic to read from")
    @Validation.Required
    String getKafkaTopic();
    void setKafkaTopic(String kafkaTopic);

    @Description("Kafka consumer group id to read from")
    @Validation.Required
    String getKafkaConsumerGroupId();
    void setKafkaConsumerGroupId(String kafkaConsumerGroupId);

    @Description("Set kafka auto offset reset to earliest (default: latest)")
    @Default.Boolean(false)
    boolean isResetToEarliest();
    void setResetToEarliest(boolean resetToEarliest);

    @Description("Kafka max read duration in minutes (Only used for testing)")
    Integer getKafkaMaxReadDurationMinutes();
    void setKafkaMaxReadDurationMinutes(Integer kafkaMaxReadDurationMinutes);

    @Description("Sql driver to be used for jdbc")
    @Validation.Required
    String getSqlDriver();
    void setSqlDriver(String sqlDriver);

    @Description("Target jdbc url to connect to")
    @Validation.Required
    String getJdbcUrl();
    void setJdbcUrl(String jdbcUrl);

    @Description("Target table to write to")
    @Validation.Required
    String getTable();
    void setTable(String table);

    @Description("Username to access the jdbc data target")
    @Validation.Required
    String getUsername();
    void setUsername(String username);

    @Description("Password of the provided user")
    @Validation.Required
    String getPassword();
    void setPassword(String password);
}
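Note (not part of the commit): Beam's PipelineOptionsFactory maps each getter above to a --camelCase program argument, so the pipeline can be launched through the Application class shown earlier. The sketch below is purely illustrative; the runner, broker addresses, database url, table name and credentials are placeholder values.

// Hypothetical local invocation; every value is a placeholder, not part of the commit.
public class LocalRunSketch {
    public static void main(String[] unused) {
        Application.main(new String[] {
                "--runner=DirectRunner",                      // assumes a runner dependency such as beam-runners-direct-java
                "--kafkaBrokers=localhost:9092",
                "--kafkaTopic=sensor-events",
                "--kafkaConsumerGroupId=sensor-analytics",
                "--resetToEarliest",                          // boolean flag, defaults to false when omitted
                "--sqlDriver=org.postgresql.Driver",
                "--jdbcUrl=jdbc:postgresql://localhost:5432/sensors",
                "--table=sensor_averages",
                "--username=beam",
                "--password=changeme"
        });
    }
}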
@@ -0,0 +1,26 @@
package com.barrelsofdata.examples.beam.streaming.sensor.config;

public enum Steps {
    SOURCE("Source"),
    READ_KAFKA("Read kafka"),
    PARSE("Parse"),
    PARSE_EVENT("Parse events"),
    ATTACH_TIMESTAMP("Attach event timestamps"),
    PROCESS_DATA("Process data"),
    EXTRACT_KEY("Extract key"),
    ADD_WINDOWS("Add windows"),
    COMPUTE_AVERAGE("Compute average"),
    REMOVE_KEY("Remove key"),
    TARGET("Target"),
    JDBC_WRITE("Write to JDBC");

    private final String step;

    private Steps(String step) {
        this.step = step;
    }

    public String getStep() {
        return step;
    }
}
@@ -0,0 +1,55 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.apache.beam.sdk.transforms.Combine;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.StreamSupport;

/**
 * Combiner function that emits a {@link ComputedEvent} by computing averages of values
 * from a group of {@link RawEvent}
 */
public class Average extends Combine.CombineFn<RawEvent, ArrayList<RawEvent>, ComputedEvent> {
    @Override
    public ArrayList<RawEvent> createAccumulator() {
        // Initializes an accumulator at worker level
        return new ArrayList<>();
    }

    @Override
    public ArrayList<RawEvent> addInput(ArrayList<RawEvent> accumulator, RawEvent input) {
        // Adds data into the accumulator, at worker level
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public ArrayList<RawEvent> mergeAccumulators(Iterable<ArrayList<RawEvent>> accumulators) {
        // Merges accumulators from all workers into one
        List<RawEvent> flattened = StreamSupport.stream(accumulators.spliterator(), true).flatMap(ArrayList::stream).toList();
        return new ArrayList<>(flattened);
    }

    @Override
    public ComputedEvent extractOutput(ArrayList<RawEvent> accumulator) {
        // Aggregates and emits one result

        // Use the id and timestamp of the last event as the id and timestamp of the new event
        // Getting the id here makes it easy to throw away the grouping keys later
        Collections.sort(accumulator);
        RawEvent lastRawEventInWindow = accumulator.get(accumulator.size() - 1);
        String id = lastRawEventInWindow.id();
        Long timestamp = lastRawEventInWindow.data().ts();

        Double average = accumulator.stream()
                .mapToDouble(ev -> ev.data().value())
                .average()
                .orElse(-1);

        return new ComputedEvent(id, timestamp, average);
    }
}
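Note (not part of the commit): Beam drives a CombineFn by calling createAccumulator and addInput per bundle, mergeAccumulators across bundles, and extractOutput once per key and window. A minimal sketch of that lifecycle, assuming the Average, RawEvent, Data and ComputedEvent classes from this commit are on the classpath; the ids, timestamps and values are made up.

import java.util.ArrayList;
import java.util.List;

public class AverageLifecycleSketch {
    public static void main(String[] args) {
        Average average = new Average();

        ArrayList<RawEvent> acc1 = average.createAccumulator();          // one accumulator per bundle/worker
        acc1 = average.addInput(acc1, new RawEvent("sensor-1", new Data(1_000L, 2.0)));
        acc1 = average.addInput(acc1, new RawEvent("sensor-1", new Data(2_000L, 4.0)));

        ArrayList<RawEvent> acc2 = average.createAccumulator();
        acc2 = average.addInput(acc2, new RawEvent("sensor-1", new Data(3_000L, 6.0)));

        ArrayList<RawEvent> merged = average.mergeAccumulators(List.of(acc1, acc2));
        ComputedEvent result = average.extractOutput(merged);
        // result: id "sensor-1", ts 3000 (from the last event), value 4.0 (average of 2, 4 and 6)
        System.out.println(result);
    }
}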
@@ -0,0 +1,19 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.apache.beam.sdk.transforms.SimpleFunction;

/**
 * Extracts id as keys from {@link RawEvent}
 */
public class ExtractKeys extends SimpleFunction<RawEvent, String> {

    @Override
    public String apply(RawEvent input) {
        return input.id();
    }

    public static ExtractKeys of() {
        return new ExtractKeys();
    }
}
@@ -0,0 +1,21 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.joda.time.Instant;

/**
 * Extracts timestamp of {@link RawEvent} through the {@link com.barrelsofdata.examples.beam.streaming.sensor.model.Data} object
 */
public class ExtractTimestamp extends SimpleFunction<RawEvent, Instant> {
    private ExtractTimestamp(){}

    @Override
    public Instant apply(RawEvent input) {
        return Instant.ofEpochMilli(input.data().ts());
    }

    public static ExtractTimestamp of() {
        return new ExtractTimestamp();
    }
}
@@ -0,0 +1,30 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithFailures;

import java.util.Objects;

/**
 * Builder that builds the {@link FailedEvent}
 */
public class FailedEventBuilder<T> extends SimpleFunction<WithFailures.ExceptionElement<T>, FailedEvent> {

    private final String step;

    private FailedEventBuilder(String step) {
        this.step = step;
    }

    @Override
    public FailedEvent apply(WithFailures.ExceptionElement<T> input) {
        if(input != null)
            return new FailedEvent(step, Objects.toString(input.element(), null), input.exception());
        return new FailedEvent(step, null, null);
    }

    public static <T> FailedEventBuilder<T> of(String step) {
        return new FailedEventBuilder<>(step);
    }
}
@@ -0,0 +1,44 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

import java.util.Objects;

/**
 * Parses kafka KV into {@link RawEvent}
 * Kafka key is used as id
 * Kafka value is used as {@link Data}
 * Throws {@link RuntimeException} on failed parsing
 */
public class ParseEvents extends SimpleFunction<KV<String, String>, RawEvent> {

    private JsonMapper jsonMapper;

    private ParseEvents(){}

    private ParseEvents(JsonMapper jsonMapper) {
        this.jsonMapper = jsonMapper;
    }

    @Override
    public RawEvent apply(KV<String, String> input) {
        try {
            String userId = Objects.requireNonNull(input).getKey();
            String eventDataJson = input.getValue();

            Data data = jsonMapper.readValue(eventDataJson, Data.class);
            return new RawEvent(userId, data);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static ParseEvents of(JsonMapper jsonMapper) {
        return new ParseEvents(jsonMapper);
    }
}
@@ -0,0 +1,5 @@
package com.barrelsofdata.examples.beam.streaming.sensor.model;

import java.io.Serializable;

public record ComputedEvent(String id, Long ts, Double value) implements Serializable {}
@@ -0,0 +1,7 @@
package com.barrelsofdata.examples.beam.streaming.sensor.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;

public record Data(@JsonProperty(required = true) Long ts, Double value) implements Serializable {}
@@ -0,0 +1,5 @@
package com.barrelsofdata.examples.beam.streaming.sensor.model;

import java.io.Serializable;

public record FailedEvent(String step, String event, Throwable exception) implements Serializable {}
@@ -0,0 +1,10 @@
package com.barrelsofdata.examples.beam.streaming.sensor.model;

import java.io.Serializable;

public record RawEvent(String id, Data data) implements Comparable<RawEvent>, Serializable {
    @Override
    public int compareTo(RawEvent rawEvent) {
        return Long.compare(this.data().ts(), rawEvent.data().ts());
    }
}
@@ -0,0 +1,64 @@
package com.barrelsofdata.examples.beam.streaming.sensor.pipeline;

import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions;
import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.target.JDBCTarget;
import com.barrelsofdata.examples.beam.streaming.sensor.transform.ComputeAverage;
import com.barrelsofdata.examples.beam.streaming.sensor.source.KafkaSource;
import com.barrelsofdata.examples.beam.streaming.sensor.transform.ParseWithTimestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

/**
 * This class builds the sensor pipeline by applying various transforms
 */
public class SensorAnalytics {

    private static final Long WINDOW_DURATION_SEC = 180L; // 3 minutes
    private static final Long WINDOW_FREQUENCY_SEC = 60L; // Every 1 minute
    private static final Long ALLOWED_LATENESS_SEC = 120L; // 2 minutes

    private SensorAnalytics() {}

    /**
     * Builds the pipeline by consuming the passed pipeline options
     * @param pipeline Pipeline object to apply the transforms to
     * @param options Configuration to be used for the pipeline
     */
    public static Pipeline from(Pipeline pipeline, SensorPipelineOptions options) {

        KafkaSource kafkaSource = new KafkaSource(
                options.getKafkaBrokers(),
                options.getKafkaTopic(),
                options.getKafkaConsumerGroupId(),
                options.isResetToEarliest());

        // Convert to bounded source - Only for testing
        Integer kafkaMaxReadDuration = options.getKafkaMaxReadDurationMinutes();
        if(kafkaMaxReadDuration != null)
            kafkaSource.withMaxReadTime(Duration.standardMinutes(kafkaMaxReadDuration));

        JDBCTarget jdbcTarget = new JDBCTarget(
                options.getSqlDriver(),
                options.getJdbcUrl(),
                options.getTable(),
                options.getUsername(),
                options.getPassword());

        PCollection<KV<String, String>> readFromSource = pipeline.apply(Steps.SOURCE.getStep(),
                kafkaSource);
        PCollection<RawEvent> parsed = readFromSource.apply(Steps.PARSE.getStep(),
                new ParseWithTimestamp());
        PCollection<ComputedEvent> averaged = parsed.apply(Steps.PROCESS_DATA.getStep(),
                new ComputeAverage(WINDOW_DURATION_SEC, WINDOW_FREQUENCY_SEC, ALLOWED_LATENESS_SEC));
        PDone writeToTarget = averaged.apply(Steps.TARGET.getStep(),
                jdbcTarget);

        return writeToTarget.getPipeline();
    }
}
@@ -0,0 +1,62 @@
package com.barrelsofdata.examples.beam.streaming.sensor.source;

import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

import java.util.Map;

/**
 * PTransform that connects to kafka and reads from the given topic
 */
public class KafkaSource extends PTransform<PInput, PCollection<KV<String, String>>> {

    private final String brokers;
    private final String topic;
    private final String consumerGroup;
    private final Boolean resetToEarliest;

    private Duration maxReadTime;

    public KafkaSource(String brokers, String topic, String consumerGroup, Boolean resetToEarliest) {
        this.brokers = brokers;
        this.topic = topic;
        this.consumerGroup = consumerGroup;
        this.resetToEarliest = resetToEarliest;
    }

    /**
     * Sets the amount of time to keep reading from kafka
     * Only used in testing to convert unbounded source to bounded source
     * @param maxReadTime Duration to read data
     */
    public void withMaxReadTime(Duration maxReadTime) {
        this.maxReadTime = maxReadTime;
    }

    @Override
    public PCollection<KV<String, String>> expand(PInput input) {
        Pipeline pipeline = input.getPipeline();
        KafkaIO.Read<String, String> kafkaIo = KafkaIO.<String, String>read()
                .withBootstrapServers(brokers)
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(Map.of(
                        "group.id", consumerGroup,
                        "auto.offset.reset", resetToEarliest ? "earliest" : "latest"))
                .commitOffsetsInFinalize()
                .withLogAppendTime(); // Do not use timestamp from the event yet

        if(maxReadTime != null)
            kafkaIo = kafkaIo.withMaxReadTime(maxReadTime);

        return pipeline.apply(Steps.READ_KAFKA.getStep(), kafkaIo.withoutMetadata());
    }
}
@@ -0,0 +1,46 @@
package com.barrelsofdata.examples.beam.streaming.sensor.target;

import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

import java.sql.Date;
import java.sql.Timestamp;

/**
 * PTransform that writes the {@link ComputedEvent} into a target table through a jdbc connection
 */
public class JDBCTarget extends PTransform<PCollection<ComputedEvent>, PDone> {

    private final String driver;
    private final String jdbcUrl;
    private final String table;
    private final String username;
    private final String password;

    public JDBCTarget(String driver, String jdbcUrl, String table, String username, String password) {
        this.driver = driver;
        this.jdbcUrl = jdbcUrl;
        this.table = table;
        this.username = username;
        this.password = password;
    }

    @Override
    public PDone expand(PCollection<ComputedEvent> input) {
        return input.apply(Steps.JDBC_WRITE.getStep(), JdbcIO.<ComputedEvent>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create(driver, jdbcUrl)
                        .withUsername(username)
                        .withPassword(password))
                .withStatement("INSERT INTO %s VALUES(?, ?, ?)".formatted(table)) // Use Merge if you want to avoid duplicates
                .withPreparedStatementSetter((JdbcIO.PreparedStatementSetter<ComputedEvent>) (element, query) -> {
                    query.setString(1, element.id());
                    query.setTimestamp(2, new Timestamp(element.ts()));
                    query.setDouble(3, element.value());
                }));
    }
}
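Note (not part of the commit): the prepared statement above expects a three-column target table matching (id, timestamp, value). A possible one-off helper to create such a table, assuming a PostgreSQL target; the jdbc url, credentials, table and column names are placeholders, and the integration test later in this commit creates an equivalent H2 table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTargetTableSketch {
    public static void main(String[] args) throws Exception {
        // Creates a table shaped like the statement setters above: VARCHAR id, TIMESTAMP ts, DOUBLE value
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/sensors", "beam", "changeme");
             Statement ddl = con.createStatement()) {
            ddl.executeUpdate(
                "CREATE TABLE sensor_averages (id VARCHAR(255), ts TIMESTAMP, computed DOUBLE PRECISION)");
        }
    }
}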
@@ -0,0 +1,58 @@
package com.barrelsofdata.examples.beam.streaming.sensor.transform;

import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
import com.barrelsofdata.examples.beam.streaming.sensor.function.Average;
import com.barrelsofdata.examples.beam.streaming.sensor.function.ExtractKeys;
import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

/**
 * PTransform that applies sliding windows and computes the average from a group of {@link RawEvent}
 * The id from the event is used for grouping the data
 * Emits a {@link ComputedEvent} for every window group
 * The id and timestamp for the emitted event are fetched from the last event in the group
 */
public class ComputeAverage extends PTransform<PCollection<RawEvent>, PCollection<ComputedEvent>> {

    private final Long windowDurationSeconds;
    private final Long windowFrequencySeconds;
    private final Long allowedLatenessSeconds;
    private final Window<RawEvent> window;

    public ComputeAverage(Long windowDurationSeconds, Long windowFrequencySeconds, Long allowedLatenessSeconds) {
        this.windowDurationSeconds = windowDurationSeconds;
        this.windowFrequencySeconds = windowFrequencySeconds;
        this.allowedLatenessSeconds = allowedLatenessSeconds;
        this.window = configureWindow();
    }

    @Override
    public PCollection<ComputedEvent> expand(PCollection<RawEvent> input) {

        PCollection<RawEvent> windowed = input.apply(Steps.ADD_WINDOWS.getStep(), window);
        return windowed
                .apply(Steps.EXTRACT_KEY.getStep(), WithKeys.of(new ExtractKeys()))
                .apply(Steps.COMPUTE_AVERAGE.getStep(), Combine.perKey(new Average()))
                .apply(Steps.REMOVE_KEY.getStep(), Values.create());
    }

    private Window<RawEvent> configureWindow() {
        return Window.<RawEvent>into(SlidingWindows
                        .of(Duration.standardSeconds(windowDurationSeconds))
                        .every(Duration.standardSeconds(windowFrequencySeconds)))
                .withAllowedLateness(Duration.standardSeconds(allowedLatenessSeconds))
                .accumulatingFiredPanes()
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)));
    }
}
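Note (not part of the commit): with the defaults used by SensorAnalytics (a 180 s window sliding every 60 s), each element belongs to three overlapping windows. A small, purely illustrative sketch of the window boundaries for one event timestamp; the helper class and the example timestamp are made up.

import java.time.Instant;

public class WindowMembershipSketch {
    public static void main(String[] args) {
        long durationSec = 180, everySec = 60;
        long eventSec = Instant.parse("2023-02-01T09:02:10Z").getEpochSecond();

        // Latest window start at or before the event, then step back one slide at a time
        long lastStart = eventSec - (eventSec % everySec);
        for (long start = lastStart; start > eventSec - durationSec; start -= everySec) {
            System.out.printf("[%s, %s)%n",
                    Instant.ofEpochSecond(start), Instant.ofEpochSecond(start + durationSec));
        }
        // The three windows containing this event: 09:02-09:05, 09:01-09:04, 09:00-09:03
    }
}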
@@ -0,0 +1,41 @@
package com.barrelsofdata.examples.beam.streaming.sensor.transform;

import com.barrelsofdata.examples.beam.streaming.sensor.function.ExtractTimestamp;
import com.barrelsofdata.examples.beam.streaming.sensor.function.ParseEvents;
import com.barrelsofdata.examples.beam.streaming.sensor.function.FailedEventBuilder;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.config.Steps;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.WithFailures;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * PTransform that converts key, value received from kafka into {@link RawEvent} object
 * and attaches the timestamp from event {@link com.barrelsofdata.examples.beam.streaming.sensor.model.Data}
 */
public class ParseWithTimestamp extends PTransform<PCollection<KV<String, String>>, PCollection<RawEvent>> {

    private final JsonMapper jsonMapper = new JsonMapper();

    @Override
    public PCollection<RawEvent> expand(PCollection<KV<String, String>> input) {
        WithFailures.Result<PCollection<RawEvent>, FailedEvent> parsedWithExceptions = input.apply(Steps.PARSE_EVENT.getStep(), MapElements
                .via(ParseEvents.of(jsonMapper))
                .exceptionsInto(TypeDescriptor.of(FailedEvent.class))
                .exceptionsVia(FailedEventBuilder.of(Steps.PARSE_EVENT.getStep())));

        PCollection<RawEvent> parsed = parsedWithExceptions.output();

        // Probably send the below to a dead letter kafka topic
        // PCollection<FailedEvent> failed = parsedWithExceptions.failures();

        return parsed.apply(Steps.ATTACH_TIMESTAMP.getStep(),
                WithTimestamps.of(ExtractTimestamp.of()));
    }
}
@@ -0,0 +1,15 @@
package com.barrelsofdata.examples.beam.streaming.sensor.util;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/**
 * Builds pipeline options from java program arguments
 */
public class PipelineOptionsBuilder {
    private PipelineOptionsBuilder() {}

    public static <T extends PipelineOptions> T from(String[] args, Class<T> cls) {
        return PipelineOptionsFactory.fromArgs(args).withValidation().as(cls);
    }
}
@@ -0,0 +1,73 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.testutils.RawEventsGenerator;
import org.joda.time.Instant;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.ArrayList;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.*;

class AverageTest {

    @ParameterizedTest
    @CsvSource(value = {
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.5|20|5.75",
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.0|20|1.0"
    }, delimiter = '|')
    void testThatComputedAveragesAreCorrect(String id, String startTimestamp, Long timestampStepSeconds, Double startValue, Double valueSteps, int count, Double expectedAverage) {
        ArrayList<RawEvent> rawEvents = RawEventsGenerator.generate(id, startTimestamp, timestampStepSeconds, startValue, valueSteps, count);
        int half = rawEvents.size() / 2;
        ArrayList<ArrayList<RawEvent>> workerSplits = new ArrayList<>(rawEvents.stream()
                .collect(Collectors.partitioningBy(
                        s -> rawEvents.indexOf(s) > half))
                .values().stream().map(ArrayList::new).toList());

        Average average = new Average();
        ArrayList<RawEvent> merged = average.mergeAccumulators(workerSplits);
        ComputedEvent computed = average.extractOutput(merged);

        assertEquals(id, computed.id(), "Id in ComputedEvent is wrong");
        assertEquals(new Instant(startTimestamp).getMillis() + (timestampStepSeconds * 1000 * (count - 1)), computed.ts(), "Timestamp of ComputedEvent is wrong");
        assertEquals(expectedAverage, computed.value(), "Average computed is incorrect");
    }

    @ParameterizedTest
    @CsvSource(value = {
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.5|20",
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.0|25"
    }, delimiter = '|')
    void testThatAccumulatorMergeIsWorking(String id, String startTimestamp, Long timestampStepSeconds, Double startValue, Double valueSteps, int count) {
        ArrayList<RawEvent> rawEvents = RawEventsGenerator.generate(id, startTimestamp, timestampStepSeconds, startValue, valueSteps, count);
        int half = rawEvents.size() / 2;
        ArrayList<ArrayList<RawEvent>> workerSplits = new ArrayList<>(rawEvents.stream()
                .collect(Collectors.partitioningBy(
                        s -> rawEvents.indexOf(s) > half))
                .values().stream().map(ArrayList::new).toList());

        Average average = new Average();
        ArrayList<RawEvent> merged = average.mergeAccumulators(workerSplits);
        assertEquals(count, merged.size(), "Merged accumulator size does not match with input list");
        assertTrue(rawEvents.containsAll(merged) && merged.containsAll(rawEvents), "Merged accumulator missing elements from input list");
    }

    @ParameterizedTest
    @CsvSource(value = {
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.5|0",
            "testId1|2022-06-01T10:00:00.000Z|10|1.0|0.0|25"
    }, delimiter = '|')
    void testThatElementsAreAddedToAccumulators(String id, String startTimestamp, Long timestampStepSeconds, Double startValue, Double valueSteps, int count) {
        ArrayList<RawEvent> rawEvents = RawEventsGenerator.generate(id, startTimestamp, timestampStepSeconds, startValue, valueSteps, count);
        RawEvent eventToAdd = RawEventsGenerator.generateOne();

        Average average = new Average();
        ArrayList<RawEvent> merged = average.addInput(rawEvents, eventToAdd);
        assertEquals(count + 1, merged.size(), "Element was not added to accumulator");
        assertTrue(merged.contains(eventToAdd), "Element was not found in the accumulator");
    }

}
@@ -0,0 +1,22 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.joda.time.Instant;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class ExtractTimestampTest {
    @Test
    void testTimestampExtraction() {
        Long ts = 1698489173051L;
        Data testData = new Data(ts, 50.0);
        RawEvent testRawEvent = new RawEvent("testUser1", testData);

        ExtractTimestamp extractTimestamp = ExtractTimestamp.of();
        Instant extractedTime = extractTimestamp.apply(testRawEvent);

        assertEquals(ts, extractedTime.getMillis());
    }
}
@@ -0,0 +1,40 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.FailedEvent;
import org.apache.beam.sdk.transforms.WithFailures;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class FailedEventBuilderTest {
    @Test
    void testPipelineException() {
        String step = "Test step";
        String exceptionMessage = "Test exception";

        KV<String, String> testInput = KV.of("testUser1", """
                {"ts":1698489173051,speed":50}""");
        WithFailures.ExceptionElement<KV<String, String>> testExceptionElement = new WithFailures.ExceptionElement<>() {
            @Override
            public KV<String, String> element() {
                return testInput;
            }
            @Override
            public @UnknownKeyFor @NonNull @Initialized Exception exception() {
                return new RuntimeException(exceptionMessage);
            }
        };

        FailedEventBuilder<KV<String, String>> failedEventBuilder = FailedEventBuilder.of(step);
        FailedEvent failedEvent = failedEventBuilder.apply(testExceptionElement);

        assertEquals(testInput.toString(), failedEvent.event());
        assertEquals(step, failedEvent.step());
        assertEquals(exceptionMessage, failedEvent.exception().getMessage());
    }

}
@@ -0,0 +1,38 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.beam.sdk.values.KV;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class ParseEventsTest {

    @Test
    void testValidEvent() {
        String id = "testId1";
        Long ts = 1698489173051L;
        Double value = 50.0;
        KV<String, String> testInput = KV.of(id, """
                {"ts":%s,"value":%s}""".formatted(ts, value));
        ParseEvents parseEvents = ParseEvents.of(new JsonMapper());
        RawEvent parsed = parseEvents.apply(testInput);
        Data parsedData = parsed.data();

        assertEquals(id, parsed.id());
        assertEquals(ts, parsedData.ts());
        assertEquals(value, parsedData.value());
    }

    @Test
    void testInvalidEvent() {
        KV<String, String> testInput = KV.of("testUser1", """
                {"ts":1698489173051,value":50}""");
        ParseEvents parseEvents = ParseEvents.of(new JsonMapper());

        assertThrows(RuntimeException.class, () -> parseEvents.apply(testInput));
    }

}
@@ -0,0 +1,21 @@
package com.barrelsofdata.examples.beam.streaming.sensor.function;

import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class RawExtractKeysTest {
    @Test
    void testValid() {
        String id = "testId1";
        Data testData = new Data(1698489173051L, 10.0);
        RawEvent testRawEvent = new RawEvent(id, testData);

        ExtractKeys extractKeys = ExtractKeys.of();
        String extracted = extractKeys.apply(testRawEvent);

        assertEquals(id, extracted);
    }
}
@@ -0,0 +1,71 @@
package com.barrelsofdata.examples.beam.streaming.sensor.target;

import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.sql.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

class JDBCTargetTest {

    private final static String driver = "org.h2.Driver";
    private final static String jdbcUrl = "jdbc:h2:mem:testDb";
    private final static String table = "testTable";
    private final static String username = "";
    private final static String password = "";
    private static Connection con;
    private static Statement statement;

    @BeforeAll
    static void initDb() throws SQLException {
        con = DriverManager.getConnection(jdbcUrl, username, password);
        statement = con.createStatement();
        statement.executeUpdate("""
                CREATE TABLE %s (id VARCHAR(255), ts TIMESTAMP, computed DOUBLE)""".formatted(table));
    }

    @AfterAll
    static void teardown() throws SQLException {
        con.close();
    }

    @Test
    void testExpectedDataWasWrittenToTable() throws SQLException {
        String id1 = "testId1";
        String id2 = "testId2";
        Long ts = Instant.now().toEpochMilli();
        Double value = 10.0;
        List<ComputedEvent> events = List.of(
                new ComputedEvent(id1, ts, value),
                new ComputedEvent(id2, ts, value));

        JDBCTarget jdbcTarget = new JDBCTarget(driver, jdbcUrl, table, username, password);

        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

        pipeline
                .apply(Create.of(events))
                .setCoder(SerializableCoder.of(ComputedEvent.class))
                .apply(jdbcTarget);

        pipeline.run()
                .waitUntilFinish();

        ResultSet results = statement.executeQuery("SELECT * FROM %s".formatted(table));
        List<ComputedEvent> dbResults = new ArrayList<>();
        while(results.next())
            dbResults.add(new ComputedEvent(results.getString("id"), results.getTimestamp("ts").getTime(), results.getDouble("computed")));

        assertTrue(dbResults.containsAll(events));
    }

}
@@ -0,0 +1,27 @@
package com.barrelsofdata.examples.beam.streaming.sensor.testutils;

import com.barrelsofdata.examples.beam.streaming.sensor.model.Data;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.joda.time.Instant;

import java.util.ArrayList;
import java.util.stream.IntStream;

public class RawEventsGenerator {
    private RawEventsGenerator() {}

    public static RawEvent generateOne() {
        return generate("testId", "2023-03-01T10:00:00.000Z", 0L, 10.0, 0.0, 1).get(0);
    }

    public static ArrayList<RawEvent> generate(String id, String timestampStart, Long timestampStepsSeconds, Double valueStart, Double valueSteps, int count) {
        Instant ts = new Instant(timestampStart);

        return new ArrayList<>(IntStream.range(0, count).boxed()
                .map(i -> {
                    Data data = new Data(ts.getMillis() + (timestampStepsSeconds * 1000 * i), valueStart + (valueSteps * i));
                    return new RawEvent(id, data);
                })
                .toList());
    }
}
@@ -0,0 +1,82 @@
package com.barrelsofdata.examples.beam.streaming.sensor.testutils.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Properties;

public class EmbeddedKafka {
    private static final int NUMBER_OF_BROKERS = 1;
    private final EmbeddedKafkaBroker embeddedKafkaBroker;
    private final Producer kafkaProducer;

    public static EmbeddedKafka withDefaults() {
        return new EmbeddedKafka(NUMBER_OF_BROKERS);
    }

    public EmbeddedKafka(int numBrokers) {
        validate(numBrokers);
        embeddedKafkaBroker = new EmbeddedKafkaBroker(numBrokers);
        embeddedKafkaBroker.brokerProperty("log.dir", "build/embedded-kafka/logs");
        kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString());
    }

    public void start() {
        embeddedKafkaBroker.afterPropertiesSet();
    }

    public void addTopics(String... topics) {
        embeddedKafkaBroker.addTopics(topics);
    }

    public void send(String topic, String key, String message) {
        kafkaProducer.send(topic, key, message);
    }

    public void sendFile(String topic, String filepath, String delimiter) throws FileNotFoundException {
        BufferedReader br = new BufferedReader(new FileReader(filepath));
        br.lines().forEach(line -> {
            String[] pairs = line.split(delimiter);
            assert pairs.length > 1;
            send(topic, pairs[0], pairs[1]);
        });
    }

    public void stop() {
        embeddedKafkaBroker.destroy();
    }

    public String brokers() {
        return embeddedKafkaBroker.getBrokersAsString();
    }

    private void validate(int numBrokers) {
        if(numBrokers < 1)
            throw new RuntimeException("Number of brokers should be at least 1");
    }

    private static class Producer {

        private final KafkaProducer<String, String> kafkaProducer;

        public Producer(String bootstrapServers) {
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            kafkaProducer = new KafkaProducer<>(properties);
        }

        public void send(String topic, String key, String value) {
            var record = new ProducerRecord<>(topic, key, value);
            kafkaProducer.send(record);
            kafkaProducer.flush();
        }
    }
}
@@ -0,0 +1,142 @@
package com.barrelsofdata.examples.beam.streaming.sensor.transform;

import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.testutils.RawEventsGenerator;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.junit.jupiter.api.Test;

import java.util.List;

class ComputeAverageIT {

    @Test
    void lateEventShouldFireCorrectedAverage() {
        Long windowDurationSeconds = 180L; // 3 minutes sliding window
        Long windowFrequencySeconds = 60L; // Slides every minute
        Long windowAllowedLatenessSeconds = 120L; // Allowing late data to arrive up to 2 minutes

        DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
        Long timeStepSeconds = 20L; // Sensor emits an event every 20 sec i.e., 3 events per minute
        Double valueStart = 1.0; // Starts from 1.0
        Double valueSteps = 1.0; // Every subsequent event value increments by 1.0
        int eventsInTest = 15; // We will simulate for 5 minutes thus 5 * 3 events

        String id = "testId1";
        List<TimestampedValue<RawEvent>> testEvents = RawEventsGenerator.generate(
                        "testId1", startTime.toString(
                                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
                        valueStart, valueSteps, eventsInTest)
                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
                .toList();

        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
                .addElements(testEvents.get(0), testEvents.get(1), testEvents.get(2)) // Sending events for the first minute i.e., events at 9:00:00, 9:00:20, 9:00:40
                // Windows are end exclusive, so the 9:01:00 element would not be part of the first window
                .addElements(testEvents.get(3), testEvents.get(4), testEvents.get(5)) // Sending events for 9:01:00 to 9:01:40
                .addElements(testEvents.get(6), testEvents.get(7), testEvents.get(8)) // Sending events for 9:02 to 9:02:40
                .advanceWatermarkTo(startTime.plusMinutes(3).toInstant()) // Advance to 9:03:00 - Emits 2 incomplete windows and 1 complete window at this point
                .addElements(testEvents.get(9), testEvents.get(11)) // Sending events for 9:03 to 9:03:40, skipping the 9:03:20 event
                .advanceWatermarkTo(startTime.plusMinutes(4).toInstant()) // Advance to 9:04:00 - Emits 4th window but we missed one event
                .addElements(testEvents.get(12), testEvents.get(13), testEvents.get(14)) // Sending events for 9:04 to 9:04:40
                .advanceWatermarkTo(startTime.plusMinutes(6).toInstant()) // Advance to 9:06:00 - Closes late arrival period for window 4, so the average will not be corrected
                // But the missed event is also part of windows 5 and 6 as we are using sliding windows
                .addElements(testEvents.get(10)) // Late event arrives at 9:06:00, still within late arrival period for windows 5 and 6, with an event timestamp of 9:03:20
                .advanceWatermarkToInfinity(); // All windows would be emitted at this point

        List<ComputedEvent> expectedEvents = List.of(
                new ComputedEvent(id, testEvents.get(2).getValue().data().ts(), 2.0), // 6 / 3 - Incomplete window ending at the first minute 9:01, with events from 8:58 - 9:01
                new ComputedEvent(id, testEvents.get(5).getValue().data().ts(), 3.5), // 21 / 6 - Incomplete window, with events from 8:59 - 9:02
                new ComputedEvent(id, testEvents.get(8).getValue().data().ts(), 5.0), // 45 / 9 - Complete window, with events from 9:00 - 9:03
                new ComputedEvent(id, testEvents.get(11).getValue().data().ts(), 7.625), // 61 / 8 - Window, with events from 9:01 - 9:04 but with 1 event missing

                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 88 / 8 - Window, with events from 9:02 - 9:05 but with 1 event missing
                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 99 / 9 - Complete window, with events from 9:02 - 9:05
                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.8), // 64 / 5 - Window, with events from 9:03 - 9:06 but with 1 event missing - Note we stopped sending data after 9:04:40, so there are only 5 elements in the window
                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.5), // 75 / 6 - Complete window, with events from 9:03 - 9:06

                new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 14.0)); // 42 / 3 - Complete window, with events from 9:04 - 9:07

        ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

        PCollection<ComputedEvent> computed = pipeline
                .apply("Create input stream", simulatedStream)
                .apply(average);

        // Checking if all the expected events are present in the PCollection
        PAssert.that(computed)
                .containsInAnyOrder(expectedEvents);
        pipeline.run();
    }

    @Test
    void testAveragesAreComputedForAllIds() {
        Long windowDurationSeconds = 180L; // 3 minutes sliding window
        Long windowFrequencySeconds = 60L; // Slides every minute
        Long windowAllowedLatenessSeconds = 120L; // Allowing late data to arrive up to 2 minutes

        DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
        Long timeStepSeconds = 20L; // Sensor emits an event every 20 sec i.e., 3 events per minute
        Double valueStart = 1.0; // Starts from 1.0
        Double valueStepsUser1 = 1.0; // Every subsequent event value increments by 1.0
        Double valueStepsUser2 = 0.0; // Every subsequent event value will be the same
        int eventsInTest = 9; // We will simulate for 3 minutes thus 3 * 3 events

        String idUser1 = "testId1";
        String idUser2 = "testId2";
        List<TimestampedValue<RawEvent>> testEventsUser1 = RawEventsGenerator.generate(
                        idUser1, startTime.toString(
                                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
                        valueStart, valueStepsUser1, eventsInTest)
                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
                .toList();
        List<TimestampedValue<RawEvent>> testEventsUser2 = RawEventsGenerator.generate(
                        idUser2, startTime.toString(
                                DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
                        valueStart, valueStepsUser2, eventsInTest)
                .stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
                .toList();

        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
                .addElements(testEventsUser1.get(0), testEventsUser1.get(1), testEventsUser1.get(2))
                .addElements(testEventsUser1.get(3), testEventsUser1.get(4), testEventsUser1.get(5))
                .addElements(testEventsUser1.get(6), testEventsUser1.get(7), testEventsUser1.get(8))
                .addElements(testEventsUser2.get(0), testEventsUser2.get(1), testEventsUser2.get(2))
                .addElements(testEventsUser2.get(3), testEventsUser2.get(4), testEventsUser2.get(5))
                .addElements(testEventsUser2.get(6), testEventsUser2.get(7), testEventsUser2.get(8))
                .advanceWatermarkToInfinity(); // All windows would be emitted at this point - 4 incomplete windows and 1 complete window per id

        List<ComputedEvent> expectedEvents = List.of(
                new ComputedEvent(idUser1, testEventsUser1.get(2).getValue().data().ts(), 2.0),
                new ComputedEvent(idUser1, testEventsUser1.get(5).getValue().data().ts(), 3.5),
                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 5.0),
                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 6.5),
                new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 8.0),
                new ComputedEvent(idUser2, testEventsUser2.get(2).getValue().data().ts(), 1.0),
                new ComputedEvent(idUser2, testEventsUser2.get(5).getValue().data().ts(), 1.0),
                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
                new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0));

        ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

        PCollection<ComputedEvent> computed = pipeline
                .apply("Create input stream", simulatedStream)
                .apply(average);

        // Checking if all the expected events are present in the PCollection
        PAssert.that(computed)
                .containsInAnyOrder(expectedEvents);
        pipeline.run();
    }
}
@@ -0,0 +1,42 @@
package com.barrelsofdata.examples.beam.streaming.sensor.transform;

import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.junit.jupiter.api.Test;

class ParseWithTimestampIT {

    @Test
    void testThatDataIsParsedAndTimestampAttached() {
        Long ts = 1698489173051L;
        KV<String, String> inputEvent = KV.of("testId1", """
                {"ts":%s,"value":10.0}""".formatted(ts));

        ParseWithTimestamp parseWithTimestamp = new ParseWithTimestamp();

        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);

        PCollection<Long> parsed = pipeline
                .apply(Create.of(inputEvent))
                .apply(parseWithTimestamp)
                .apply(ParDo.of(new BeamTimestampGrabber()));

        PAssert.that(parsed).containsInAnyOrder(ts);
        pipeline.run();
    }

    private static class BeamTimestampGrabber extends DoFn<RawEvent, Long> {
        @ProcessElement
        public void process(ProcessContext context, @Timestamp Instant timestamp) {
            context.output(timestamp.getMillis());
        }
    }

}
@@ -0,0 +1,42 @@
package com.barrelsofdata.examples.beam.streaming.sensor.util;

import com.barrelsofdata.examples.beam.streaming.sensor.config.SensorPipelineOptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import static org.junit.jupiter.api.Assertions.*;

class PipelineOptionsBuilderTest {

    @ParameterizedTest
    @CsvSource(value = {
            "DirectRunner|localhost:9092,localhost:9093|test_topic|test_cg_1|true|org.h2.driver|jdbc:h2:mem:testdb|testTable|user|pass",
            "DirectRunner|127.0.0.1|test_topic|test_cg_2|false|org.h2.driver|jdbc:h2:mem:testdb|testTable|user|pass"
    }, delimiter = '|')
    void testOptionsParsing(String runner, String kafkaBrokers, String kafkaTopic, String kafkaConsumerGroupId, Boolean resetToEarliest, String sqlDriver, String jdbcUrl, String table, String username, String password) {
        String[] args = {
                "--runner=%s".formatted(runner),
                "--kafkaBrokers=%s".formatted(kafkaBrokers),
                "--kafkaTopic=%s".formatted(kafkaTopic),
                "--kafkaConsumerGroupId=%s".formatted(kafkaConsumerGroupId),
                "--sqlDriver=%s".formatted(sqlDriver),
                "--jdbcUrl=%s".formatted(jdbcUrl),
                "--table=%s".formatted(table),
                "--username=%s".formatted(username),
                "--password=%s".formatted(password),
                resetToEarliest ? "--resetToEarliest" : ""
        };
        SensorPipelineOptions options = PipelineOptionsBuilder.from(args, SensorPipelineOptions.class);

        assertEquals(runner, options.getRunner().getSimpleName());
        assertEquals(kafkaBrokers, options.getKafkaBrokers());
        assertEquals(kafkaTopic, options.getKafkaTopic());
        assertEquals(kafkaConsumerGroupId, options.getKafkaConsumerGroupId());
        assertEquals(sqlDriver, options.getSqlDriver());
        assertEquals(jdbcUrl, options.getJdbcUrl());
        assertEquals(table, options.getTable());
        assertEquals(username, options.getUsername());
        assertEquals(password, options.getPassword());
    }

}