
Commit

Merge pull request 10Accademy-InsightStreamInc#18 from 10Accademy-InsightStreamInc/kafka

Kafka
dev-abuke authored Jun 24, 2024
2 parents d7bacb1 + f0b2a09 commit 49a9643
Showing 14 changed files with 840 additions and 47 deletions.
2 changes: 1 addition & 1 deletion backend/database.py
@@ -3,7 +3,7 @@
from sqlalchemy.orm import sessionmaker
import os

DATABASE_URL = os.getenv("PYCOPG_DATABASE_URL")
DATABASE_URL = os.getenv("PYCOPG_DATABASE_URL", "postgresql+psycopg2://trading_db_av2v_user:210M6MA9QKEEgVdiasnUdMQDBNN417oy@dpg-cpqojbqj1k6c73bkqq3g-a.oregon-postgres.render.com/trading_db_av2v")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
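
The new default falls back to a hard-coded connection string (including credentials) in source. For comparison, a minimal alternative sketch that keeps the credential out of the repository, assuming python-dotenv and a local .env file defining PYCOPG_DATABASE_URL (neither is part of this commit):

import os

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

load_dotenv()  # pick up a local .env so the variable need not be exported by hand

DATABASE_URL = os.getenv("PYCOPG_DATABASE_URL")
if not DATABASE_URL:
    # fail fast instead of falling back to a hard-coded credential
    raise RuntimeError("PYCOPG_DATABASE_URL is not set")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
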
97 changes: 91 additions & 6 deletions backend/main.py
@@ -1,20 +1,30 @@
from typing import List
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.exc import IntegrityError
import pandas as pd
import threading

from . import models, schemas, database
from .auth import router as auth_router
from kafka_topic.kafka_config import get_kafka_producer, SCENE_TOPIC, RESULT_TOPIC, get_kafka_consumer

from backend.utils.backtest import run_backtest
from backend.utils.init_data import initialize_data

get_db = database.get_db

consumer = get_kafka_consumer(SCENE_TOPIC)
producer = get_kafka_producer()

models.Base.metadata.create_all(bind=database.engine)

app = FastAPI()

@app.on_event("startup") # TODO update the code with lifespan dependency
def start_kafka_consumer():
    threading.Thread(target=consume_scene_parameters).start()

@app.on_event("startup") # TODO update the code with lifespan dependency
def on_startup():
    db = next(get_db())
@@ -24,7 +34,7 @@ def on_startup():

@app.get('/health')
def check_health():
return "API is healthy"
return "API is working and healthy"

@app.post('/indicators/', response_model=schemas.Indicator)
def create_indicator(indicator: schemas.IndicatorCreate, db: Session = Depends(get_db)):
@@ -54,10 +64,51 @@ def read_stocks(skip: int = 0, limit: int = 20, db: Session = Depends(get_db)):

@app.post('/scenes/', response_model=schemas.Scene)
def create_scene(scene: schemas.SceneCreate, db: Session = Depends(get_db)):
    db_scene = models.Scene(**scene.model_dump())
    db.add(db_scene)
    try:
        db_scene = models.Scene(**scene.dict())
        db.add(db_scene)
        db.commit()
        db.refresh(db_scene)
        print("Scene created successfully:", db_scene)

        # Send scene parameters to Kafka
        scene_parameters = {
            'period': db_scene.period,
            'indicator_name': db_scene.indicator.name,
            'indicator_symbol': db_scene.indicator.symbol,
            'stock_name': db_scene.stock.name,
            'stock_symbol': db_scene.stock.symbol,
            'start_date': db_scene.start_date.strftime('%Y-%m-%d'),
            'end_date': db_scene.end_date.strftime('%Y-%m-%d')
        }
        print("Sending scene parameters to Kafka:", scene_parameters)
        producer.send(SCENE_TOPIC, scene_parameters)
        producer.flush()

        return db_scene
    except IntegrityError:
        db.rollback()
        existing_scene = db.query(models.Scene).filter(
            models.Scene.start_date == scene.start_date,
            models.Scene.end_date == scene.end_date,
            models.Scene.indicator_id == scene.indicator_id
        ).first()
        return existing_scene

@app.delete('/scenes/{scene_id}', response_model=schemas.Scene)
def delete_scene(scene_id: int, db: Session = Depends(get_db)):
    db_scene = db.query(models.Scene).options(joinedload(models.Scene.indicator), joinedload(models.Scene.stock)).filter(models.Scene.id == scene_id).first()
    if db_scene is None:
        raise HTTPException(status_code=404, detail="Scene not found")
    db.delete(db_scene)
    db.commit()
    db.refresh(db_scene)
    return db_scene

@app.get('/scenes/{scene_id}', response_model=schemas.Scene)
def read_scene(scene_id: int, db: Session = Depends(get_db)):
    db_scene = db.query(models.Scene).options(joinedload(models.Scene.indicator), joinedload(models.Scene.stock)).filter(models.Scene.id == scene_id).first()
    if db_scene is None:
        raise HTTPException(status_code=404, detail="Scene not found")
    return db_scene

@app.get('/scenes/', response_model=List[schemas.Scene])
@@ -103,5 +154,39 @@ def read_backtest_results(skip: int = 0, limit: int = 10, db: Session = Depends(

def fetch_data(start_date, end_date):
    # Replace this with actual data fetching logic
    df = pd.read_csv('btc_usdt_candlestick.csv', index_col='timestamp', parse_dates=True)
    df = pd.read_csv('data/binance_btc_usdt_candlestick.csv', index_col='timestamp', parse_dates=True)
    return df.loc[start_date:end_date]

def fetch_existing_backtest(scene_parameters):
    # Function to fetch existing backtest results from the database
    db = next(get_db())
    existing_scene = db.query(models.Scene).filter(
        models.Scene.start_date == scene_parameters['start_date'],
        models.Scene.end_date == scene_parameters['end_date'],
        models.Scene.period == scene_parameters['period'],
        # models.Scene.stock.symbol == scene_parameters['stock_symbol'],
        # models.Scene.indicator.symbol == scene_parameters['indicator_symbol']
    ).first()
    if existing_scene:
        existing_backtests = db.query(models.BacktestResult).filter(
            models.BacktestResult.scene_id == existing_scene.id
        ).all()
        return existing_backtests
    return None

def consume_scene_parameters():
    consumer = get_kafka_consumer(SCENE_TOPIC)
    for message in consumer:
        scene_parameters = message.value
        print(f"Received scene parameters: {scene_parameters}")
        existing_results = fetch_existing_backtest(scene_parameters)
        if existing_results:
            print(f"Using existing backtest results for parameters: {scene_parameters}")
            continue
        # Run the backtest if no existing results
        df = fetch_data(scene_parameters['start_date'], scene_parameters['end_date'])
        metrics = run_backtest(scene_parameters, df)
        producer = get_kafka_producer()
        print(f"Sending backtest results: {metrics}")
        producer.send(RESULT_TOPIC, metrics)
        producer.flush()
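
main.py now imports get_kafka_producer, get_kafka_consumer, SCENE_TOPIC and RESULT_TOPIC from kafka_topic.kafka_config, one of the changed files not shown on this page. A plausible minimal sketch of that module, assuming kafka-python, JSON-encoded message values and a broker on localhost:9092 (all assumptions, not taken from the diff):

import json

from kafka import KafkaConsumer, KafkaProducer

SCENE_TOPIC = "scene_parameters"   # assumed topic names
RESULT_TOPIC = "backtest_results"

BOOTSTRAP_SERVERS = "localhost:9092"  # assumed broker address

def get_kafka_producer() -> KafkaProducer:
    # serialize dicts (scene parameters, backtest metrics) to JSON bytes
    return KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

def get_kafka_consumer(topic: str) -> KafkaConsumer:
    # deserialize JSON bytes back into dicts for the consumer loops above
    return KafkaConsumer(
        topic,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        auto_offset_reset="earliest",
    )
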
7 changes: 5 additions & 2 deletions backend/models.py
@@ -1,4 +1,4 @@
from sqlalchemy import Boolean, Column, Integer, Float, Date, ForeignKey, TIMESTAMP, String, Text, text
from sqlalchemy import Boolean, Column, Integer, Float, Date, ForeignKey, TIMESTAMP, String, Text, text, UniqueConstraint
from sqlalchemy.orm import relationship
from .database import Base

@@ -31,11 +31,14 @@ class Scene(Base):
    period = Column(Integer, nullable=False)
    start_date = Column(Date, nullable=False)
    end_date = Column(Date, nullable=False)
    stock_name = Column(String, ForeignKey('stocks.name'))
    stock_id = Column(Integer, ForeignKey('stocks.id'))
    indicator_id = Column(Integer, ForeignKey('indicators.id'))
    backtests = relationship('BacktestResult', back_populates='scene')
    stock = relationship('Stock')
    indicator = relationship('Indicator')

    __table_args__ = (UniqueConstraint('start_date', 'end_date', 'indicator_id', 'stock_id', name='_scene_uc'),)

class BacktestResult(Base):
    __tablename__ = 'backtest_results'
    id = Column(Integer, primary_key=True, index=True)
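
models.Base.metadata.create_all only creates tables that do not exist yet, so an existing database would not pick up the new stock_id column or the _scene_uc constraint automatically. A hypothetical Alembic migration mirroring the model change (Alembic is not part of this project or commit):

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # swap the old stock_name foreign key for stock_id, then add the composite unique constraint
    op.add_column("scenes", sa.Column("stock_id", sa.Integer(), nullable=True))
    op.create_foreign_key("fk_scenes_stock_id", "scenes", "stocks", ["stock_id"], ["id"])
    op.drop_column("scenes", "stock_name")
    op.create_unique_constraint(
        "_scene_uc", "scenes", ["start_date", "end_date", "indicator_id", "stock_id"]
    )
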
4 changes: 3 additions & 1 deletion backend/schemas.py
@@ -7,7 +7,7 @@ class Token(BaseModel):
    token_type: str

class TokenData(BaseModel):
    email: str | None = None
    email: str

class UserBase(BaseModel):
    email: EmailStr
@@ -60,6 +60,7 @@ class SceneBase(BaseModel):
    start_date: date
    end_date: date
    indicator_id: int
    stock_id: int

class SceneCreate(SceneBase):
    pass
@@ -68,6 +69,7 @@ class Scene(SceneBase):
    id: int
    backtests: List['BacktestResult'] = []
    indicator: Indicator
    stock: Stock

    class Config:
        orm_mode = True
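
With these schema changes a scene is created with a stock_id, and the response embeds the related stock and indicator via orm_mode. An illustrative request, assuming the API runs on localhost:8000 and that SceneCreate also carries period as in the model; the field values are made up:

import requests

payload = {
    "period": 14,
    "start_date": "2024-01-01",
    "end_date": "2024-03-31",
    "indicator_id": 1,
    "stock_id": 1,
}
resp = requests.post("http://localhost:8000/scenes/", json=payload)
resp.raise_for_status()
print(resp.json())  # includes nested "indicator" and "stock" objects plus any "backtests"
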
145 changes: 145 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,145 @@
version: '3.8'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9001
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    depends_on:
      - zoo1
      - kafka1


  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy
    ports:
      - "8082:8082"
    environment:
      # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
      KAFKA_REST_HOST_NAME: kafka-rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry


  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
    volumes:
      - ./connectors:/etc/kafka-connect/jars/
    depends_on:
      - zoo1
      - kafka1
      - kafka-schema-registry
      - kafka-rest-proxy
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
        /etc/confluent/docker/run

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
      KSQL_LISTENERS: http://0.0.0.0:8088/
      KSQL_KSQL_SERVICE_ID: ksqldb-server_
    depends_on:
      - zoo1
      - kafka1

  postgresql:
    hostname: postgresql
    container_name: postgresql
    extends:
      service: postgresql
      file: conduktor.yml

  conduktor-console:
    hostname: conduktor-console
    container_name: conduktor-console
    extends:
      service: conduktor-console
      file: conduktor.yml

volumes:
  pg_data: {}
  conduktor_data: {}
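
The three advertised listeners determine which address a client uses: other containers on this compose network reach the broker at kafka1:19092 (INTERNAL), processes on the host use localhost:9092 (EXTERNAL), and containers addressing the host go through host.docker.internal:29092 (DOCKER). A small connectivity sketch, assuming kafka-python on the host machine:

from kafka import KafkaProducer

# from the host machine (EXTERNAL listener)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# from another service inside this docker-compose network, the INTERNAL listener would be used instead:
# producer = KafkaProducer(bootstrap_servers="kafka1:19092")

producer.send("healthcheck", b"ping")  # hypothetical topic, only to verify the connection
producer.flush()
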
34 changes: 34 additions & 0 deletions kafka_topic/conduktor.yml
@@ -0,0 +1,34 @@
version: '3.8'

services:

  postgresql:
    image: postgres:14
    hostname: postgresql
    volumes:
      - pg_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: "conduktor-console"
      POSTGRES_USER: "conduktor"
      POSTGRES_PASSWORD: "some_password"
      POSTGRES_HOST_AUTH_METHOD: "scram-sha-256"

  conduktor-console:
    image: conduktor/conduktor-console:1.24.0
    ports:
      - "8080:8080"
    volumes:
      - conduktor_data:/var/conduktor
    environment:
      CDK_DATABASE_URL: "postgresql://conduktor:some_password@postgresql:5432/conduktor-console"
      CDK_CLUSTERS_0_ID: "default"
      CDK_CLUSTERS_0_NAME: "My Local Kafka Cluster"
      CDK_CLUSTERS_0_COLOR: "#0013E7"
      CDK_CLUSTERS_0_BOOTSTRAPSERVERS: "PLAINTEXT://kafka1:19092"
      CDK_CLUSTERS_0_SCHEMAREGISTRY_URL: "http://kafka-schema-registry:8081"
      CDK_CLUSTERS_0_KAFKACONNECTS_0_URL: "http://kafka-connect:8083"
      CDK_CLUSTERS_0_KAFKACONNECTS_0_NAME: "full stack kafka connect"

volumes:
  pg_data: {}
  conduktor_data: {}
10 changes: 10 additions & 0 deletions kafka_topic/consume_messages.py
@@ -0,0 +1,10 @@
from kafka_config import get_kafka_consumer, RESULT_TOPIC
from kafka import KafkaConsumer
import json

consumer = get_kafka_consumer(RESULT_TOPIC)

for message in consumer:
    backtest_result = message.value
    print(f"Received backtest result: {backtest_result}")
    # Process the backtest result as needed
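
A hypothetical counterpart script (for example produce_messages.py, not part of this commit) for smoke-testing the consumer above; it assumes the same kafka_config helpers and, like consume_messages.py, would be run from inside kafka_topic/ since the import has no package prefix:

from kafka_config import get_kafka_producer, RESULT_TOPIC

producer = get_kafka_producer()
# made-up payload shaped like a backtest metrics dict
producer.send(RESULT_TOPIC, {"scene_id": 1, "total_return": 0.12, "sharpe_ratio": 1.4})
producer.flush()
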