diff --git a/application.yml b/application.yml
index e69de29..c2dca9f 100644
--- a/application.yml
+++ b/application.yml
@@ -0,0 +1,35 @@
+spring:
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ consumer:
+ group-id: midas-core-group
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ properties:
+ spring.json.trusted.packages: "*"
+
+ datasource:
+ url: jdbc:h2:mem:midasdb # ✅ In-memory H2 database
+ driver-class-name: org.h2.Driver
+ username: sa
+ password:
+
+ jpa:
+ database-platform: org.hibernate.dialect.H2Dialect
+ hibernate:
+ ddl-auto: update # ✅ Automatically updates schema
+
+ h2:
+ console:
+ enabled: true # ✅ Enables H2 web console
+
+server:
+ port: 33400 # ✅ Set application to run on port 33400
+
+
+general:
+ kafka-topic: transactions
diff --git a/pom.xml b/pom.xml
index d1dedfe..679dc72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,67 @@
17
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+ 3.2.5
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 3.2.5
+
+
+
+
+ com.h2database
+ h2
+ 2.2.224
+ test
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ 3.2.5
+ test
+
+
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ 3.1.4
+ test
+
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ 3.1.4
+
+
+
+
+
+
+
+ org.testcontainers
+ kafka
+ 1.19.1
+ test
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.1.2
+
+
@@ -25,6 +86,11 @@
org.springframework.boot
spring-boot-maven-plugin
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.1.2
+
diff --git a/src/main/java/com/jpmc/midascore/MidasCoreApplication.java b/src/main/java/com/jpmc/midascore/MidasCoreApplication.java
index 9222581..639f450 100644
--- a/src/main/java/com/jpmc/midascore/MidasCoreApplication.java
+++ b/src/main/java/com/jpmc/midascore/MidasCoreApplication.java
@@ -2,6 +2,8 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.web.client.RestTemplate;
@SpringBootApplication
public class MidasCoreApplication {
@@ -10,4 +12,9 @@ public static void main(String[] args) {
SpringApplication.run(MidasCoreApplication.class, args);
}
+ @Bean
+ public RestTemplate restTemplate() {
+ return new RestTemplate();
+ }
+
}
diff --git a/src/main/java/com/jpmc/midascore/controller/BalanceController.java b/src/main/java/com/jpmc/midascore/controller/BalanceController.java
new file mode 100644
index 0000000..e9bd477
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/controller/BalanceController.java
@@ -0,0 +1,24 @@
+package com.jpmc.midascore.controller;
+
+import com.jpmc.midascore.model.Balance;
+import com.jpmc.midascore.entity.UserRecord;
+import com.jpmc.midascore.repository.UserRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Optional;
+
+@RestController
+@RequestMapping
+public class BalanceController {
+
+ @Autowired
+ private UserRepository userRepository;
+
+ @GetMapping("/balance")
+ public Balance getBalance(@RequestParam Long userId) {
+ Optional userOpt = userRepository.findById(userId);
+ float balance = userOpt.map(UserRecord::getBalance).orElse(0.0f);
+ return new Balance(balance);
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java
new file mode 100644
index 0000000..3e9571e
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/kafka/TransactionListener.java
@@ -0,0 +1,55 @@
+package com.jpmc.midascore.kafka;
+
+import com.jpmc.midascore.entity.UserRecord;
+import com.jpmc.midascore.foundation.Transaction;
+import com.jpmc.midascore.model.TransactionRecord;
+import com.jpmc.midascore.repository.TransactionRepository;
+import com.jpmc.midascore.repository.UserRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Optional;
+
+@Service
+public class TransactionListener {
+
+ @Autowired
+ private UserRepository userRepository;
+
+ @Autowired
+ private TransactionRepository transactionRepository;
+
+ @KafkaListener(topics = "${general.kafka-topic}", groupId = "midas-core-group")
+ @Transactional
+ public void listen(Transaction transaction) {
+ Optional senderOpt = Optional.ofNullable(userRepository.findById(transaction.getSenderId()));
+ Optional recipientOpt = Optional.ofNullable(userRepository.findById(transaction.getRecipientId()));
+
+ if (senderOpt.isPresent() && recipientOpt.isPresent()) {
+ UserRecord sender = senderOpt.get();
+ UserRecord recipient = recipientOpt.get();
+
+ if (sender.getBalance() >= transaction.getAmount()) {
+ // Deduct from sender and add to recipient
+ sender.setBalance(sender.getBalance() - transaction.getAmount());
+ recipient.setBalance(recipient.getBalance() + transaction.getAmount());
+
+ // Save updated users
+ userRepository.save(sender);
+ userRepository.save(recipient);
+
+ // Save transaction record
+ TransactionRecord record = new TransactionRecord(sender, recipient, transaction.getAmount(), 0);
+ transactionRepository.save(record);
+
+ System.out.println("✅ Transaction recorded: " + record.getSender() + " Balance: " + transaction.getAmount());
+ } else {
+ System.out.println("❌ Transaction failed: Insufficient balance for sender ID " + sender.getId());
+ }
+ } else {
+ System.out.println("❌ Transaction failed: Invalid sender or recipient ID.");
+ }
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/kafka/TransactionProducer.java b/src/main/java/com/jpmc/midascore/kafka/TransactionProducer.java
new file mode 100644
index 0000000..cf45b58
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/kafka/TransactionProducer.java
@@ -0,0 +1,29 @@
+package com.jpmc.midascore.kafka;
+
+import com.jpmc.midascore.foundation.Transaction;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+public class TransactionProducer {
+ public static void main(String[] args) {
+ String topic = "transactions";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ Producer producer = new KafkaProducer<>(props);
+
+ Transaction transaction = new Transaction(101, 202, 500.75f);
+ ProducerRecord record = new ProducerRecord(topic, "txn-key", transaction);
+
+ producer.send(record);
+ producer.close();
+
+ System.out.println("Transaction sent successfully");
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/model/Balance.java b/src/main/java/com/jpmc/midascore/model/Balance.java
new file mode 100644
index 0000000..0057305
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/model/Balance.java
@@ -0,0 +1,17 @@
+package com.jpmc.midascore.model;
+
+public class Balance {
+ private float balance;
+
+ public Balance(float balance) {
+ this.balance = balance;
+ }
+
+ public float getBalance() {
+ return balance;
+ }
+
+ public void setBalance(float balance) {
+ this.balance = balance;
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/model/Incentive.java b/src/main/java/com/jpmc/midascore/model/Incentive.java
new file mode 100644
index 0000000..e22496d
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/model/Incentive.java
@@ -0,0 +1,13 @@
+package com.jpmc.midascore.model;
+
+public class Incentive {
+ private float amount;
+
+ public float getAmount() {
+ return amount;
+ }
+
+ public void setAmount(float amount) {
+ this.amount = amount;
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/model/TransactionRecord.java b/src/main/java/com/jpmc/midascore/model/TransactionRecord.java
new file mode 100644
index 0000000..d4a94a3
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/model/TransactionRecord.java
@@ -0,0 +1,43 @@
+package com.jpmc.midascore.model;
+
+
+import com.jpmc.midascore.entity.UserRecord;
+import jakarta.persistence.*;
+
+@Entity
+public class TransactionRecord {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ private long id;
+
+ @ManyToOne
+ @JoinColumn(name = "sender_id", nullable = false)
+ private UserRecord sender;
+
+ @ManyToOne
+ @JoinColumn(name = "recipient_id", nullable = false)
+ private UserRecord recipient;
+
+ @Column(nullable = false)
+ private float amount;
+
+ @Column(nullable = false)
+ private float incentive;
+
+ public TransactionRecord() {}
+
+ public TransactionRecord(UserRecord sender, UserRecord recipient, float amount, float incentive) {
+ this.sender = sender;
+ this.recipient = recipient;
+ this.amount = amount;
+ this.incentive = incentive;
+ }
+
+ public float getIncentive() {return incentive;}
+ public void setIncentive(float incentive){this.incentive = incentive;}
+ public long getId() {return id;}
+ public UserRecord getSender() {return sender;}
+ public UserRecord getRecipient() {return recipient;}
+ public float getAmount() {return amount;}
+}
diff --git a/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java b/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java
new file mode 100644
index 0000000..d23b714
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/repository/TransactionRepository.java
@@ -0,0 +1,7 @@
+package com.jpmc.midascore.repository;
+
+import com.jpmc.midascore.model.TransactionRecord;
+import org.springframework.data.jpa.repository.JpaRepository;
+
+public interface TransactionRepository extends JpaRepository {
+}
diff --git a/src/main/java/com/jpmc/midascore/repository/UserRepository.java b/src/main/java/com/jpmc/midascore/repository/UserRepository.java
index 937275b..6e71158 100644
--- a/src/main/java/com/jpmc/midascore/repository/UserRepository.java
+++ b/src/main/java/com/jpmc/midascore/repository/UserRepository.java
@@ -1,8 +1,8 @@
package com.jpmc.midascore.repository;
import com.jpmc.midascore.entity.UserRecord;
-import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.jpa.repository.JpaRepository;
-public interface UserRepository extends CrudRepository {
+public interface UserRepository extends JpaRepository {
UserRecord findById(long id);
}
diff --git a/src/main/java/com/jpmc/midascore/service/IncentiveService.java b/src/main/java/com/jpmc/midascore/service/IncentiveService.java
new file mode 100644
index 0000000..e70822d
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/service/IncentiveService.java
@@ -0,0 +1,21 @@
+package com.jpmc.midascore.service;
+
+import com.jpmc.midascore.foundation.Transaction;
+import com.jpmc.midascore.model.Incentive;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+@Service
+public class IncentiveService {
+
+ private final RestTemplate restTemplate;
+
+ public IncentiveService(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
+ }
+
+ public Incentive getIncentive(Transaction transaction) {
+ String url = "http://localhost:8080/incentive";
+ return restTemplate.postForObject(url, transaction, Incentive.class);
+ }
+}
diff --git a/src/main/java/com/jpmc/midascore/service/TransactionService.java b/src/main/java/com/jpmc/midascore/service/TransactionService.java
new file mode 100644
index 0000000..14a91e8
--- /dev/null
+++ b/src/main/java/com/jpmc/midascore/service/TransactionService.java
@@ -0,0 +1,52 @@
+package com.jpmc.midascore.service;
+
+import com.jpmc.midascore.entity.UserRecord;
+import com.jpmc.midascore.foundation.Transaction;
+import com.jpmc.midascore.model.TransactionRecord;
+import com.jpmc.midascore.repository.TransactionRepository;
+import com.jpmc.midascore.repository.UserRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+@Service
+public class TransactionService {
+
+ private final UserRepository userRepository;
+ private final TransactionRepository transactionRepository;
+ private final IncentiveService incentiveService;
+
+ @Autowired
+ public TransactionService(UserRepository userRepository, TransactionRepository transactionRepository, IncentiveService incentiveService) {
+ this.userRepository = userRepository;
+ this.transactionRepository = transactionRepository;
+ this.incentiveService = incentiveService;
+ }
+
+ @Transactional
+ public void processTransaction(Transaction transaction) {
+ UserRecord sender = userRepository.findById(transaction.getSenderId());
+ UserRecord recipient = userRepository.findById(transaction.getRecipientId());
+
+ if (sender == null || recipient == null || sender.getBalance() < transaction.getAmount()) {
+ return; // Invalid transaction
+ }
+
+ // Deduct amount from sender
+ sender.setBalance(sender.getBalance() - transaction.getAmount());
+
+ // Call Incentive API
+ float incentiveAmount = incentiveService.getIncentive(transaction).getAmount();
+
+ // Add amount + incentive to recipient
+ recipient.setBalance(recipient.getBalance() + transaction.getAmount() + incentiveAmount);
+
+ // Save the transaction with incentive
+ TransactionRecord transactionRecord = new TransactionRecord(sender, recipient, transaction.getAmount(), incentiveAmount);
+ transactionRepository.save(transactionRecord);
+
+ // Update users
+ userRepository.save(sender);
+ userRepository.save(recipient);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/jpmc/midascore/KafkaProducer.java b/src/test/java/com/jpmc/midascore/KafkaProducer.java
index fa22e2e..d2571d7 100644
--- a/src/test/java/com/jpmc/midascore/KafkaProducer.java
+++ b/src/test/java/com/jpmc/midascore/KafkaProducer.java
@@ -1,6 +1,7 @@
package com.jpmc.midascore;
import com.jpmc.midascore.foundation.Transaction;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@@ -8,6 +9,7 @@
@Component
public class KafkaProducer {
private final String topic;
+ @Autowired
private final KafkaTemplate kafkaTemplate;
public KafkaProducer(@Value("${general.kafka-topic}") String topic, KafkaTemplate kafkaTemplate) {
diff --git a/src/test/java/com/jpmc/midascore/TaskOneTests.java b/src/test/java/com/jpmc/midascore/TaskOneTests.java
index 4f5a384..0fba3ab 100644
--- a/src/test/java/com/jpmc/midascore/TaskOneTests.java
+++ b/src/test/java/com/jpmc/midascore/TaskOneTests.java
@@ -4,11 +4,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Configuration;
@SpringBootTest
class TaskOneTests {
static final Logger logger = LoggerFactory.getLogger(TaskOneTests.class);
+ @Configuration
+ static class Config {
+ Config() {
+ System.out.println("Config");
+ }
+ }
+
@Test
void task_one_verifier() throws InterruptedException {
Thread.sleep(2000);