Cet article est la suite de celui-ci : Créer un bot de trading - Introduction. Si vous ne l'avez pas encore lu, je vous recommande donc vivement de le faire afin de comprendre la suite ! Il faut également connaître Backtrader pour comprendre, c'est pourquoi j'ai écrit des articles sur cette librairie sur le site !
Voici le code que nous avions à la fin de l'article précédent :
Dans un premier fichier, nommé DatafeedParams.py
:
from dataclasses import dataclass
import datetime as dt
import backtrader as bt
import ccxt
@dataclass
class DatafeedParams:
mode: str
symbol: str
timeframe: bt.TimeFrame
compression: int = 1
end_date: dt.datetime or str = dt.datetime.utcnow()
start_date: dt.datetime or str = None
timedelta: dt.timedelta = None
debug: bool = False
exchange: any = ccxt.bitfinex()
Dans un second fichier, nommé DatafeedGenerator.py
:
import datetime as dt
import backtrader as bt
timeframes_mapper = {
bt.TimeFrame.Minutes: "m",
bt.TimeFrame.Days: "d",
bt.TimeFrame.Months: "M",
}
class DatafeedGenerator:
def __init__(self, datafeed_params):
self.p = datafeed_params
if type(self.p.start_date) == str:
self.p.start_date = dt.datetime.strptime(self.p.start_date, "%Y/%m/%d %H:%M:%S")
if type(self.p.end_date) == str:
self.p.end_date = dt.datetime.strptime(self.p.end_date, "%Y/%m/%d %H:%M:%S")
if self.p.timedelta:
self.p.start_date = self.p.end_date - self.p.timedelta
def format_timeframe(self):
if self.p.timeframe == bt.TimeFrame.Minutes:
if self.p.compression == 60:
return "1h"
if self.p.compression == 120:
return "2h"
if self.p.compression == 240:
return "4h"
return f"{self.p.compression}{timeframes_mapper[self.p.timeframe]}"
def extract_klines(self):
exchange = self.p.exchange
timestamp = int(dt.datetime.timestamp(self.p.start_date) * 1000) # Timestamp formatting in ms
data = exchange.fetch_ohlcv(symbol=self.p.symbol, timeframe=self.format_timeframe(), since=timestamp,
limit=10000)
while data[-1][0] < dt.datetime.timestamp(self.p.end_date) * 1000: # If more than 10 000 candles
data2 = exchange.fetch_ohlcv(symbol=self.p.symbol, timeframe=self.format_timeframe(), since=data[-1][0],
limit=10000)
for i in range(len(data2)):
if i != 0:
data.append(data2[i])
for i in range(len(data)):
data[i] = tuple(data[i])
return data
Cependant, vous avez peut-être un code différent. En effet, nous avions dit qu'en fonction des marchés que vous souhaitez utiliser, les API peuvent être différentes, et donc les méthodes pour récupérer les données différentes également. Pour ma part, je souhaite utiliser mon bot pour les cryptomonnaies, donc j'utilise la librairie ccxt
. Si ça se trouve, vous récupérez des données avec requests
, ou une autre librairie, c'est pourquoi le code est susceptible d'être différent.
Par contre, dans tous les cas, le code suivant doit fonctionner et vous renvoyer des données :
symbol = "BTCEUR"
timeframe = bt.TimeFrame.Minutes
compression = 240
start_date = "2021/01/01 0:0:0"
end_date = "2022/01/01 0:0:0"
params = DatafeedParams(
mode="BACKTEST",
symbol=symbol,
timeframe=timeframe,
compression=compression,
start_date=start_date,
end_date=end_date
)
generator = DatafeedGenerator(params)
data = generator.extract_klines()
print(data)
En gros, actuellement, vous pouvez :
L'objectif va maintenant être de convertir les données brutes en Datafeed, réellement exploitable.
La première étape est de formatter les données brutes. Pour ma part, j'obtiens une liste de tuples, et les tuples contiennent les données OHLCV dont j'ai besoin. L'objectif est donc de convertir ces données en un Dataframe ayant pour index la date et pour colonnes Open High Low Close Volume.
Voilà à quoi ressemblent mes données :
[(1609459200000, 23690.39141334, 24227.0, 23508.0, 24084.0, 365.16678168), (1609473600000, 24107.0, 24146.0, 23698.0, 23907.0, 65.81648789), ...]
Les vôtres sont peut-être sous forme différente (par exemple sous forme de dictionnaire, ou sous forme de liste de listes), mais l'objectif est d'obtenir le même résultat final.
On crée donc une fonction format_klines
(qui n'appartient pas forcément à la classe DatafeedGenerator), qui prend en entrée des données de prix sous une forme dépendant de comment vous les extrayez, et qui renvoie ces données formattées sous cette forme :
Date Open ... Close Volume
0 2021-01-01 01:00:00 23690.391413 ... 24084.0 365.166782
1 2021-01-01 05:00:00 24107.000000 ... 23907.0 65.816488
...
Voici ma fonction :
def format_klines(self, klines):
columns = 'Date Open High Low Close Volume'.split(
' ')
df = pd.DataFrame(klines, columns=columns)
df = df.astype('float64')
df.loc[:, "Date"] = df.loc[:, "Date"].apply(epoch_to_datetime)
after = df[df["Date"] >= self.p.start_date]
before = df[df["Date"] <= self.p.end_date]
between = after.merge(before)
between.set_index(["Date"], inplace=True)
return between
Je convertis ma liste de tuples en Dataframe dont je précise les colonnes, ensuite je convertis le type de ces colonnes en float64
, puis je convertis mes dates qui sont sous format epoch en millisecondes en datetimes. Finalement, je filtre mon Dataframe pour qu'il corresponde aux dates renseignées en paramètres, et je mets la date en index. Encore une fois, si vous avez utilisé les mêmes codes que moi jusqu'ici, vous pouvez simplement copier coller ma fonction, par contre si vous avez utilisé d'autres moyens d'extraction de données, cette fonction sera sûrement différente car l'entrée ne sera pas sous la même forme, par contre faites en sorte que la sortie soit sous forme de Dataframe OHLCV, je préfère le rappeler.
Nous avons donc maintenant un Dataframe très propre. Ce que nous pouvons faire ensuite, c'est sauvegarder ce Dataframe dans un fichier afin de ne pas perdre du temps et de la connexion pour extraire des informations en ligne à chaque fois que vous utilisez votre bot. Ainsi, en fonction des DatafeedParams utilisés, on aimerait vérifier s'il existe un fichier associé à ces paramètres, si c'est le cas on le charge, par contre s'il n'y en a pas on extrait les informations en ligne, et on crée le fichier à partir de ces informations.
En fait, on peut tout simplement sauvegarder notre Dataframe avec sa méthode to_csv
. La question est plutôt de savoir comment lors du chargement de fichiers je peux savoir quel fichier contient les informations associées à mes DatafeedParams. Pour cela, on va identifier chacun de nos Dataframes avec un titre bien particulier. Les paramètres seront donc contenus dans le titre ! Voici un format de titre que je vous propose : symbole_date-de-début_date-de-fin_intervalle.csv
, où "symbole" est le titre (BTC-EUR, BNB-BTC, MSFT, APPL, EUR-USD) et "intervalle" l'intervalle (5m, 1h, 4h, 1jour, etc...). De cette façon, tous nos paramètres sont contenus dans le titre.
Voici donc la fonction pour créer un titre à partir des paramètres :
def get_file_title(self):
start_date_str, end_date_str = self.p.start_date.strftime("%Y-%m-%d"), self.p.end_date.strftime("%Y-%m-%d")
symbol_str = self.p.symbol.replace("/", "-")
tf_str = self.format_timeframe()
return f"{symbol_str}_{start_date_str}_{end_date_str}_{tf_str}.csv"
Cette fonction appartient à la classe DatafeedGenerator car elle doit accéder aux paramètres.
Ce qu'il nous reste à faire maintenant, c'est simplement de créer un modèle de Datafeed, pour que Backtrader puisse créer un Datafeed à partir de nos fichiers csv (il préciser qu'en première colonne on a la date, ensuite les prix, ensuite le volume, etc...).
On crée donc une classe CustomOHLC
qui contient le modèle de nos fichiers csv :
import backtrader.feeds as btfeed
class CustomOHLC(btfeed.GenericCSVData):
params = (
('datetime', 0), # la date est dans la 1ère colonne
('open', 1), # le prix d'ouverture dans la 2ème
('high', 2), # etc...
('low', 3),
('close', 4),
('volume', 5),
('openinterest', -1) # on n'a pas de colonne openinterest, donc on met -1
)
Maintenant on a tout pour générer nos Datafeed ! Il ne nous reste plus qu'à tout rassembler dans une méthode de notre classe, qu'on va appeler generate_datafeed
.
def generate_datafeed(self):
title = self.get_file_title()
if not os.path.isfile(f"data/datasets/{title}"):
klines = self.extract_klines()
klines_formatted = format_klines(klines)
klines_formatted.to_csv(f"data/datasets/{title}")
return CustomOHLC(dataname=f"data/datasets/{title}", timeframe=self.p.timeframe, compression=self.p.compression,
sessionstart=self.p.start_date)
Voici un petit recap de ce code :
sessionstart
.Attention, j'ai décidé d'organiser mon projet de façon à ne pas tout avoir à la racine, j'ai donc créé des dossiers pour stocker les données. Si vous utilisez le même code que moi, créez donc ces dossiers également.
Il faut maintenant vérifier si le Datafeed généré est correct. Pour cela, on va créer un cerebro et une stratégie basique pour afficher nos données, voici donc le code total (sans les imports) :
class MyStrat(bt.Strategy):
def next(self):
print(f"Close : {self.datas[0].close[0]}, Date : {self.datas[0].datetime.date(0)}")
symbol = "BTCEUR"
timeframe = bt.TimeFrame.Minutes
compression = 240
start_date = "2021/01/01 0:0:0"
end_date = "2022/01/01 0:0:0"
params = DatafeedParams(
mode="BACKTEST",
symbol=symbol,
timeframe=timeframe,
compression=compression,
start_date=start_date,
end_date=end_date
)
generator = DatafeedGenerator(params)
datafeed = generator.generate_datafeed()
cerebro = bt.Cerebro()
cerebro.adddata(datafeed)
cerebro.addstrategy(MyStrat)
cerebro.run()
Tout marche, et normalement un fichier est créé contenant les données du Datafeed ! Vous pouvez essayer en changeant les paramètres, ça marchera aussi. On va maintenant pouvoir passer au cœur du bot, et programmer l'Engine, c'est à dire le moteur du bot, ce qui mix tout pour le faire tourner.
Le moteur est la pièce maîtresse de notre bot. On va lui passer des paramètres en entrée, qu'il va convertir en résultats de backtests et en trades. C'est une pièce sur laquelle viennent donc se greffer les autres, comme notre génération de Datafeed, mais aussi plus tard nos stratégies, nos analyzers, etc...
Comme pour le générateur de Datafeed, on va commencer par définir une configuration pour notre moteur. On doit donc établir la liste des paramètres qu'on peut lui passer en entrée.
Voici la classe EngineConfiguration
, qui est en fait la classe utilisée pour représenter la configuration de notre moteur :
from dataclasses import dataclass
from dataclasses import field
import backtrader as bt
import datetime as dt
import ccxt
@dataclass
class EngineConfiguration:
mode: str # Mode de fonctionnement (BACKTEST, OPTIMIZE, PAPER, LIVE)
symbol: str # Symbole
timeframe: bt.TimeFrame # Timeframe
end_date: dt.datetime or str = dt.datetime.utcnow() # Date de fin pour le Datafeed
start_date: dt.datetime or str = None # Date de début pour le Datafeed
compression: int = 1 # Compression
timedelta: dt.timedelta = None # Timedelta utilisé pour le Datafeed
debug: bool = False # Debug utilisé pour le Datafeed
analyzers: list = field(default_factory=list) # Analyzers
cash: float = 100_000 # Cash virtuel pour les simulations
commission: float = 0.2 # Commission du broker pour les simulations
kwargs: dict = field(default_factory=dict) # Kwargs pour cerebro.run
currency: str = None # Monnaie utilisée pour passer les ordres
write_to: str = None # Chemin où enregistrer les résultats des simulations
stdstats: bool = True # Statistiques de base utilisées pour l'affichage graphique
observers: list = field(default_factory=list) # Observers
timers: list = field(default_factory=list) # Timers
strategies: list = field(default_factory=list) # Stratégies
save_results: str = None # Nom du fichier de résultats des simulations
exchange: ccxt.bitfinex or ccxt.binance = ccxt.bitfinex() # Exchange à utiliser pour le Datafeed si besoin
use_bokeh: bool = True # Pour utiliser Bokeh pour l'affichage graphique
Comme vous pouvez le remarquer, la configuration est assez complexe, il y a beaucoup de paramètres. Les Observers, les Timers et les Analyzers sont des objets Backtrader dont nous parlerons plus tard, si jamais vous vous demandez à quoi ils correspondent. Egalement, Bokeh est une libraire utilisée pour faire de beaux affichages graphiques simplement, nous verrons comment l'intégrer afin de représenter nos données et nos stratégies.
Maintenant que notre configuration est bien définie, on va pouvoir développer notre moteur.
class Engine:
Pour gérer la configuration, nous allons utiliser écrire une méthode et un attribut dédiés à ça. Donc on ne va pas gérer ça lors de l'initialisation, ce qui signifie qu'on a un constructeur très simple :
def __init__(self):
self.config = None
On va intégrer tout de suite notre méthode pour mettre à jour la configuration, au moins ce sera fait :
def set_configuration(self, config):
self.config = config
Il nous faut des méthodes pour générer et interagir avec notre Datafeed.
Tout d'abord, pour le générer, on utilise simplement notre configuration qu'on passe dans notre DatafeedGenerator :
def generate_datafeed(self):
datafeed_params = DatafeedParams(mode=self.config.mode, symbol=self.config.symbol,
timeframe=self.config.timeframe,
end_date=self.config.end_date,
start_date=self.config.start_date,
compression=self.config.compression,
timedelta=self.config.timedelta,
debug=self.config.debug, exchange=self.config.exchange,)
datafeed_generator = DatafeedGenerator(datafeed_params)
return datafeed_generator.generate_datafeed()
Ensuite, on peut de temps en temps avoir besoin de resampler notre Datafeed, c'est à dire changer sa résolution (passer d'un Datafeed 5m à un Datafeed 1h par exemple). Backtrader nous permet de réaliser ceci simplement avec une méthode intégrée à la classe Datafeed. Mais comment savoir si on a besoin de changer la résolution ? Ce sont les stratégies qu'on utilise qui nous donnent cette information. Certaines stratégies auront en effet besoin de resampler des Datafeeds, cela sera donc stocké dans les paramètres des stratégies sous la forme d'un tuple contenant le timeframe et la compression. Voici donc comment resampler notre Datafeed :
def resample_datafeed(self, datafeed):
timeframes = []
for strategy in self.config.strategies:
if "timeframes" in strategy.parameters:
for timeframe in strategy.parameters["timeframes"]:
timeframes.append(timeframe)
timeframes = set(timeframes)
for timeframe in timeframes:
self.cerebro.resampledata(datafeed, timeframe=timeframe[0], compression=timeframe[1])
On veut également réaliser une méthode pour gérer l'affichage graphique de nos résultats. On va d'abord installer la librairie dont on a besoin :
pip install backtrader_plotting
Cette librairie contient une fonction nous permettant de créer un shéma à utiliser dans la méthode plot
d'un cerebro. Voici donc ce qu'on fait :
def plot(self):
if self.config.use_bokeh:
b = backtrader_plotting.Bokeh(style="bar", plot_mode="single", scheme=backtrader_plotting.schemes.Tradimo())
self.cerebro.plot(b)
else:
scheme = {"style": 'candlestick', "barup": "green"}
self.cerebro.plot(**scheme)
On crée simplement un shéma qu'on passe à notre cerebro (dont on n'a d'ailleurs pas encore parlé, ça arrive bientôt).
Le broker est ce qui fait l'intermédiaire entre l'utilisateur et les marchés financiers. Dans notre cas, c'est ce qui va nous permettre de configurer nos simulations en choisissant un montant de départ à utiliser, et une valeur de commission. On crée donc une méthode agissant directement sur le broker de notre cerebro en utilisant notre configuration :
def configure_broker(self):
self.cerebro.broker.setcash(self.config.cash)
self.cerebro.broker.setcommission(self.config.commission / 100)
Bon, on a créé toutes nos méthodes, maintenant il faut tout mélanger afin de finaliser notre moteur. On commence par créer une méthode run
qui sera utilisée afin de lance notre moteur, dans laquelle on instancie un Cerebro, qui est l'objet de base de Backtrader.
def run(self):
self.cerebro = EngineCerebro()
Ensuite, on peut configurer notre broker, générer notre datafeed, et l'ajouter au cerebro.
self.configure_broker()
datafeed = self.generate_datafeed()
self.cerebro.adddata(datafeed)
Ensuite, si on a précisé dans la configuration qu'on souhaite enregistrer les résultats, on ajoute un Writer de Backtrader.
if self.config.write_to:
self.cerebro.addwriter(bt.WriterFile, out="data/backtesting_results/" + self.config.write_to)
On ajoute nos différents objets. Pour commencer, les Analyzers et les Observers (sur lesquelles on reviendra plus tard).
for analyzer in self.config.analyzers:
self.cerebro.addanalyzer(analyzer.analyzer, **analyzer.parameters)
for observer in self.config.observers:
self.cerebro.addobserver(observer.observer, **observer.parameters)
Puis notre Sizer (pour l'instant on va ajouter un sizer par défaut, mais plus tard on rendra ça personnalisable) :
self.cerebro.addsizer(bt.sizers.AllInSizer)
Puis nos Timers :
for timer in self.config.timers:
self.cerebro.add_timer(timername=timer.timername, function=timer.function,
**timer.parameters)
Puis nos stratégies :
for strategy in self.config.strategies:
if self.config.mode == "BACKTEST" or self.config.mode == "OPTIMIZE":
self.cerebro.optstrategy(strategy.strategy, **strategy.parameters)
else:
self.cerebro.addstrategy(strategy.strategy, **strategy.parameters)
Notez qu'il faut traiter les deux cas : soit on fait du backtesting classique, soit on optimise (voir la série de tutos sur Backtrader pour plus de détails).
D'ailleurs, vous avez également dû remarquer que j'ai personnalisé les différents objets Backtrader, comme les Sizers, les Analyzers, etc... J'ai créé mes propres objets, nous verrons ceci en détail dans les prochains articles du projet.
Ensuite on resample le Datafeed si nécessaire :
self.resample_datafeed(datafeed)
Puis maintenant on peut lancer nos backtests et stocker nos résultats, tout est configuré :
if self.config.mode == "BACKTEST":
results = self.cerebro.run(maxcpus=1, optreturn=False, **self.config.kwargs)
else:
results = self.cerebro.run(maxcpus=1, optreturn=True, **self.config.kwargs)
Encore une fois on distingue les deux modes possibles, car dans un cas on aura des résultats optimisés et dans l'autre non.
Ensuite il faut sauvegarder nos résultats, si ceci est précisé dans la configuration :
if self.config.save_results:
with open(f"data/backtesting_results/{self.config.save_results}", "wb") as file:
pickle.dump(results, file)
(si vous ne savez pas ce qu'est pickle dans ce code, j'en parle ici).
Finalement, on n'a plus qu'à renvoyer les résultats du backtest.
return results
Voici donc notre classe Engine, complète :
Maintenant, on aimerait tester notre moteur. On commence par notre configuration :
mode = "BACKTEST"
symbol = "BTCEUR"
timeframe = bt.TimeFrame.Minutes
compression = 240
start_date = "2021/01/01 0:0:0"
end_date = "2022/01/01 0:0:0"
strategies = []
configuration = EngineConfiguration(
mode=mode,
symbol=symbol,
timeframe=timeframe,
compression=compression,
start_date=start_date,
end_date=end_date,
strategies=strategies
)
Mais notre tableau de stratégies est vide, cela ne fonctionnera pas. Voici une stratégie que vous pouvez implémenter, je n'explique pas tout de suite comment ça fonctionne, ce sera l'objet du prochain article :
from dataclasses import dataclass
class MyStrat(bt.Strategy):
def next(self):
print(f"Close : {self.datas[0].close[0]}, Date : {self.datas[0].datetime.date(0)}")
@dataclass
class Strat:
def __init__(self):
self.strategy = MyStrat
self.parameters = locals()
del self.parameters["self"]
Cette stratégie est simplement celle que nous avons testé avec notre Datafeed toute à l'heure, mais cette fois elle est adaptée à notre bot. On peut donc modifier notre tableau de stratégies :
strategies = [Strat()]
On peut maintenant instancier notre Engine, lui mettre sa configuration, et le lancer, tout devrait fonctionner. Voici donc le code qui devrait marcher pour vous (sans les imports) :
class MyStrat(bt.Strategy):
def next(self):
print(f"Close : {self.datas[0].close[0]}, Date : {self.datas[0].datetime.date(0)}")
@dataclass
class Strat:
def __init__(self):
self.strategy = MyStrat
self.parameters = locals()
del self.parameters["self"]
mode = "BACKTEST"
symbol = "BTCEUR"
timeframe = bt.TimeFrame.Minutes
compression = 240
start_date = "2021/01/01 0:0:0"
end_date = "2022/01/01 0:0:0"
strategies = [Strat()]
configuration = EngineConfiguration(
mode=mode,
symbol=symbol,
timeframe=timeframe,
compression=compression,
start_date=start_date,
end_date=end_date,
strategies=strategies
)
engine = Engine()
engine.set_configuration(configuration)
results = engine.run()
A ce stade, nous avons donc un programme qu'on peut configurer simplement et qui nous permet de faire tourner nos stratégies. Mais nous ne savons pas encore comment faire des stratégies. Ni comment analyser nos résultats. Nous verrons ceci dans les prochains articles.
Si cet article vous a plu, n'hésitez-pas à laisser un commentaire, et si vous avez des questions vous pouvez me contacter par mail ou via l'onglet disponible sur le site.
On se retrouve prochainement pour la suite de ce projet !