You've already forked apache-beam-examples
Update versions and switch to java 21
This commit is contained in:
@ -5,16 +5,18 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileReader;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
public class EmbeddedKafka {
|
||||
private static final int NUMBER_OF_BROKERS = 1;
|
||||
private static final int NUMBER_OF_BROKERS = 2;
|
||||
private final EmbeddedKafkaBroker embeddedKafkaBroker;
|
||||
private final Producer kafkaProducer;
|
||||
private Producer kafkaProducer;
|
||||
|
||||
public static EmbeddedKafka withDefaults() {
|
||||
return new EmbeddedKafka(NUMBER_OF_BROKERS);
|
||||
@ -22,9 +24,8 @@ public class EmbeddedKafka {
|
||||
|
||||
public EmbeddedKafka(int numBrokers) {
|
||||
validate(numBrokers);
|
||||
embeddedKafkaBroker = new EmbeddedKafkaBroker(numBrokers);
|
||||
embeddedKafkaBroker.brokerProperty("log.dir", "build/embedded-kafka/logs");
|
||||
kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString());
|
||||
embeddedKafkaBroker = new EmbeddedKafkaKraftBroker(numBrokers, numBrokers);
|
||||
embeddedKafkaBroker.brokerProperties(Map.of("log.dir", "build/embedded-kafka/logs"));
|
||||
}
|
||||
|
||||
public void start() {
|
||||
@ -36,7 +37,7 @@ public class EmbeddedKafka {
|
||||
}
|
||||
|
||||
public void send(String topic, String key, String message) {
|
||||
kafkaProducer.send(topic, key, message);
|
||||
getKafkaProducer().send(topic, key, message);
|
||||
}
|
||||
|
||||
public void sendFile(String topic, String filepath, String delimiter) throws FileNotFoundException {
|
||||
@ -56,6 +57,12 @@ public class EmbeddedKafka {
|
||||
return embeddedKafkaBroker.getBrokersAsString();
|
||||
}
|
||||
|
||||
private Producer getKafkaProducer() {
|
||||
if(kafkaProducer == null)
|
||||
kafkaProducer = new Producer(embeddedKafkaBroker.getBrokersAsString());
|
||||
return kafkaProducer;
|
||||
}
|
||||
|
||||
private void validate(int numBrokers) {
|
||||
if(numBrokers < 1)
|
||||
throw new RuntimeException("Number of brokers should be atleast 1");
|
||||
|
Reference in New Issue
Block a user