diff --git a/application.yml b/application.yml index e69de29..be7a1d8 100644 --- a/application.yml +++ b/application.yml @@ -0,0 +1,16 @@ +spring: + kafka: + topic: transactions + bootstrap-servers: localhost:9092 # Ensure this matches your Docker Kafka setup + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + consumer: + group-id: midas-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + listener: + missing-topics-fatal: false # Prevents app crash if topic is missing diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..98eae82 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.8' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + + kafka: + image: wurstmeister/kafka + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + depends_on: + - zookeeper diff --git a/pom.xml b/pom.xml index d1dedfe..c8ff23b 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,48 @@ 17 + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.kafka + spring-kafka + + + + com.h2database + h2 + + + + org.springframework.boot + spring-boot-starter-test + + + + org.springframework.kafka + spring-kafka-test + + + + org.testcontainers + kafka + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + + @@ -25,6 +67,12 @@ org.springframework.boot spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + diff --git a/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java b/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java new file mode 100644 index 0000000..c80228a --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java @@ -0,0 +1,52 @@ +package com.jpmc.midascore.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Configuration +public class KafkaConfig { + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "midas-group"); + configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(configProps); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java b/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java new file mode 100644 index 0000000..0e0d9cd --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java @@ -0,0 +1,24 @@ +package com.jpmc.midascore.kafka; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class KafkaProducer { + + private final String topic; + private final KafkaTemplate kafkaTemplate; + + public KafkaProducer( + @Value("${spring.kafka.topic}") String topic, // Ensure this property exists in application.yml + KafkaTemplate kafkaTemplate + ) { + this.topic = topic; + this.kafkaTemplate = kafkaTemplate; + } + + public void send(String message) { + kafkaTemplate.send(topic, message); + } +} diff --git a/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java new file mode 100644 index 0000000..6ae9984 --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java @@ -0,0 +1,18 @@ +package com.jpmc.midascore.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import com.jpmc.midascore.foundation.Transaction; + +@Service +public class TransactionListener { + + @KafkaListener(topics = "${spring.kafka.topic}", groupId = "midas-group") + public void listen(ConsumerRecord record) { + Transaction transaction = record.value(); + System.out.println("Received transaction: " + transaction); + // No processing yet, just verifying receipt + } +} diff --git a/src/test/java/com/jpmc/midascore/KafkaProducer.java b/src/test/java/com/jpmc/midascore/KafkaProducer.java deleted file mode 100644 index fa22e2e..0000000 --- a/src/test/java/com/jpmc/midascore/KafkaProducer.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.jpmc.midascore; - -import com.jpmc.midascore.foundation.Transaction; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -public class KafkaProducer { - private final String topic; - private final KafkaTemplate kafkaTemplate; - - public KafkaProducer(@Value("${general.kafka-topic}") String topic, KafkaTemplate kafkaTemplate) { - this.topic = topic; - this.kafkaTemplate = kafkaTemplate; - } - - public void send(String transactionLine) { - String[] transactionData = transactionLine.split(", "); - kafkaTemplate.send(topic, new Transaction(Long.parseLong(transactionData[0]), Long.parseLong(transactionData[1]), Float.parseFloat(transactionData[2]))); - } -} \ No newline at end of file diff --git a/src/test/java/com/jpmc/midascore/TaskFiveTests.java b/src/test/java/com/jpmc/midascore/TaskFiveTests.java index f1b6e65..c38f23e 100644 --- a/src/test/java/com/jpmc/midascore/TaskFiveTests.java +++ b/src/test/java/com/jpmc/midascore/TaskFiveTests.java @@ -1,6 +1,8 @@ package com.jpmc.midascore; import com.jpmc.midascore.foundation.Balance; +import com.jpmc.midascore.kafka.KafkaProducer; + import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/test/java/com/jpmc/midascore/TaskFourTests.java b/src/test/java/com/jpmc/midascore/TaskFourTests.java index 75b941b..d33abcc 100644 --- a/src/test/java/com/jpmc/midascore/TaskFourTests.java +++ b/src/test/java/com/jpmc/midascore/TaskFourTests.java @@ -8,6 +8,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; +import com.jpmc.midascore.kafka.KafkaProducer; + @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) diff --git a/src/test/java/com/jpmc/midascore/TaskOneTests.java b/src/test/java/com/jpmc/midascore/TaskOneTests.java index 4f5a384..0fba3ab 100644 --- a/src/test/java/com/jpmc/midascore/TaskOneTests.java +++ b/src/test/java/com/jpmc/midascore/TaskOneTests.java @@ -4,11 +4,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Configuration; @SpringBootTest class TaskOneTests { static final Logger logger = LoggerFactory.getLogger(TaskOneTests.class); + @Configuration + static class Config { + Config() { + System.out.println("Config"); + } + } + @Test void task_one_verifier() throws InterruptedException { Thread.sleep(2000); diff --git a/src/test/java/com/jpmc/midascore/TaskThreeTests.java b/src/test/java/com/jpmc/midascore/TaskThreeTests.java index 6ef57b6..b0bcede 100644 --- a/src/test/java/com/jpmc/midascore/TaskThreeTests.java +++ b/src/test/java/com/jpmc/midascore/TaskThreeTests.java @@ -8,6 +8,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; +import com.jpmc.midascore.kafka.KafkaProducer; + @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) diff --git a/src/test/java/com/jpmc/midascore/TaskTwoTests.java b/src/test/java/com/jpmc/midascore/TaskTwoTests.java index 30a7a92..d9acb34 100644 --- a/src/test/java/com/jpmc/midascore/TaskTwoTests.java +++ b/src/test/java/com/jpmc/midascore/TaskTwoTests.java @@ -8,7 +8,9 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; -@SpringBootTest +import com.jpmc.midascore.kafka.KafkaProducer; + +@SpringBootTest(classes = MidasCoreApplication.class) // Ensure Spring Boot loads the main application @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) class TaskTwoTests { @@ -22,20 +24,24 @@ class TaskTwoTests { @Test void task_two_verifier() throws InterruptedException { + if (fileLoader == null || kafkaProducer == null) { + throw new IllegalStateException("KafkaProducer or FileLoader is not initialized"); + } + String[] transactionLines = fileLoader.loadStrings("/test_data/poiuytrewq.uiop"); for (String transactionLine : transactionLines) { kafkaProducer.send(transactionLine); } + Thread.sleep(2000); logger.info("----------------------------------------------------------"); - logger.info("----------------------------------------------------------"); - logger.info("----------------------------------------------------------"); logger.info("use your debugger to watch for incoming transactions"); logger.info("kill this test once you find the answer"); - while (true) { + + int attempts = 5; // Prevent infinite loop (optional, remove if necessary) + while (attempts-- > 0) { Thread.sleep(20000); logger.info("..."); } } - }