From 363d636c25b54ff074a5a2b6e8527a3998b44198 Mon Sep 17 00:00:00 2001 From: Gaurang Mundhra <146422929+GaurangMundhra@users.noreply.github.com> Date: Sun, 9 Feb 2025 16:17:48 +0000 Subject: [PATCH 1/4] Task 1 completed Added the required dependencies and class in TaskOneTests java file. --- pom.xml | 43 +++++++++++++++++++ .../java/com/jpmc/midascore/TaskOneTests.java | 8 ++++ 2 files changed, 51 insertions(+) diff --git a/pom.xml b/pom.xml index d1dedfe..db111d6 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,49 @@ 17 + + + org.springframework.boot + spring-boot-starter-data-jpa + 3.2.5 + + + + org.springframework.boot + spring-boot-starter-web + 3.2.5 + + + + org.springframework.kafka + spring-kafka + 3.1.4 + + + + com.h2database + h2 + 2.2.224 + + + + org.springframework.boot + spring-boot-starter-test + 3.2.5 + + + + org.springframework.kafka + spring-kafka-test + 3.1.4 + + + + org.testcontainers + kafka + 1.19.1 + + diff --git a/src/test/java/com/jpmc/midascore/TaskOneTests.java b/src/test/java/com/jpmc/midascore/TaskOneTests.java index 4f5a384..0fba3ab 100644 --- a/src/test/java/com/jpmc/midascore/TaskOneTests.java +++ b/src/test/java/com/jpmc/midascore/TaskOneTests.java @@ -4,11 +4,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Configuration; @SpringBootTest class TaskOneTests { static final Logger logger = LoggerFactory.getLogger(TaskOneTests.class); + @Configuration + static class Config { + Config() { + System.out.println("Config"); + } + } + @Test void task_one_verifier() throws InterruptedException { Thread.sleep(2000); From 99e293721a8897e586cb2ee7f3d9831534c63cf4 Mon Sep 17 00:00:00 2001 From: Gaurang Mundhra <146422929+GaurangMundhra@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:08:46 +0000 Subject: [PATCH 2/4] Task2: Changes made for kafka integration in yml files --- application.yml | 16 ++++++++++++++++ docker-compose.yml | 17 +++++++++++++++++ pom.xml | 19 ++++++++++++------- 3 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 docker-compose.yml diff --git a/application.yml b/application.yml index e69de29..be7a1d8 100644 --- a/application.yml +++ b/application.yml @@ -0,0 +1,16 @@ +spring: + kafka: + topic: transactions + bootstrap-servers: localhost:9092 # Ensure this matches your Docker Kafka setup + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + consumer: + group-id: midas-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + listener: + missing-topics-fatal: false # Prevents app crash if topic is missing diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..98eae82 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.8' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + + kafka: + image: wurstmeister/kafka + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + depends_on: + - zookeeper diff --git a/pom.xml b/pom.xml index db111d6..c8ff23b 100644 --- a/pom.xml +++ b/pom.xml @@ -21,43 +21,42 @@ org.springframework.boot spring-boot-starter-data-jpa - 3.2.5 org.springframework.boot spring-boot-starter-web - 3.2.5 org.springframework.kafka spring-kafka - 3.1.4 com.h2database h2 - 2.2.224 org.springframework.boot spring-boot-starter-test - 3.2.5 org.springframework.kafka spring-kafka-test - 3.1.4 org.testcontainers kafka - 1.19.1 + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 @@ -68,6 +67,12 @@ org.springframework.boot spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + From f24640784adf251a3f0f91ee2a3b67b801215135 Mon Sep 17 00:00:00 2001 From: Gaurang Mundhra <146422929+GaurangMundhra@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:12:23 +0000 Subject: [PATCH 3/4] Task2 : Made Kafka folder for TransactionListener,KafkaTemplate, kafka configuration --- .../com/jpmc/midascore/kafka/KafkaConfig.java | 52 +++++++++++++++++++ .../jpmc/midascore/kafka/KafkaProducer.java | 24 +++++++++ .../midascore/kafka/TransactionListener.java | 18 +++++++ .../com/jpmc/midascore/KafkaProducer.java | 22 -------- 4 files changed, 94 insertions(+), 22 deletions(-) create mode 100644 src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java create mode 100644 src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java create mode 100644 src/main/java/com/jpmc/midascore/kafka/TransactionListener.java delete mode 100644 src/test/java/com/jpmc/midascore/KafkaProducer.java diff --git a/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java b/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java new file mode 100644 index 0000000..c80228a --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java @@ -0,0 +1,52 @@ +package com.jpmc.midascore.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Configuration +public class KafkaConfig { + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "midas-group"); + configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(configProps); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java b/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java new file mode 100644 index 0000000..0e0d9cd --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java @@ -0,0 +1,24 @@ +package com.jpmc.midascore.kafka; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class KafkaProducer { + + private final String topic; + private final KafkaTemplate kafkaTemplate; + + public KafkaProducer( + @Value("${spring.kafka.topic}") String topic, // Ensure this property exists in application.yml + KafkaTemplate kafkaTemplate + ) { + this.topic = topic; + this.kafkaTemplate = kafkaTemplate; + } + + public void send(String message) { + kafkaTemplate.send(topic, message); + } +} diff --git a/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java new file mode 100644 index 0000000..6ae9984 --- /dev/null +++ b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java @@ -0,0 +1,18 @@ +package com.jpmc.midascore.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import com.jpmc.midascore.foundation.Transaction; + +@Service +public class TransactionListener { + + @KafkaListener(topics = "${spring.kafka.topic}", groupId = "midas-group") + public void listen(ConsumerRecord record) { + Transaction transaction = record.value(); + System.out.println("Received transaction: " + transaction); + // No processing yet, just verifying receipt + } +} diff --git a/src/test/java/com/jpmc/midascore/KafkaProducer.java b/src/test/java/com/jpmc/midascore/KafkaProducer.java deleted file mode 100644 index fa22e2e..0000000 --- a/src/test/java/com/jpmc/midascore/KafkaProducer.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.jpmc.midascore; - -import com.jpmc.midascore.foundation.Transaction; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -public class KafkaProducer { - private final String topic; - private final KafkaTemplate kafkaTemplate; - - public KafkaProducer(@Value("${general.kafka-topic}") String topic, KafkaTemplate kafkaTemplate) { - this.topic = topic; - this.kafkaTemplate = kafkaTemplate; - } - - public void send(String transactionLine) { - String[] transactionData = transactionLine.split(", "); - kafkaTemplate.send(topic, new Transaction(Long.parseLong(transactionData[0]), Long.parseLong(transactionData[1]), Float.parseFloat(transactionData[2]))); - } -} \ No newline at end of file From 46fde35eb08dcc36a38805aabc12b4611dddf52e Mon Sep 17 00:00:00 2001 From: Gaurang Mundhra <146422929+GaurangMundhra@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:13:24 +0000 Subject: [PATCH 4/4] Task2 : Necessary imports for task2 --- .../java/com/jpmc/midascore/TaskFiveTests.java | 2 ++ .../java/com/jpmc/midascore/TaskFourTests.java | 2 ++ .../java/com/jpmc/midascore/TaskThreeTests.java | 2 ++ .../java/com/jpmc/midascore/TaskTwoTests.java | 16 +++++++++++----- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/jpmc/midascore/TaskFiveTests.java b/src/test/java/com/jpmc/midascore/TaskFiveTests.java index f1b6e65..c38f23e 100644 --- a/src/test/java/com/jpmc/midascore/TaskFiveTests.java +++ b/src/test/java/com/jpmc/midascore/TaskFiveTests.java @@ -1,6 +1,8 @@ package com.jpmc.midascore; import com.jpmc.midascore.foundation.Balance; +import com.jpmc.midascore.kafka.KafkaProducer; + import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/test/java/com/jpmc/midascore/TaskFourTests.java b/src/test/java/com/jpmc/midascore/TaskFourTests.java index 75b941b..d33abcc 100644 --- a/src/test/java/com/jpmc/midascore/TaskFourTests.java +++ b/src/test/java/com/jpmc/midascore/TaskFourTests.java @@ -8,6 +8,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; +import com.jpmc.midascore.kafka.KafkaProducer; + @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) diff --git a/src/test/java/com/jpmc/midascore/TaskThreeTests.java b/src/test/java/com/jpmc/midascore/TaskThreeTests.java index 6ef57b6..b0bcede 100644 --- a/src/test/java/com/jpmc/midascore/TaskThreeTests.java +++ b/src/test/java/com/jpmc/midascore/TaskThreeTests.java @@ -8,6 +8,8 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; +import com.jpmc.midascore.kafka.KafkaProducer; + @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) diff --git a/src/test/java/com/jpmc/midascore/TaskTwoTests.java b/src/test/java/com/jpmc/midascore/TaskTwoTests.java index 30a7a92..d9acb34 100644 --- a/src/test/java/com/jpmc/midascore/TaskTwoTests.java +++ b/src/test/java/com/jpmc/midascore/TaskTwoTests.java @@ -8,7 +8,9 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; -@SpringBootTest +import com.jpmc.midascore.kafka.KafkaProducer; + +@SpringBootTest(classes = MidasCoreApplication.class) // Ensure Spring Boot loads the main application @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) class TaskTwoTests { @@ -22,20 +24,24 @@ class TaskTwoTests { @Test void task_two_verifier() throws InterruptedException { + if (fileLoader == null || kafkaProducer == null) { + throw new IllegalStateException("KafkaProducer or FileLoader is not initialized"); + } + String[] transactionLines = fileLoader.loadStrings("/test_data/poiuytrewq.uiop"); for (String transactionLine : transactionLines) { kafkaProducer.send(transactionLine); } + Thread.sleep(2000); logger.info("----------------------------------------------------------"); - logger.info("----------------------------------------------------------"); - logger.info("----------------------------------------------------------"); logger.info("use your debugger to watch for incoming transactions"); logger.info("kill this test once you find the answer"); - while (true) { + + int attempts = 5; // Prevent infinite loop (optional, remove if necessary) + while (attempts-- > 0) { Thread.sleep(20000); logger.info("..."); } } - }