142 lines
9.8 KiB
Java
Executable File
142 lines
9.8 KiB
Java
Executable File
package com.barrelsofdata.examples.beam.streaming.sensor.transform;
|
|
|
|
import com.barrelsofdata.examples.beam.streaming.sensor.model.ComputedEvent;
|
|
import com.barrelsofdata.examples.beam.streaming.sensor.model.RawEvent;
|
|
import com.barrelsofdata.examples.beam.streaming.sensor.testutils.RawEventsGenerator;
|
|
import org.apache.beam.sdk.coders.SerializableCoder;
|
|
import org.apache.beam.sdk.testing.PAssert;
|
|
import org.apache.beam.sdk.testing.TestPipeline;
|
|
import org.apache.beam.sdk.testing.TestStream;
|
|
import org.apache.beam.sdk.values.PCollection;
|
|
import org.apache.beam.sdk.values.TimestampedValue;
|
|
import org.joda.time.DateTime;
|
|
import org.joda.time.Instant;
|
|
import org.joda.time.format.DateTimeFormat;
|
|
import org.junit.jupiter.api.Test;
|
|
|
|
import java.util.List;
|
|
|
|
class ComputeAverageIT {
|
|
|
|
@Test
|
|
void lateEventShouldFireCorrectedAverage() {
|
|
Long windowDurationSeconds = 180L; // 3 minutes sliding window
|
|
Long windowFrequencySeconds = 60L; // Slides every minute
|
|
Long windowAllowedLatenessSeconds = 120L; // Allowing late data to arrive upto 2 minutes
|
|
|
|
DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
|
|
Long timeStepSeconds = 20L; // Sensor emits an event every 20 sec i.e, 3 events per minute
|
|
Double valueStart = 1.0; // Starts from 1.0
|
|
Double valueSteps = 1.0; // Every subsequent event value increments by 1.0
|
|
int eventsInTest = 15; // We will simulate for 5 minutes thus 5 * 3 events
|
|
|
|
String id = "testId1";
|
|
List<TimestampedValue<RawEvent>> testEvents = RawEventsGenerator.generate(
|
|
"testId1", startTime.toString(
|
|
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
|
|
valueStart, valueSteps, eventsInTest)
|
|
.stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
|
|
.toList();
|
|
|
|
TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
|
|
.addElements(testEvents.get(0), testEvents.get(1), testEvents.get(2)) // Sending events for first 1 minute i.e, event at 9:00:00, 9:00:20, 9:00:40
|
|
// Windows are end exclusive, so 9:01:00 element would not be part of first window
|
|
.addElements(testEvents.get(3), testEvents.get(4), testEvents.get(5)) // Sending events for 9:01:00 to 9:01:40
|
|
.addElements(testEvents.get(6), testEvents.get(7), testEvents.get(8)) // Sending events for 9:02 to 9:02:40
|
|
.advanceWatermarkTo(startTime.plusMinutes(3).toInstant()) // Advance to 9:03:00 - Emits 2 incomplete windows and 1 complete window at this point
|
|
.addElements(testEvents.get(9), testEvents.get(11)) // Sending events for 9:03 to 9:03:40, skipping the 9:03:20 event
|
|
.advanceWatermarkTo(startTime.plusMinutes(4).toInstant()) // Advance to 9:04:00 - Emits 4th window but we missed one event
|
|
.addElements(testEvents.get(12), testEvents.get(13), testEvents.get(14)) // Sending events for 9:04 to 9:04:40
|
|
.advanceWatermarkTo(startTime.plusMinutes(6).toInstant()) // Advance to 9:06:00 - Closes late arrival period for window 4, so the average will not be corrected
|
|
// But the missed event is also part of windows 5 and 6 as we are using sliding windows
|
|
.addElements(testEvents.get(10)) // Late event arrives at 9:06:00, still within late arrival period for windows 5 and 6, with an event timestamp of 9:03:20
|
|
.advanceWatermarkToInfinity(); // All windows would be emitted at this point
|
|
|
|
List<ComputedEvent> expectedEvents = List.of(
|
|
new ComputedEvent(id, testEvents.get(2).getValue().data().ts(), 2.0), // 6 / 3 - Incomplete window ending at the first minute 9:01, with events from 8:58 - 9:01
|
|
new ComputedEvent(id, testEvents.get(5).getValue().data().ts(), 3.5), // 21 / 6 - Incomplete window, with events from 8:59 - 9:02
|
|
new ComputedEvent(id, testEvents.get(8).getValue().data().ts(), 5.0), // 45 / 9 - Complete window, with events from 9:00 - 9:03
|
|
new ComputedEvent(id, testEvents.get(11).getValue().data().ts(), 7.625), // 61 / 8 - Window, with events from 9:01 - 9:04 but with 1 event missing
|
|
|
|
new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 88 / 8 - Window, with events from 9:02 - 9:05 but with 1 event missing
|
|
new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 11.0), // 99 / 9 - Complete window, with events from 9:02 - 9:05
|
|
new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.8), // 64 / 5 - Window, with events from 9:03 - 9:06 but with 1 event missing - Note we stopped sending data after 9:40, so there are only 5 elements in the window
|
|
new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 12.5), // 75 / 6 - Complete window, with events from 9:03 - 9:06
|
|
|
|
new ComputedEvent(id, testEvents.get(14).getValue().data().ts(), 14.0)); // 42 / 3 - Complete window, with events from 9:04 - 9:07
|
|
|
|
ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
|
|
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
|
|
|
|
PCollection<ComputedEvent> computed = pipeline
|
|
.apply("Create input stream", simulatedStream)
|
|
.apply(average);
|
|
|
|
// Checking if all the expected events are present in the PCollection
|
|
PAssert.that(computed)
|
|
.containsInAnyOrder(expectedEvents);
|
|
pipeline.run();
|
|
}
|
|
|
|
@Test
|
|
void testAveragesAreComputedForAllIds() {
|
|
Long windowDurationSeconds = 180L; // 3 minutes sliding window
|
|
Long windowFrequencySeconds = 60L; // Slides every minute
|
|
Long windowAllowedLatenessSeconds = 120L; // Allowing late data to arrive upto 2 minutes
|
|
|
|
DateTime startTime = new DateTime("2023-02-01T09:00:00.000Z");
|
|
Long timeStepSeconds = 20L; // Sensor emits an event every 20 sec i.e, 3 events per minute
|
|
Double valueStart = 1.0; // Starts from 1.0
|
|
Double valueStepsUser1 = 1.0; // Every subsequent event value increments by 1.0
|
|
Double valueStepsUser2 = 0.0; // Every subsequent event value will be same
|
|
int eventsInTest = 9; // We will simulate for 3 minutes thus 3 * 3 events
|
|
|
|
String idUser1 = "testId1";
|
|
String idUser2 = "testId2";
|
|
List<TimestampedValue<RawEvent>> testEventsUser1 = RawEventsGenerator.generate(
|
|
idUser1, startTime.toString(
|
|
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
|
|
valueStart, valueStepsUser1, eventsInTest)
|
|
.stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
|
|
.toList();
|
|
List<TimestampedValue<RawEvent>> testEventsUser2 = RawEventsGenerator.generate(
|
|
idUser2, startTime.toString(
|
|
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSZZ")), timeStepSeconds,
|
|
valueStart, valueStepsUser2, eventsInTest)
|
|
.stream().map(ev -> TimestampedValue.of(ev, Instant.ofEpochMilli(ev.data().ts())))
|
|
.toList();
|
|
|
|
TestStream<RawEvent> simulatedStream = TestStream.create(SerializableCoder.of(RawEvent.class))
|
|
.addElements(testEventsUser1.get(0), testEventsUser1.get(1), testEventsUser1.get(2))
|
|
.addElements(testEventsUser1.get(3), testEventsUser1.get(4), testEventsUser1.get(5))
|
|
.addElements(testEventsUser1.get(6), testEventsUser1.get(7), testEventsUser1.get(8))
|
|
.addElements(testEventsUser2.get(0), testEventsUser2.get(1), testEventsUser2.get(2))
|
|
.addElements(testEventsUser2.get(3), testEventsUser2.get(4), testEventsUser2.get(5))
|
|
.addElements(testEventsUser2.get(6), testEventsUser2.get(7), testEventsUser2.get(8))
|
|
.advanceWatermarkToInfinity(); // All windows would be emitted at this point - 4 incomplete windows and 1 complete window at this point
|
|
|
|
List<ComputedEvent> expectedEvents = List.of(
|
|
new ComputedEvent(idUser1, testEventsUser1.get(2).getValue().data().ts(), 2.0),
|
|
new ComputedEvent(idUser1, testEventsUser1.get(5).getValue().data().ts(), 3.5),
|
|
new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 5.0),
|
|
new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 6.5),
|
|
new ComputedEvent(idUser1, testEventsUser1.get(8).getValue().data().ts(), 8.0),
|
|
new ComputedEvent(idUser2, testEventsUser2.get(2).getValue().data().ts(), 1.0),
|
|
new ComputedEvent(idUser2, testEventsUser2.get(5).getValue().data().ts(), 1.0),
|
|
new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
|
|
new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0),
|
|
new ComputedEvent(idUser2, testEventsUser2.get(8).getValue().data().ts(), 1.0));
|
|
|
|
ComputeAverage average = new ComputeAverage(windowDurationSeconds, windowFrequencySeconds, windowAllowedLatenessSeconds);
|
|
TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
|
|
|
|
PCollection<ComputedEvent> computed = pipeline
|
|
.apply("Create input stream", simulatedStream)
|
|
.apply(average);
|
|
|
|
// Checking if all the expected events are present in the PCollection
|
|
PAssert.that(computed)
|
|
.containsInAnyOrder(expectedEvents);
|
|
pipeline.run();
|
|
}
|
|
} |