Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated pom.xml and TaskOneTests.java #34

Open
wants to merge 4 commits into
base: flow
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
spring:
kafka:
topic: transactions
bootstrap-servers: localhost:9092 # Ensure this matches your Docker Kafka setup
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: midas-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
listener:
missing-topics-fatal: false # Prevents app crash if topic is missing
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"

kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
48 changes: 48 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,48 @@
<java.version>17</java.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version> <!-- Ensure this is correct -->
</dependency>

</dependencies>

<build>
Expand All @@ -25,6 +67,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version> <!-- You can also try the latest version -->
</plugin>
</plugins>
</build>

Expand Down
52 changes: 52 additions & 0 deletions src/main/java/com/jpmc/midascore/kafka/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.jpmc.midascore.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "midas-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
24 changes: 24 additions & 0 deletions src/main/java/com/jpmc/midascore/kafka/KafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.jpmc.midascore.kafka;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

private final String topic;
private final KafkaTemplate<String, String> kafkaTemplate;

public KafkaProducer(
@Value("${spring.kafka.topic}") String topic, // Ensure this property exists in application.yml
KafkaTemplate<String, String> kafkaTemplate
) {
this.topic = topic;
this.kafkaTemplate = kafkaTemplate;
}

public void send(String message) {
kafkaTemplate.send(topic, message);
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/jpmc/midascore/kafka/TransactionListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.jpmc.midascore.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.jpmc.midascore.foundation.Transaction;

@Service
public class TransactionListener {

@KafkaListener(topics = "${spring.kafka.topic}", groupId = "midas-group")
public void listen(ConsumerRecord<String, Transaction> record) {
Transaction transaction = record.value();
System.out.println("Received transaction: " + transaction);
// No processing yet, just verifying receipt
}
}
22 changes: 0 additions & 22 deletions src/test/java/com/jpmc/midascore/KafkaProducer.java

This file was deleted.

2 changes: 2 additions & 0 deletions src/test/java/com/jpmc/midascore/TaskFiveTests.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.jpmc.midascore;

import com.jpmc.midascore.foundation.Balance;
import com.jpmc.midascore.kafka.KafkaProducer;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/jpmc/midascore/TaskFourTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import com.jpmc.midascore.kafka.KafkaProducer;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/com/jpmc/midascore/TaskOneTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;

@SpringBootTest
class TaskOneTests {
static final Logger logger = LoggerFactory.getLogger(TaskOneTests.class);

@Configuration
static class Config {
Config() {
System.out.println("Config");
}
}

@Test
void task_one_verifier() throws InterruptedException {
Thread.sleep(2000);
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/jpmc/midascore/TaskThreeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import com.jpmc.midascore.kafka.KafkaProducer;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
Expand Down
16 changes: 11 additions & 5 deletions src/test/java/com/jpmc/midascore/TaskTwoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
import com.jpmc.midascore.kafka.KafkaProducer;

@SpringBootTest(classes = MidasCoreApplication.class) // Ensure Spring Boot loads the main application
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
class TaskTwoTests {
Expand All @@ -22,20 +24,24 @@ class TaskTwoTests {

@Test
void task_two_verifier() throws InterruptedException {
if (fileLoader == null || kafkaProducer == null) {
throw new IllegalStateException("KafkaProducer or FileLoader is not initialized");
}

String[] transactionLines = fileLoader.loadStrings("/test_data/poiuytrewq.uiop");
for (String transactionLine : transactionLines) {
kafkaProducer.send(transactionLine);
}

Thread.sleep(2000);
logger.info("----------------------------------------------------------");
logger.info("----------------------------------------------------------");
logger.info("----------------------------------------------------------");
logger.info("use your debugger to watch for incoming transactions");
logger.info("kill this test once you find the answer");
while (true) {

int attempts = 5; // Prevent infinite loop (optional, remove if necessary)
while (attempts-- > 0) {
Thread.sleep(20000);
logger.info("...");
}
}

}