diff --git a/.gitea/workflows/workflow.yaml b/.gitea/workflows/workflow.yaml index a3c55f1..8193f5f 100644 --- a/.gitea/workflows/workflow.yaml +++ b/.gitea/workflows/workflow.yaml @@ -37,7 +37,7 @@ jobs: - name: Set up java uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '21' distribution: 'temurin' - name: Run tests id: tests @@ -64,7 +64,7 @@ jobs: - name: Set up java uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '21' distribution: 'temurin' - name: Run integration tests id: integration-tests @@ -91,7 +91,7 @@ jobs: - name: Set up java uses: actions/setup-java@v3 with: - java-version: '17' + java-version: '21' distribution: 'temurin' - name: Build id: build diff --git a/README.md b/README.md index 0a0a876..a025c13 100755 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ This project holds all the examples of apache beam that are detailed on my blog at [https://barrelsofdata.com](https://barrelsofdata.com) ## Build instructions -This is a multi-module project and uses version catalogs. The dependencies can we viewed at [/barrelsofdata/apache-beam-examples/src/branch/main/gradle/libs.versions.toml](./gradle/libs.versions.toml) +This is a multi-module project and uses version catalogs. The dependencies can we viewed at [./gradle/libs.versions.toml](./gradle/libs.versions.toml) To build project, execute the following commands from the project root - To clear all compiled classes, build and log directories diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7a9b668..2358cfa 100755 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,13 +1,13 @@ [versions] -apache-beam = "2.51.0" -apache-kafka = "3.3.2" +apache-beam = "2.52.0" +apache-kafka = "3.6.1" h2-database = "2.2.224" hamcrest = "2.2" -jackson = "2.15.3" -java = "17" -junit-jupiter = "5.10.0" -postgresql-driver = "42.6.0" -spring-kafka-test = "3.0.12" +jackson = "2.16.0" +java = "21" +junit-jupiter = "5.10.1" +postgresql-driver = "42.7.1" +spring-kafka-test = "3.1.0" [libraries] beam-core = { module = "org.apache.beam:beam-sdks-java-core", version.ref = "apache-beam" } diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7f93135..d64cd49 100755 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 3fa8f86..1af9e09 100755 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java index e2e9049..211c293 100755 --- a/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java +++ b/streaming/src/test/java/com/barrelsofdata/examples/beam/streaming/sensor/testutils/kafka/EmbeddedKafka.java @@ -5,16 +5,18 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; +import java.util.Map; import java.util.Properties; public class EmbeddedKafka { - private static final int NUMBER_OF_BROKERS = 1; + private static final int NUMBER_OF_BROKERS = 2; private final EmbeddedKafkaBroker embeddedKafkaBroker; - private final Producer kafkaProducer; + private Producer kafkaProducer; public static EmbeddedKafka withDefaults() { return new EmbeddedKafka(NUMBER_OF_BROKERS); @@ -22,9 +24,8 @@ public class EmbeddedKafka { public EmbeddedKafka(int numBrokers) { validate(numBrokers); - embeddedKafkaBroker = new EmbeddedKafkaBroker(numBrokers); - embeddedKafkaBroker.brokerProperty("log.dir", "build/embedded-kafka/logs"); - kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString()); + embeddedKafkaBroker = new EmbeddedKafkaKraftBroker(numBrokers, numBrokers); + embeddedKafkaBroker.brokerProperties(Map.of("log.dir", "build/embedded-kafka/logs")); } public void start() { @@ -36,7 +37,7 @@ public class EmbeddedKafka { } public void send(String topic, String key, String message) { - kafkaProducer.send(topic, key, message); + getKafkaProducer().send(topic, key, message); } public void sendFile(String topic, String filepath, String delimiter) throws FileNotFoundException { @@ -56,6 +57,12 @@ public class EmbeddedKafka { return embeddedKafkaBroker.getBrokersAsString(); } + private Producer getKafkaProducer() { + if(kafkaProducer == null) + kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString()); + return kafkaProducer; + } + private void validate(int numBrokers) { if(numBrokers < 1) throw new RuntimeException("Number of brokers should be atleast 1");