Cet article est la suite directe de celui-ci : Backtrader - Introduction. Si vous ne l'avez pas lu, je vous recommande donc fortement d'aller le voir afin de bien comprendre la suite.
Aujourd'hui, nous allons apprendre à développer des stratégies de base afin de les tester dans Backtrader pour étudier leur rentabilité éventuelle.
En fait, le développement d'une stratégie commence par une simple réflexion algorithmique. En effet, il faut que vous réussissiez à traduire votre stratégie en différentes conditions bien définies et explicites.
La stratégie que nous allons développer pour l'exemple consistera à prendre une position acheteuse lorsque le RSI sera inférieur à 30, une position vendeuse lorsque le RSI sera supérieur à 70, et à clôturer une position 5 chandeliers après l'ouverture. On obtient très facilement nos conditions qui peuvent se traduire de cette façon algorithmiquement :
SI RSI < 30 => ACHAT
SI RSI > 70 => VENTE
SI PositionOuverte et ChandelierActuel = ChandelierOuverturePosition + 5 => FERMETURE
Maintenant, nous n'avons plus qu'à implémenter cela en Python !
Voici le code complet de l'article précédent, afin de reprendre de là :
import yfinance as yf
import backtrader as bt
import backtrader.feeds as btfeeds
class MyFirstStrat(bt.Strategy):
def next(self):
print(self.datas[0].close[0])
btc_eur = yf.Ticker("BTC-EUR")
data = btc_eur.history()
pandas_data = btfeeds.PandasData(dataname=data)
cerebro = bt.Cerebro()
cerebro.adddata(pandas_data)
cerebro.addstrategy(MyFirstStrat)
cerebro.run()
Si jamais vous avez une erreur lors de l'extraction des données avec Yfinance, il faut sûrement modifier le code de Yfinance (je vous dis ça car sur mon PC fixe je n'ai pas eu d'erreur mais sur mon portable j'ai eu une erreur, alors que je n'ai rien changé au code ni aux packages utilisés, donc cela peut arriver). Remplacez la ligne 295 du fichier base.py
dans yfinance
par la ligne suivante :
df.index = _pd.DatetimeIndex(df.index).tz_localize("UTC").tz_convert(
J'ai juste supprimé l'instruction que nous avions mis dans la méthode next
puisqu'elle n'est pas vraiment utile dans notre cas.
Notre première problématique est d'obtenir le RSI. Nous pourrions nous amuser à le recalculer. En effet, grâce à l'article précédent, vous savez comment accéder à des données au sein d'une stratégie. Mais heureusement pour nous, Backtrader nous fournit déjà de nombreux indicateurs, dont le RSI. Nous allons donc l'ajouter à notre stratégie. Pour cela, il nous suffit de définir un constructeur et de l'initialiser dans ce constructeur. Il faut également importer le module indicators
de Backtrader dans lequel on trouve le RSI.
import backtrader.indicators as btind
class MyFirstStrat(bt.Strategy):
def __init__(self):
self.rsi = btind.RSI(period=14)
def next(self):
pass
On peut précisier sa période en paramètres lors de son initialisation.
OK, mais il a pas besoin de données pour fonctionner ce RSI ?
Si si, évidemment. En fait il a déjà accès aux données puisqu'il est initialisé dans votre stratégie. Il y a accès comme vous vous y avez accès lorsque vous écrivez self.datas[0]
, donc pas besoin de lui passer des données en paramètres ! Par défaut, il utilise les données sur le prix de fermeture de votre premier Datafeed (self.datas[0].close
). Si vous souhaitez changer ce comportement, c'est à ce moment là qu'il faut lui passer des données en paramètres de cette façon :
self.rsi = RSI(self.datas[0].high[0], period=14)
self.rsi = RSI(self.datas[1].close[0], period=14)
Dans le premier exemple, le RSI utilisera les prix les plus hauts du premier Datafeed, et dans le deuxième exemple il utilisera les prix de fermeture du second Datafeed. Voici comment personnaliser les données prises par les indicateurs dans Backtrader.
Et maintenant, pour accéder aux données du RSI, le principe est le même que pour accéder aux données du Datafeed. C'est à dire que self.rsi[0]
vous renvoie le RSI à l'instant actuel, self.rsi[-1]
à l'instant précédent, etc... vous avez compris le principe maintenant.
Afin de vérifier si notre RSI est fonctionnel, nous allons voir la méthode log
.
Afin de débugger une stratégie, nous allons écrire une méthode pour afficher à chaque itération les informations que nous aurons défini. Dans notre exemple, l'idée serait d'afficher le RSI actuel et le prix à chaque itération. Pour cela, on implémente la fonction log
dans Backtrader. Elle se nomme log
par convention, mais vous pouvez l'appeler comme vous voulez.
def log(self, txt):
dt_formatted = self.datas[0].datetime.date(0).isoformat()
print(dt_formatted + " - " + txt)
On passe en paramètre le texte qu'on souhaite afficher. Ensuite, on récupère la date actuelle qu'on formatte de sorte qu'elle soit affichable. Puis on affiche notre date, et notre texte grâce à print
.
Ensuite, on modifie notre méthode next
pour appeler log
et lui passer le RSI en paramètres.
def next(self):
self.log("RSI : " + str(self.rsi[0]))
Attention à bien cast le RSI en chaîne de caractères, sinon vous aurez une erreur car on ne peut pas concaténer un str
et un float
.
On peut maintenant exécuter notre stratégie ! Voici le code que vous devriez avoir actuellement :
import yfinance as yf
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.indicators as btind
class MyFirstStrat(bt.Strategy):
def __init__(self):
self.rsi = btind.RSI(period=14)
def log(self, txt):
dt_formatted = self.datas[0].datetime.date(0).isoformat()
print(dt_formatted+" - "+txt)
def next(self):
self.log("RSI : " + str(self.rsi[0]))
btc_eur = yf.Ticker("BTC-EUR")
data = btc_eur.history()
pandas_data = btfeeds.PandasData(dataname=data)
cerebro = bt.Cerebro()
cerebro.adddata(pandas_data)
cerebro.addstrategy(MyFirstStrat)
cerebro.run()
Et voici la sortie obtenue en exécutant le code :
...
2022-01-24 - RSI : 25.76266717987049
2022-01-25 - RSI : 27.714095073654292
2022-01-26 - RSI : 28.508595377661436
Tout marche ! On a les bonnes dates, et notre RSI qui semble prendre des valeurs correctes, on peut passer à la suite.
On souhaite acheter quand le RSI passe est inférieur à 30, et vendre lorsqu'il est supérieur à 70. Et ceci pour chaque itération dans notre stratégie. On implémente donc ces conditions dans la méthode next
:
def next(self):
self.log("RSI : " + str(self.rsi[0]))
if self.rsi[0] < 30:
# BUY
pass
if self.rsi[0] > 70:
# SELL
pass
On va maintenant remplir le contenu de ces conditions. Pour dire à Backtrader de placer un ordre d'achat, on utilise self.buy()
. Pour un ordre de vente, on utilise self.sell()
. Mais lorsqu'on passe un ordre, il faut mémoriser cet ordre. En effet, si le RSI reste inférieur à 30 pour deux chandeliers, on ne va pas passer deux ordres d'achat. On en déduit une condition supplémentaire : si on est déjà dans le marché, on ne peut pas ouvrir de position. On doit donc stocker l'ordre passé dans une variable. Ça tombe bien, les méthodes ci-dessus nous renvoient un ordre. Mais il faut donc également déclarer cette variable dans le constructeur. Et en plus de ça, on va appeler notre méthode log
quand on passera un ordre afin d'en garder une trace. On va également vérifier si une position est déjà ouverte ou non avant de passer un ordre, pour cela on dispose de l'attribut position
de la classe Strategy
qui nous renvoie les informations de la position ouverte, sinon 0. Voici le code à modifier :
def __init__(self):
self.rsi = btind.RSI(period=14)
self.order = None
def next(self):
self.log("RSI : " + str(self.rsi[0]))
# Si nous n'avons pas encore pris de position
if not self.position:
if self.rsi[0] < 30:
# On achète !
self.order = self.buy()
self.log("ACHAT SOUMIS : " + str(self.datas[0].close[0]))
if self.rsi[0] > 70:
# On vend !
self.order = self.sell()
self.log("VENTE SOUMISE : " + str(self.datas[0].close[0]))
Mais l'ordre est uniquement soumis à ce stade, et pas encore exécuté. En effet, si on a pas assez d'argent pour acheter par exemple, l'ordre sera annulé. On aimerait pouvoir être informé sur le statut de l'ordre. Heureusement, on dispose de la méthode notify_order
intégrée dans la classe de base Strategy
qui s'exécute à chaque fois qu'un ordre est créé ou modifié.
Cette méthode est une méthode qu'on peut réécrire afin de modifier son comportement. Elle prend en paramètre un ordre. Nous allons l'utiliser afin de vérifier la bonne exécution de nos ordres.
En effet, un ordre peut avoir plusieurs statuts. Il peut être soumis, accepté, exécuté, annulé, rejeté, etc... Nous aimerions savoir quand un ordre est exécuté. Au passage, nous aimerions aussi mémoriser le moment où il est exécuté, puisque notre stratégie implique de fermer un ordre 5 chandeliers après son ouverture. Voici donc une implémentation de cette méthode à ajouter à notre classe :
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
if order.isbuy():
self.log("ACHAT EXECUTE : " + str(order.executed.price))
if order.issell():
self.log("VENTE EXECUTE : " + str(order.executed.price))
self.time_executed = len(self)
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('Problème lors du traitement de l\'ordre')
self.order = None
Notez qu'on accède au statut d'un ordre en utilisant order.status
. Dans le code ci-dessus vous avez tous les statuts disponibles.
Ensuite on peut vérifier la nature d'un ordre avec order.isbuy()
et order.issell()
qui nous renvoient des booléens.
Finalement, on utilise len(self)
pour connaître le temps actuel et le stocker en mémoire. Cette syntaxe est un peu bizarre, mais en fait tout à fait logique au vu de comment notre classe est définie.
On avance ! Il ne nous manque plus que la condition de fermeture.
On souhaite fermer notre ordre 5 chandeliers après qu'il ait été exécuté. Pour fermer notre ordre, on va simplement prendre une position contraire (si notre ordre était vendeur, on prend une position acheteuse, et inversement). On a déjà un if
pour vérifier si nous ne sommes pas rentrés dans le marché, on peut donc rajouter un else
afin de vérifier notre condition de fermeture si jamais nous avons un ordre ouvert. Si c'est le cas, il faut vérifier si l'ordre est un ordre d'achat ou de vente, pour cela on utilise self.position.size
qui est supérieur à 0 si nous avons une position acheteuse ou inférieur à 0 si nous avons une positions vendeuse. Pour vérifier le temps actuel, on utilise len(self)
comme toute à l'heure. Si ce temps est égal au temps d'exécution de l'ordre + 5, on peut déclencher notre condition. Voici le code :
def next(self):
self.log("RSI : " + str(self.rsi[0]))
# Si nous n'avons pas encore pris de position
if not self.position:
if self.rsi[0] < 30:
# On achète !
self.order = self.buy()
self.log("ACHAT SOUMIS : " + str(self.datas[0].close[0]))
if self.rsi[0] > 70:
# On vend !
self.order = self.sell()
self.log("VENTE SOUMISE : " + str(self.datas[0].close[0]))
else:
if len(self) == self.time_executed + 5:
if self.position.size > 0:
self.order = self.sell()
elif self.position.size < 0:
self.order = self.buy()
Finalement, il faut préciser à notre cerebro
quelle quantité acheter. Il ne peut pas l'inventer ! Pour ça, Backtrader met à notre disposition des Sizers, qui sont des objets définissant la taille d'une position. Dans notre cas, on va utiliser un AllInSizer
, qui permet de dépenser tout le cash disponible à chaque fois (ne faites jamais ça en vrai trading !). Voici comment ajouter un Sizer :
cerebro.addsizer(bt.sizers.AllInSizer)
On va également ajouter une quantité initiale de cash disponible dans notre broker. Pour cela, on utilise cerebro.broker.set_cash()
.
cerebro.broker.set_cash(100000)
Vous pouvez maintenant tester, normalement il se passe des choses et vous voyez vos ordres s'exécuter !
Nous avons pu voir que tout marche jusqu'ici, en revanche, si vous avez repris le même code que moi, vous avez du remarquer que nous n'avons pas beaucoup de valeurs pour tester. En effet, nous avons moins de 30 valeurs, ce qui est vraiment très très peu. Si vous souhaitez vous entraîner, essayez d'obtenir vous-mêmes un autre dataset contenant plus de valeurs. Sinon, vous pouvez simplement télécharger un dataset de plus de 16 000 valeurs en cliquant ici.
Si vous avez téléchargé le dataset du site, il faut un peu modifier les paramètres du Datafeed avant de l'utiliser.
data = btfeeds.GenericCSVData(dataname="BNB-BTC_2020-01-01_2021-12-01_1h.csv", timeframe=bt.TimeFrame.Minutes, compression=60, openinterest=-1)
Si vous avez un peu observé le dataset, vous avez dû constater que l'intervalle temporel est 1 heure. Il faut indiquer ceci à notre Datafeed, car l'intervalle par défaut est 1 jour. On précise donc le timeframe qu'on met en minutes, et la compression qu'on met à 60 (60 minutes = 1h). Ce système vous permet de créer des intervalles de temps totalement personnalisés (j'aurai pu mettre la compression à 30, 45, 53, 240, etc... pour avoir différents intervalles de temps). Aussi, notre dataset ne comporte pas de colonne openinterest, il faut donc le préciser.
Voici donc le code total à ce stade :
import yfinance as yf
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.indicators as btind
import pandas as pd
class MyFirstStrat(bt.Strategy):
def __init__(self):
self.rsi = btind.RSI(period=14)
self.order = None
def log(self, txt):
dt_formatted = self.datas[0].datetime.date(0).isoformat()
print(dt_formatted+" - "+txt)
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
if order.isbuy():
self.log("ACHAT EXECUTE : " + str(order.executed.price))
if order.issell():
self.log("VENTE EXECUTE : " + str(order.executed.price))
self.time_executed = len(self)
elif order.status in [order.Canceled, order.Margin, order.Rejected]:
self.log('Problème lors du traitement de l\'ordre')
self.order = None
def next(self):
self.log("RSI : " + str(self.rsi[0]))
# Si nous n'avons pas encore pris de position
if not self.position:
if self.rsi[0] < 30:
# On achète !
self.order = self.buy()
self.log("ACHAT SOUMIS : " + str(self.datas[0].close[0]))
if self.rsi[0] > 70:
# On vend !
self.order = self.sell()
self.log("VENTE SOUMISE : " + str(self.datas[0].close[0]))
else:
if len(self) == self.time_executed + 5:
if self.position.size > 0:
self.order = self.sell()
elif self.position.size < 0:
self.order = self.buy()
data = btfeeds.GenericCSVData(dataname="BNB-BTC_2020-01-01_2021-12-01_1h.csv", timeframe=bt.TimeFrame.Minutes, compression=60, openinterest=-1)
cerebro = bt.Cerebro()
cerebro.broker.cash = 100000
cerebro.adddata(data)
cerebro.addsizer(bt.sizers.FixedSize)
cerebro.addstrategy(MyFirstStrat)
cerebro.run()
Vous pouvez le tester, ça marche ! En revanche, on aimerait bien avoir des informations sur notre stratégie, par exemple les profits. Une façon simple de voir cela est de récupérer le cash disponible après le cerebro.run
. Pour cela, on utilise cerebro.broker.cash
.
Donc on peut ajouter en toute dernière ligne :
print("Cash restant : " + str(cerebro.broker.cash))
De mon côté, lorsque j'exécute le code, je vois que le cash restant est environ 17 000. Sur un dépôt initial de 100 000, c'est catastrophique, on perd quasiment tout en un an... Stratégie à fuir apparemment. Pour en être certain, il faudrait l'analyser plus en détail, car peut-être qu'on peut modifier certains paramètres pour passer d'une stratégie perdante à une stratégie gagnante ? C'est ce que nous verrons dans le prochain article !
Pfiou, un article un peu complexe. Mais il est important de comprendre les bases afin de pouvoir élaborer des stratégies plus complexes, avec des indicateurs personnalisés ou des stop loss par exemple. Nous verrons cela prochainement.
Si vous avez des questions, n'hésitez pas à laisser un commentaire ou à me contacter, par mail ou via l'onglet disponible sur le site.
On se retrouve prochainement pour apprendre à optimiser et analyser nos stratégies !