Skip to content

Commit

Permalink
chore: files for mlflow and db
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-abuke committed Jun 28, 2024
1 parent 8dc7d88 commit 8e97082
Show file tree
Hide file tree
Showing 22 changed files with 1,681 additions and 245 deletions.
23 changes: 18 additions & 5 deletions airflow/dags/fetch_stock_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,25 @@ def fetch_and_store_stock_data():
for index, row in stocks_df.iterrows():
ticker = row['symbol']
try:
print(f"Fetching data for {ticker}")
data: pd.DataFrame = yf.download(ticker, start='2023-01-01', end=datetime.today().strftime('%Y-%m-%d'))
data['symbol'] = ticker
data.reset_index(inplace=True)
data.index.name = 'id'
data.rename(columns={'Date': 'date', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Close': 'close', 'Adj Close': 'adj_close', 'Volume': 'volume'}, inplace=True)
data.to_sql('stock_data', engine, if_exists='append')
if not data.empty:
data['symbol'] = ticker
data.reset_index(inplace=True)
data.rename(columns={
'Date': 'date',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Adj Close': 'adj_close',
'Volume': 'volume'
}, inplace=True)
# Ensure we don't overwrite by specifying the index as False
data.to_sql('stock_data', engine, if_exists='append', index=False)
print(f"Successfully stored data for {ticker}")
else:
print(f"No data found for {ticker}")
except Exception as e:
print(f"Error fetching data for {ticker}: {e}")

Expand Down
2 changes: 1 addition & 1 deletion airflow/logs/scheduler/latest
117 changes: 117 additions & 0 deletions alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =

# max length of characters to apply to the "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

# SECURITY WARNING: a live database URL with credentials is committed here in
# plaintext. Rotate this password and load the URL from an environment variable
# (e.g. read os.environ in env.py) instead of hard-coding it in alembic.ini.
sqlalchemy.url = postgresql+psycopg2://trading_db_av2v_user:210M6MA9QKEEgVdiasnUdMQDBNN417oy@dpg-cpqojbqj1k6c73bkqq3g-a.oregon-postgres.render.com/trading_db_av2v



[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
1 change: 1 addition & 0 deletions alembic/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Generic single-database configuration.
26 changes: 8 additions & 18 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,52 @@

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
from backend.models import Base # Import your Base
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine,
    though an Engine is acceptable here as well.  By skipping the
    Engine creation we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        # Named paramstyle keeps literal SQL rendering consistent with the
        # stock alembic template when emitting offline scripts.
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        # Pass {} as the default so engine_from_config never receives None
        # when the ini section is missing.
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()
Expand Down
26 changes: 26 additions & 0 deletions alembic/script.py.mako
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
${upgrades if upgrades else "pass"}


def downgrade() -> None:
${downgrades if downgrades else "pass"}
59 changes: 49 additions & 10 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ def create_scene(scene: schemas.SceneCreate, db: Session = Depends(get_db)):
scene_parameters = {
'scene_id': db_scene.id,
'period': db_scene.period,
'initial_cash': 500,
'indicator_name': db_scene.indicator.name,
'indicator': db_scene.indicator.symbol,
'initial_cash': 500,\
'stock_name': db_scene.stock.name,
'ticker': db_scene.stock.symbol,
'start_date': db_scene.start_date.strftime('%Y-%m-%d'),
Expand All @@ -114,7 +112,7 @@ def create_scene(scene: schemas.SceneCreate, db: Session = Depends(get_db)):
existing_scene = db.query(models.Scene).filter(
models.Scene.start_date == scene.start_date,
models.Scene.end_date == scene.end_date,
models.Scene.indicator_id == scene.indicator_id
models.Scene.stock_id == scene.stock_id
).first()
return existing_scene

Expand All @@ -129,7 +127,7 @@ def delete_scene(scene_id: int, db: Session = Depends(get_db)):

@app.get('/scenes/{scene_id}', response_model=schemas.Scene)
def read_scene(scene_id: int, db: Session = Depends(get_db)):
db_scene = db.query(models.Scene).options(joinedload(models.Scene.indicator), joinedload(models.Scene.stock)).filter(models.Scene.id == scene_id).first()
db_scene = db.query(models.Scene).options(joinedload(models.Scene.stock)).filter(models.Scene.id == scene_id).first()
if db_scene is None:
raise HTTPException(status_code=404, detail="Scene not found")
return db_scene
Expand All @@ -142,18 +140,33 @@ def read_scenes(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
logger.info(f"Backtest Result: {backtest.__dict__}")
return scenes

@app.post('/backtests/{scene_id}', response_model=List[schemas.BacktestResult])
def perform_backtest(scene_id: int, db: Session = Depends(get_db)):
@app.get('/best_indicator/{scene_id}', response_model=schemas.BacktestResult)
def get_best_indicator(scene_id: int, db: Session = Depends(get_db)):
    """Return the backtest result with the best combined score for a scene.

    Loads the scene with its backtests (and each backtest's indicator)
    eagerly, then scores them via select_best_indicator.

    Raises:
        HTTPException: 404 if the scene does not exist, or if the scene
            has no backtest results to score.
    """
    db_scene = (
        db.query(models.Scene)
        .options(
            joinedload(models.Scene.backtests).joinedload(models.BacktestResult.indicator)
        )
        .filter(models.Scene.id == scene_id)
        .first()
    )
    if db_scene is None:
        raise HTTPException(status_code=404, detail="Scene not found")

    # Score plain dicts so the helper does not depend on ORM instances.
    best_backtest = select_best_indicator(
        [backtest.__dict__ for backtest in db_scene.backtests]
    )

    if best_backtest is None:
        raise HTTPException(status_code=404, detail="No backtests found for the scene")

    return best_backtest

@app.post('/backtests/{scene_id}/{indicator_id}', response_model=List[schemas.BacktestResult])
def perform_backtest(scene_id: int, indicator_id: int, db: Session = Depends(get_db)):
db_scene = db.query(models.Scene).filter(models.Scene.id == scene_id).first()
if db_scene is None:
raise HTTPException(status_code=404, detail="Scene not found")

print("The Db Scene: ", db_scene.__dict__)
db_indicator = db.query(models.Indicator).filter(models.Indicator.id == indicator_id).first()
config = {
'initial_cash': 500,
'start_date': db_scene.start_date.strftime('%Y-%m-%d'),
'end_date': db_scene.end_date.strftime('%Y-%m-%d'),
'ticker': db_scene.stock.symbol,
'indicator': db_scene.indicator.symbol
'indicator': db_indicator.symbol
}

logger.info(f"Config: {config}")
Expand All @@ -164,7 +177,7 @@ def perform_backtest(scene_id: int, db: Session = Depends(get_db)):

# Save metrics to database
backtest_results = []
db_backtest_result = models.BacktestResult(scene_id=scene_id, **metrics)
db_backtest_result = models.BacktestResult(scene_id=scene_id, **metrics, indicator_id=indicator_id)
db.add(db_backtest_result)
db.commit()
db.refresh(db_backtest_result)
Expand Down Expand Up @@ -250,4 +263,30 @@ def consume_scene_parameters():
logger.info("Backtest results sent to Kafka")
except Exception as e:
logger.error(f"Error in consumer loop: {e}")
time.sleep(5) # Sleep for a while before retrying
time.sleep(5) # Sleep for a while before retrying

def normalize(value, min_value, max_value):
    """Linearly map *value* from [min_value, max_value] onto [0, 1]."""
    span = max_value - min_value
    return (value - min_value) / span

def calculate_score(backtest):
    """Combine normalized metrics from a backtest dict into one score.

    Higher percentage return and Sharpe ratio are considered better.
    Max drawdown was deliberately left out of the weighting for now.
    """
    # Return is normalized over [-1, 1], Sharpe ratio over [0, 3].
    scaled_return = normalize(backtest['percentage_return'], -1, 1)
    scaled_sharpe = normalize(backtest['sharpe_ratio'], 0, 3)

    # Weighted sum; weights can be tuned as the scoring model evolves.
    return (scaled_return * 0.4) + (scaled_sharpe * 0.3)

def select_best_indicator(backtests):
    """Return the backtest dict with the highest score, or None if empty.

    Ties are resolved in favor of the earliest entry, matching a manual
    strict-greater-than scan.
    """
    return max(backtests, key=calculate_score, default=None)
8 changes: 4 additions & 4 deletions backend/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,15 @@ class Scene(Base):
start_date = Column(Date, nullable=False)
end_date = Column(Date, nullable=False)
stock_id = Column(Integer, ForeignKey('stocks.id'))
indicator_id = Column(Integer, ForeignKey('indicators.id'))
backtests = relationship('BacktestResult', back_populates='scene')
stock = relationship('Stock')
indicator = relationship('Indicator')

__table_args__ = (UniqueConstraint('start_date', 'end_date', 'indicator_id', 'stock_id', name='_scene_uc'),)
__table_args__ = (UniqueConstraint('start_date', 'end_date', 'stock_id', name='_scene_uc'),)

class BacktestResult(Base):
__tablename__ = 'backtest_results'
# __table_args__ = (UniqueConstraint('scene_id', 'indicator_id', 'stock_id', name='_backtest_uc'),)
id = Column(Integer, primary_key=True, index=True)
scene_id = Column(Integer, ForeignKey('scenes.id'))
initial_cash = Column(Float, nullable=False)
final_value = Column(Float, nullable=False)
percentage_return = Column(Float)
Expand All @@ -53,7 +50,10 @@ class BacktestResult(Base):
losing_trades = Column(Integer)
sharpe_ratio = Column(Float)
created_at = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'))
scene_id = Column(Integer, ForeignKey('scenes.id'))
indicator_id = Column(Integer, ForeignKey('indicators.id'))
scene = relationship('Scene', back_populates='backtests')
indicator = relationship('Indicator')

class StockData(Base):
__tablename__ = "stock_data"
Expand Down
Loading

0 comments on commit 8e97082

Please sign in to comment.