8 minutes
Trading in Python: Part 1
What is the difference between Trading and Investing?
Trading and investing both involve seeking profit in the stock market, but they pursue that goal in different ways.
Traders jump in and out of stocks within weeks, days, even minutes, with the aim of short-term profits. They often focus on a stock’s technical factors rather than a company’s long-term prospects. What matters to traders is which direction the stock will move next and how the trader can profit from that move.
Investors have a longer-term outlook. They think in terms of years and often hold stocks through the market’s ups and downs.
For this activity, I will be using Jupyter Lab, and this article is a Markdown version of my notebook.
In the first place, we need to install the investpy
library (you can find it here). The library is a API to connect to investing.com and retrieve information about an ETF or a stock security.
After that, we can make our own wrapper to facilitate the treatment of data, and convert the data to EUR, as it will be the currency we will use in this example.
Save the code in the file Security.py
in the same location of your Jupyter Notebook
from datetime import datetime, timedelta
import pandas as pd
# import sys
# sys.path.append('/home/luis/projects/TradingMachine/lib/investpy')
import investpy
from currency_converter import CurrencyConverter, RateNotFoundError
cc = CurrencyConverter('http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip',
fallback_on_missing_rate=True,
fallback_on_missing_rate_method='linear_interpolation')
class Security:
@staticmethod
def get_investingcom_instrument(search_code, country=None, exchange=None):
results = []
countries = None
if country is not None:
countries = [country]
try:
for i in investpy.search_quotes(text=search_code, countries=countries):
if exchange is not None:
if exchange.lower() == i.exchange.lower():
results.append(i)
else:
results.append(i)
except:
pass
if len(results) == 0:
raise Exception('No results found on investing.com for {} / {} / {}'.format(search_code, country, exchange))
if len(results) > 1:
print_results = []
for result in results:
print_results.append((result.country, result.exchange))
raise Exception('Several results found for {} / {} / {} \n'.format(search_code, country, exchange) +
'{}'.format(print_results))
return results[0]
def _get_data(self):
date_format='%d/%m/%Y'
today = datetime.today()
time_ago = datetime.today() - timedelta(days=2000)
today_str = today.strftime(date_format)
time_ago_str = time_ago.strftime(date_format)
data = self.investingapi.retrieve_historical_data(from_date=time_ago_str, to_date=today_str)
exchange = list()
for element in data.index:
try:
exchange.append(cc.convert(1, self.currency, 'EUR', element.to_pydatetime()))
except RateNotFoundError as e:
print('INFO: {}. Taking yesterday value'.format(e))
exchange.append(cc.convert(1, self.currency, 'EUR', element.to_pydatetime() - timedelta(days=1)))
data['Exchange Rate'] = exchange
data['EUR Value'] = data['Close'] * data['Exchange Rate']
return data
def _moving_averages_holder(self):
data = pd.DataFrame(index=self.quotes.index.copy())
data['Current Value'] = self.quotes['EUR Value'].copy()
moving_avg = [1, 10, 50, 100, 200, 400]
return data
def _get_dividends(self):
try:
data = investpy.stocks.get_stock_dividends(self.ticker, self.country)
data = data.set_index(['Date'])
except RuntimeError:
print('INFO: No dividends for {} ({}/{})'.format(self.name, self.ticker, self.country))
data = None
finally:
return data
def __init__(self, isin, currency='EUR', country=None, exchange=None):
self.investingapi = self.get_investingcom_instrument(search_code=isin, country=country, exchange=exchange)
self.currency = currency
self.isin = isin
self.security_type = self.investingapi.pair_type
self.name = self.investingapi.name
self.country = self.investingapi.country
self.exchange = self.investingapi.exchange
self.ticker = self.investingapi.symbol
self.quotes = self._get_data()
self._averages = self._moving_averages_holder()
self.dividends = self._get_dividends()
self.last_performance = {
'dSMA_200/dt': round(self.moving_average('SMA', 200).diff()[-5:].mean(), 2),
'dSMA_20/dt': round(self.moving_average('SMA', 20).diff()[-5:].mean(), 2)
}
def moving_average(self, avg_type, days):
name = '{} {}'.format(avg_type, days)
data = self._averages
if not name in data.columns:
if avg_type == 'SMA':
data[name] = data['Current Value'].rolling(window=days).mean()
elif avg_type == 'EMA':
data[name] = data['Current Value'].ewm(span=days, adjust=False).mean()
else:
raise Exception('Moving Average {} not recognised'.format(avg_type))
return data[name]
def get_last_quote(self):
return self.quotes.iloc[-1]
def get_last_date(self):
result = self.quotes.iloc[-1]
return result.name.to_pydatetime().strftime('%Y-%m-%d')
def test():
stock1 = Security('IBM', 'United States')
print('Quotes:')
print(stock1.quotes)
print('----------------')
print('Averages:')
print(stock1.moving_average('SMA', 10))
print('----------------')
print('Dividends:')
print(stock1.dividends)
print('----------------')
if __name__ == '__main__':
test()
We can try now the Security Class, creating an object called melia
with the data of a popular hotel chain in Spain
from Security import Security
melia = Security('Melia', country='Spain', exchange='Madrid')
print(melia.ticker)
print(melia.get_last_quote())
MEL
Open 6.777
High 6.777
Low 6.565
Close 6.615
Volume 0.000
Exchange Rate 1.000
EUR Value 6.615
Name: 2021-03-22 00:00:00, dtype: float64
We can see that we can retrieve the ticker
of the security and the last quotation avaliable in investing.com
Our objective is to simulate buying and sharing securities, for that, we need a place to store the transactions. We can create therefore a wallet
to record the date, concept and amount of the transaction.
We, therefore, create a new file Wallet.py
with the following class:
import pandas as pd
class Wallet:
def __init__(self):
self._wallet = pd.DataFrame(pd.DataFrame(columns=['Date', 'Concept', 'Total']))
self._counter=0
def add(self, date, concept, value):
self._wallet.loc[self._counter] = [date, concept, value]
self._counter = self._counter + 1
def balance(self):
return self._wallet['Total'].sum()
def transactions(self):
return self._wallet.copy()
def test():
wallet = Wallet()
wallet.add('2020-12-24', 'Gifts given', -100)
wallet.add('2020-12-25', 'Gifts received', 50)
print('Balance: {}'.format(wallet.balance()))
print('Transactions:')
print(wallet.transactions())
if __name__ == '__main__':
test()
Last, we need some trading strategies. I have implemented the strategy to buy/sell when the price crosses a moving averages (configurable by number of days) and when the simple moving average of the month crosses the moving average of the week.
Here is the code:
from Security import Security
from Wallet import Wallet
from Global import *
import json
class Strategy_Common:
def _get_date_or_next_index(self, date):
return self.security.quotes.index.get_loc(date, method='backfill')
def _get_date_or_previous_index(self, date):
return self.security.quotes.index.get_loc(date, method='pad')
def _get_start_stop_index(self, start_date, stop_date):
start = self._get_date_or_next_index(start_date)
stop = self._get_date_or_previous_index(stop_date)
return start, stop
def __init__(self, security, config):
self.security = security
self.wallet = Wallet()
self.keep_always = False
self.config = config
self.cash_funds = 10000
def return_results(self, results):
if results is not None:
result = {
'Security Type': self.security.security_type.title(),
'Security Name': self.security.name.encode('ascii',errors='ignore').decode("utf-8"),
'ISIN': self.security.isin,
'Country': self.security.country.title(),
'Exchange': self.security.exchange,
'Strategy': self.description,
'Simulated Balance': round(self.wallet.balance(), 4),
'Yield': round((self.wallet.balance() - self.cash_funds)/self.cash_funds, 2),
'Strategy Class Name': self.__class__.__name__,
'Strategy Config': json.dumps(self.config),
}
results.append(result)
def calculate_dividends(self, date, amount_shares):
if self.security.dividends is not None:
try:
dividend = self.security.dividends.loc[date]['Dividend'].sum()
except KeyError:
dividend = 0
if dividend > 0:
self.wallet.add(date, 'DIVIDEND {} SHARES'.format(amount_shares), dividend * amount_shares)
self.wallet.add(date, 'DIVIDEND TAX {} SHARES'.format(amount_shares), - dividend * amount_shares * TAX_RATE)
class Strategy_Hold_And_Sell(Strategy_Common):
# Buy at date_0 and Sell at date_1
def __init__(self, security, config=()):
super().__init__(security, config)
self.description = 'Buy, Hold and Sell'
self.keep_always = True
def should_buy(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
return True
def should_sell(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
return False
def simulate(self, start_date, stop_date, commission, results=None):
cash_funds = self.cash_funds
self.wallet.add(start_date, 'CASH DEPOSIT', cash_funds)
start_index, stop_index = self._get_start_stop_index(start_date, stop_date)
purchased = 0
for index, rows in self.security.quotes.iloc[start_index:stop_index].iterrows():
date = rows.name.to_pydatetime().strftime('%Y-%m-%d')
if purchased == 0 and self.should_buy(date):
amount_shares = int(self.wallet.balance()/self.security.quotes.loc[date]['EUR Value'])
self.wallet.add(date, 'BUY {} SHARES'.format(amount_shares),
- self.security.quotes.loc[date]['EUR Value'] * amount_shares * (1 + MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
purchased = amount_shares
self.calculate_dividends(date, purchased)
self.wallet.add(date, 'SELL {} SHARES'.format(amount_shares),
self.security.quotes.loc[date]['EUR Value'] * amount_shares * (1 - MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
self.return_results(results)
class Strategy_Buy_Value0_LT_Value1(Strategy_Common):
# Buy and sell between periods start_date and stop_date. Actual buy/sell order given by cross of rolling averages
def should_buy(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
value0 = self.security.moving_average(self._type_avg, self._value0).loc[date]
value1 = self.security.moving_average(self._type_avg, self._value1).loc[date]
if value0 < value1:
return True
return False
def should_sell(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
return not self.should_buy(date)
def __init__(self, security, config):
super().__init__(security, config)
self._type_avg = config[0]
self._value0 = int(config[1])
self._value1 = int(config[2])
self.description = 'Buy on {} days LT {} days'.format(self._type_avg + ' ' + str(self._value0),
self._type_avg + ' ' + str(self._value1))
def simulate(self, start_date, stop_date, commission, results=None):
cash_funds = self.cash_funds
self.wallet.add(start_date, 'CASH DEPOSIT', cash_funds)
start_index, stop_index = self._get_start_stop_index(start_date, stop_date)
purchased = 0
for index, rows in self.security.quotes.iloc[start_index:stop_index].iterrows():
date = rows.name.to_pydatetime().strftime('%Y-%m-%d')
value = rows['EUR Value']
if purchased == 0 and self.should_buy(date):
amount_shares = int(self.wallet.balance()/value)
self.wallet.add(date, 'BUY {} SHARES'.format(amount_shares),
- value*amount_shares*(1 + MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
purchased = amount_shares
elif purchased > 0 and self.should_sell(date):
self.wallet.add(date, 'SELL {} SHARES'.format(purchased),
value*purchased*(1 - MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
purchased = 0
self.calculate_dividends(date, purchased)
if purchased>0:
self.wallet.add(date, 'SELL {} SHARES'.format(purchased),
value*purchased*(1 - MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
self.return_results(results)
class Strategy_Buy_dSMA20(Strategy_Common):
# Buy and sell between periods start_date and stop_date. Actual buy/sell order given by cross of rolling averages
def should_buy(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
index = self._get_date_or_previous_index(date)
dSMA_20 = self.security.moving_average('SMA', 20).diff()[index-5:index].mean()
if dSMA_20 > self._trigger:
return True
return False
def should_sell(self, date=None):
if date is None:
date = self.security.get_last_date()
print('INFO No date specified, using: {}'.format(date))
index = self._get_date_or_previous_index(date)
dSMA_20 = self.security.moving_average('SMA', 20).diff()[index-5:index].mean()
if dSMA_20 < self._trigger:
return True
return False
def __init__(self, security, config=0):
super().__init__(security, config)
self._trigger = float(config)
self.description = 'Buy on dSMA20 > {}'.format(str(self._trigger))
def simulate(self, start_date, stop_date, commission, results=None):
cash_funds = self.cash_funds
self.wallet.add(start_date, 'CASH DEPOSIT', cash_funds)
start_index, stop_index = self._get_start_stop_index(start_date, stop_date)
purchased = 0
for index, rows in self.security.quotes.iloc[start_index:stop_index].iterrows():
date = rows.name.to_pydatetime().strftime('%Y-%m-%d')
value = rows['EUR Value']
if purchased == 0 and self.should_buy(date):
amount_shares = int(self.wallet.balance()/value)
self.wallet.add(date, 'BUY {} SHARES'.format(amount_shares),
- value*amount_shares*(1 + MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
purchased = amount_shares
elif purchased > 0 and self.should_sell(date):
self.wallet.add(date, 'SELL {} SHARES'.format(purchased),
value*purchased*(1 - MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
purchased = 0
self.calculate_dividends(date, purchased)
if purchased>0:
self.wallet.add(date, 'SELL {} SHARES'.format(purchased),
value*purchased*(1 - MARKET_FLUCTUATION))
self.wallet.add(date, 'COMMISSION', - commission)
self.wallet.add(date, 'ANALYSIS WAGE', - WAGE)
self.return_results(results)
In the next post, I will glue everything together to run a simulation over a period of one year.