# Crypto_Trade_Backtest.py
# Copyright (C) 2024 Adam P Baguley
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License Version 3 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License Version 3 for more details.
# https://www.gnu.org/licenses/.
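
"""Grid-search backtester for SMA/EMA crossover strategies on hourly crypto data.

For every coin CSV in COIN_CSV_DIR, the script samples random time windows,
backtests every combination of short/long moving averages, take-profit levels
and partial-sell sizes, writes per-coin aggregated results to OUTPUT_DIR, and
finally ranks the settings by total final balance across all coins.
"""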
import os
import random
from itertools import product
from multiprocessing import Pool, cpu_count

import pandas as pd
import tqdm

# Directories for coin data and output
COIN_CSV_DIR = "D:/Historic_prices/hour/"
OUTPUT_DIR = "D:/Process_wallet_data/Backtest/"

# User-configurable parameters
MA_LENGTHS = [5, 10, 15, 20, 30, 40, 50, 60, 70, 80, 90, 100, 240, 960, 1440, 2880]
TAKE_PROFIT_PERCENTAGES = [5, 10, 15, 20, 50, 75, 100, 150] # Fixed TP percentages
PARTIAL_SELL_PERCENTAGES = [10, 20, 50, 75] # Percentages to sell at TP
TRADING_FEES_PERCENT = 0.1 # Adjustable trading fees
SLIPPAGE_PERCENT = 0.05 # Adjustable slippage
CPU_CORE_LIMIT = min(20, cpu_count())
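
# NOTE: each CSV in COIN_CSV_DIR is expected to contain at least an "Open time"
# and a "Close" column; they are renamed to "timestamp" / "close" in main().
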
def calculate_ma(df, ma_type, length):
    """Return a simple ("sma") or exponential ("ema") moving average of close."""
    if ma_type == "sma":
        return df["close"].rolling(window=length).mean()
    elif ma_type == "ema":
        return df["close"].ewm(span=length, adjust=False).mean()
    raise ValueError(f"Unsupported ma_type: {ma_type}")


def backtest_combination(params):
    """Test a single combination of MAs and TP settings and return results."""
    data, short_ma_type, short_ma_length, long_ma_type, long_ma_length, tp_percent, partial_sell = params
    data = data.copy()

    # Calculate MAs
    data["short_ma"] = calculate_ma(data, short_ma_type, short_ma_length)
    data["long_ma"] = calculate_ma(data, long_ma_type, long_ma_length)

    # Backtest logic
    balance = 1000
    position = 0  # coin holdings
    trades = []
    entry_price = None  # set on each buy signal

    for i in range(max(short_ma_length, long_ma_length), len(data)):
        if pd.notna(data["short_ma"].iloc[i]) and pd.notna(data["long_ma"].iloc[i]):
            if position == 0 and data["short_ma"].iloc[i] > data["long_ma"].iloc[i] and data["short_ma"].iloc[i - 1] <= data["long_ma"].iloc[i - 1]:
                # Buy signal: bullish crossover while flat
                entry_price = data["close"].iloc[i] * (1 + SLIPPAGE_PERCENT / 100)
                fee = balance * TRADING_FEES_PERCENT / 100  # fee on the traded notional
                position = (balance - fee) / entry_price
                balance = 0
                trades.append({"action": "buy", "price": entry_price, "fee": fee, "timestamp": data["timestamp"].iloc[i]})
            elif position > 0:
                current_price = data["close"].iloc[i]

                # Partial take profit: sell partial_sell% of the remaining
                # position on each bar that closes above the TP target.
                tp_target = entry_price * (1 + tp_percent / 100)
                if current_price >= tp_target:
                    exit_price = current_price * (1 - SLIPPAGE_PERCENT / 100)
                    sell_amount = position * (partial_sell / 100)
                    fee = exit_price * sell_amount * TRADING_FEES_PERCENT / 100
                    balance += (sell_amount * exit_price) - fee
                    position -= sell_amount  # Reduce the position
                    trades.append({
                        "action": f"partial_sell_{partial_sell}%",
                        "price": exit_price,
                        "fee": fee,
                        "remaining_position": position,
                        "balance": balance,
                        "timestamp": data["timestamp"].iloc[i],
                    })

                # Final sell: bearish crossover closes the rest of the position
                if data["short_ma"].iloc[i] < data["long_ma"].iloc[i] and data["short_ma"].iloc[i - 1] >= data["long_ma"].iloc[i - 1]:
                    exit_price = current_price * (1 - SLIPPAGE_PERCENT / 100)
                    fee = exit_price * position * TRADING_FEES_PERCENT / 100
                    balance += (position * exit_price) - fee
                    position = 0
                    trades.append({"action": "sell", "price": exit_price, "fee": fee, "timestamp": data["timestamp"].iloc[i]})

    # Liquidate any open position at the last close
    if position > 0:
        exit_price = data["close"].iloc[-1] * (1 - SLIPPAGE_PERCENT / 100)
        fee = exit_price * position * TRADING_FEES_PERCENT / 100
        balance += (position * exit_price) - fee
        trades.append({"action": "sell (final)", "price": exit_price, "fee": fee, "timestamp": data["timestamp"].iloc[-1]})

    total_pnl = balance - 1000
    max_drawdown = calculate_max_drawdown(trades)
    return {
        "short_ma": short_ma_type,
        "short_ma_length": short_ma_length,
        "long_ma": long_ma_type,
        "long_ma_length": long_ma_length,
        "tp_percent": tp_percent,
        "partial_sell": partial_sell,
        "final_balance": balance,
        "total_pnl": total_pnl,
        "max_drawdown": max_drawdown,
        "trades": trades,
    }


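# Illustrative check (not executed): calling backtest_combination directly on a
# prepared DataFrame `df` with "timestamp" and "close" columns, here with an
# assumed 10/50 SMA-vs-EMA cross, 20% take profit and 50% partial sells:
#
#   result = backtest_combination((df, "sma", 10, "ema", 50, 20, 50))
#   print(result["final_balance"], result["total_pnl"], len(result["trades"]))

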
def calculate_max_drawdown(trades):
    """Return a crude drawdown proxy: lowest buy price minus highest sell price.

    This is a price-based proxy rather than a true equity drawdown; partial
    sells are not included.
    """
    if not trades:
        return 0
    buy_prices = [trade["price"] for trade in trades if trade["action"] == "buy"]
    sell_prices = [trade["price"] for trade in trades if trade["action"].startswith("sell")]
    if not buy_prices or not sell_prices:
        return 0
    return min(buy_prices) - max(sell_prices)


def run_optimization(data, coin_name):
    """Run optimization using multiprocessing."""
    combinations = []
    for short_ma_type, long_ma_type in product(["sma", "ema"], repeat=2):
        for short_ma_length, long_ma_length in product(MA_LENGTHS, MA_LENGTHS):
            if short_ma_length >= long_ma_length:
                continue
            for tp_percent, partial_sell in product(TAKE_PROFIT_PERCENTAGES, PARTIAL_SELL_PERCENTAGES):
                combinations.append((data, short_ma_type, short_ma_length, long_ma_type, long_ma_length, tp_percent, partial_sell))

    # Use tqdm for a progress bar
    results = []
    with Pool(CPU_CORE_LIMIT) as pool:
        for result in tqdm.tqdm(pool.imap_unordered(backtest_combination, combinations), total=len(combinations)):
            result["coin"] = coin_name
            results.append(result)

    # Convert results to DataFrame
    return pd.DataFrame(results)


def run_random_iterations(data, iterations, coin_name):
    """Run the optimization over random time periods and aggregate results."""
    aggregated_results = []
    for iteration in range(iterations):
        print(f"[INFO] Running iteration {iteration + 1}/{iterations} for {coin_name}...")

        # Randomly sample a start and end index for the backtest window
        start_index = random.randint(0, len(data) // 2)
        end_index = random.randint(start_index + 1, len(data) - 1)
        sampled_data = data.iloc[start_index:end_index]
        print(f"[DEBUG] Start index: {start_index}, End index: {end_index}")

        # Optimize on the sampled data
        results = run_optimization(sampled_data, coin_name)
        aggregated_results.extend(results.to_dict(orient="records"))

    # Aggregate all results into a single DataFrame
    aggregated_df = pd.DataFrame(aggregated_results)
    aggregated_df.sort_values(by="final_balance", ascending=False, inplace=True)
    aggregated_file = f"{OUTPUT_DIR}aggregated_{coin_name}_performance.csv"
    aggregated_df.to_csv(aggregated_file, index=False)
    print(f"[INFO] Aggregated results for {coin_name} saved.")
    return aggregated_df


def aggregate_overall_results():
    """Aggregate results across all coins and rank settings."""
    all_results = []
    for file in os.listdir(OUTPUT_DIR):
        if file.startswith("aggregated_") and file.endswith("_performance.csv"):
            file_path = os.path.join(OUTPUT_DIR, file)
            df = pd.read_csv(file_path)
            all_results.append(df)

    if not all_results:
        print("[WARN] No per-coin result files found; nothing to aggregate.")
        return

    combined_results = pd.concat(all_results, ignore_index=True)

    # Group by settings and sum final balances
    grouped_results = (
        combined_results.groupby(["short_ma", "short_ma_length", "long_ma", "long_ma_length", "tp_percent", "partial_sell"])
        .agg(total_final_balance=("final_balance", "sum"))
        .reset_index()
    )

    # Sort by total final balance
    grouped_results.sort_values(by="total_final_balance", ascending=False, inplace=True)

    # Save the ranked list to a CSV
    ranked_file = os.path.join(OUTPUT_DIR, "ranked_settings_performance.csv")
    grouped_results.to_csv(ranked_file, index=False)
    print(f"[INFO] Ranked settings saved to {ranked_file}")
    print("[INFO] Top-performing settings:")
    print(grouped_results.head(10))


def main():
    """Main function to load data and run optimization."""
    try:
        iterations = int(input("Enter the number of random sampling iterations per coin: "))
    except ValueError:
        print("[ERROR] Invalid number of iterations. Please enter a valid integer.")
        return

    os.makedirs(OUTPUT_DIR, exist_ok=True)  # make sure the output directory exists

    for file in os.listdir(COIN_CSV_DIR):
        if file.endswith(".csv"):
            coin_name = file.replace(".csv", "")
            print(f"[INFO] Processing {file}...")
            try:
                # Load data and parse dates
                data = pd.read_csv(os.path.join(COIN_CSV_DIR, file), parse_dates=["Open time"])
                data.rename(columns={"Open time": "timestamp", "Close": "close"}, inplace=True)
                data.sort_values(by="timestamp", inplace=True)

                # Run random iterations for optimization
                run_random_iterations(data, iterations, coin_name)
            except Exception as e:
                print(f"[ERROR] Failed to process {file}: {e}")

    # Aggregate results across all coins
    aggregate_overall_results()


if __name__ == "__main__":
    main()