// apache-beam-examples/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/transform/ComputeAverageIT.java
//
// 142 lines
// 9.8 KiB
// Java
// Executable File

package com.barrelsofdata.examples.beam.streaming.sensor.transform;
import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
import com.barrelsofdata.examples.beam.streaming.sensor.testutils.RawEventsGenerator;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
 * Integration tests for {@link ComputeAverage}.
 *
 * <p>Each test feeds a simulated {@link TestStream} of sensor readings through the transform and
 * asserts on the {@link ComputedEvent}s it emits. All tests share the same windowing setup:
 * 3 minute sliding windows that slide every minute, with 2 minutes of allowed lateness, and a
 * sensor that emits one reading every 20 seconds starting at 09:00:00 UTC.
 */
class ComputeAverageIT {
    private static final Long WINDOW_DURATION_SECONDS = 180L; // 3 minutes sliding window
    private static final Long WINDOW_FREQUENCY_SECONDS = 60L; // Slides every minute
    private static final Long WINDOW_ALLOWED_LATENESS_SECONDS = 120L; // Late data may arrive up to 2 minutes after the window ends
    private static final Long TIME_STEP_SECONDS = 20L; // Sensor emits an event every 20 sec, i.e. 3 events per minute
    private static final Double VALUE_START = 1.0; // Value carried by the first generated event
    private static final DateTime START_TIME = new DateTime("2023-02-01T09:00:00.000Z");
    private static final String START_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSZZ";

    @Test
    void lateEventShouldFireCorrectedAverage() {
        Double valueSteps = 1.0; // Every subsequent event value increments by 1.0
        int eventsInTest = 15; // We simulate 5 minutes of data, thus 5 * 3 events
        String id = "testId1";
        // The same id is used for generation and for the expectations below.
        List<TimestampedValue<RawEvent>> testEvents = generateEvents(id, valueSteps, eventsInTest);

        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
                // Events of the first minute: 9:00:00, 9:00:20, 9:00:40.
                // Windows are end exclusive, so the 9:01:00 element is not part of the first window.
                .addElements(testEvents.get(0), testEvents.get(1), testEvents.get(2))
                // Events for 9:01:00 to 9:01:40
                .addElements(testEvents.get(3), testEvents.get(4), testEvents.get(5))
                // Events for 9:02:00 to 9:02:40
                .addElements(testEvents.get(6), testEvents.get(7), testEvents.get(8))
                // Advance to 9:03:00 - emits 2 incomplete windows and 1 complete window
                .advanceWatermarkTo(START_TIME.plusMinutes(3).toInstant())
                // Events for 9:03:00 and 9:03:40, deliberately skipping the 9:03:20 event
                .addElements(testEvents.get(9), testEvents.get(11))
                // Advance to 9:04:00 - emits the 4th window, which is missing one event
                .advanceWatermarkTo(START_TIME.plusMinutes(4).toInstant())
                // Events for 9:04:00 to 9:04:40
                .addElements(testEvents.get(12), testEvents.get(13), testEvents.get(14))
                // Advance to 9:06:00 - closes the late arrival period of window 4, so its average is never corrected.
                // The missed event also belongs to windows 5 and 6 because the windows slide.
                .advanceWatermarkTo(START_TIME.plusMinutes(6).toInstant())
                // Late event (event time 9:03:20) arrives at 9:06:00, still within the late arrival period of windows 5 and 6
                .addElements(testEvents.get(10))
                // All remaining windows are emitted at this point
                .advanceWatermarkToInfinity();

        List<ComputedEvent> expectedEvents = List.of(
                new ComputedEvent(id, ts(testEvents, 2), 2.0), // 6 / 3 - Incomplete window ending at 9:01, covering 8:58 - 9:01
                new ComputedEvent(id, ts(testEvents, 5), 3.5), // 21 / 6 - Incomplete window, covering 8:59 - 9:02
                new ComputedEvent(id, ts(testEvents, 8), 5.0), // 45 / 9 - Complete window, covering 9:00 - 9:03
                new ComputedEvent(id, ts(testEvents, 11), 7.625), // 61 / 8 - Window 9:01 - 9:04 with 1 event missing
                new ComputedEvent(id, ts(testEvents, 14), 11.0), // 88 / 8 - Window 9:02 - 9:05 with 1 event missing
                new ComputedEvent(id, ts(testEvents, 14), 11.0), // 99 / 9 - Complete window 9:02 - 9:05 (after the late event)
                new ComputedEvent(id, ts(testEvents, 14), 12.8), // 64 / 5 - Window 9:03 - 9:06 with 1 event missing; data stops after 9:04:40, so only 5 elements
                new ComputedEvent(id, ts(testEvents, 14), 12.5), // 75 / 6 - Complete window 9:03 - 9:06 (after the late event)
                new ComputedEvent(id, ts(testEvents, 14), 14.0)); // 42 / 3 - Complete window 9:04 - 9:07

        assertComputedEvents(simulatedStream, expectedEvents);
    }

    @Test
    void testAveragesAreComputedForAllIds() {
        Double valueStepsUser1 = 1.0; // Every subsequent event value increments by 1.0
        Double valueStepsUser2 = 0.0; // Every subsequent event value stays the same
        int eventsInTest = 9; // We simulate 3 minutes of data, thus 3 * 3 events
        String idUser1 = "testId1";
        String idUser2 = "testId2";
        List<TimestampedValue<RawEvent>> testEventsUser1 = generateEvents(idUser1, valueStepsUser1, eventsInTest);
        List<TimestampedValue<RawEvent>> testEventsUser2 = generateEvents(idUser2, valueStepsUser2, eventsInTest);

        TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
                .addElements(testEventsUser1.get(0), testEventsUser1.get(1), testEventsUser1.get(2))
                .addElements(testEventsUser1.get(3), testEventsUser1.get(4), testEventsUser1.get(5))
                .addElements(testEventsUser1.get(6), testEventsUser1.get(7), testEventsUser1.get(8))
                .addElements(testEventsUser2.get(0), testEventsUser2.get(1), testEventsUser2.get(2))
                .addElements(testEventsUser2.get(3), testEventsUser2.get(4), testEventsUser2.get(5))
                .addElements(testEventsUser2.get(6), testEventsUser2.get(7), testEventsUser2.get(8))
                // All windows are emitted at this point - per id, 4 incomplete windows and 1 complete window
                .advanceWatermarkToInfinity();

        List<ComputedEvent> expectedEvents = List.of(
                new ComputedEvent(idUser1, ts(testEventsUser1, 2), 2.0), // 6 / 3
                new ComputedEvent(idUser1, ts(testEventsUser1, 5), 3.5), // 21 / 6
                new ComputedEvent(idUser1, ts(testEventsUser1, 8), 5.0), // 45 / 9 - the only complete window
                new ComputedEvent(idUser1, ts(testEventsUser1, 8), 6.5), // 39 / 6
                new ComputedEvent(idUser1, ts(testEventsUser1, 8), 8.0), // 24 / 3
                // The second id has a constant value, so every window averages to 1.0
                new ComputedEvent(idUser2, ts(testEventsUser2, 2), 1.0),
                new ComputedEvent(idUser2, ts(testEventsUser2, 5), 1.0),
                new ComputedEvent(idUser2, ts(testEventsUser2, 8), 1.0),
                new ComputedEvent(idUser2, ts(testEventsUser2, 8), 1.0),
                new ComputedEvent(idUser2, ts(testEventsUser2, 8), 1.0));

        assertComputedEvents(simulatedStream, expectedEvents);
    }

    /**
     * Generates {@code count} raw events for one id, starting at {@link #START_TIME} and spaced
     * {@link #TIME_STEP_SECONDS} apart, and pairs each event with an element timestamp taken from
     * the epoch-millisecond timestamp in its own payload.
     *
     * @param id         id passed to the generator for every event
     * @param valueSteps increment passed to the generator for successive event values
     * @param count      number of events to generate
     * @return the generated events wrapped as timestamped values, in generation order
     */
    private static List<TimestampedValue<RawEvent>> generateEvents(String id, Double valueSteps, int count) {
        String formattedStart = START_TIME.toString(DateTimeFormat.forPattern(START_TIME_PATTERN));
        return RawEventsGenerator.generate(id, formattedStart, TIME_STEP_SECONDS, VALUE_START, valueSteps, count)
                .stream()
                .map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
                .toList();
    }

    /**
     * Returns the payload timestamp (epoch milliseconds) of the event at {@code index}.
     *
     * @param events generated events
     * @param index  position of the event in {@code events}
     * @return the value of {@code data().ts()} for that event
     */
    private static long ts(List<TimestampedValue<RawEvent>> events, int index) {
        return events.get(index).getValue().data().ts();
    }

    /**
     * Runs {@link ComputeAverage}, configured with the shared window settings, over the given
     * stream and asserts that the output contains exactly the expected events, in any order.
     *
     * @param simulatedStream input stream to feed into the transform
     * @param expectedEvents  events the transform is expected to emit
     */
    private static void assertComputedEvents(TestStream<RawEvent> simulatedStream, List<ComputedEvent> expectedEvents) {
        ComputeAverage average = new ComputeAverage(
                WINDOW_DURATION_SECONDS, WINDOW_FREQUENCY_SECONDS, WINDOW_ALLOWED_LATENESS_SECONDS);
        TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
        PCollection<ComputedEvent> computed = pipeline
                .apply("Create input stream", simulatedStream)
                .apply(average);
        // Checking that exactly the expected events are present in the PCollection
        PAssert.that(computed)
                .containsInAnyOrder(expectedEvents);
        pipeline.run();
    }
}