Now, we will download financial data from one of two sources, Yahoo finance (yahoo) and the St. Lois Federal Reserve, by defining a function download() and passing in the tickers corresponding to the FAANG stocks: Facebook/Meta, Amazon, Apple, Netflix and Google:
import datetime, warningsimport yfinance as yfimport pandas as pdimport numpy as npfrom scipy.stats import zscorefrom scipy.signal import detrenddefdownload(symbols,start,days): end = start + datetime.timedelta(days=days) startstr = start.strftime('%Y-%m-%d') endstr = end.strftime('%Y-%m-%d')print(f'Obtaining {symbols} data from {startstr} to {endstr}...') close = yf.download(symbols, start=startstr, end=endstr, progress=True)['Close']# Match data up with weekdays weekdays = pd.date_range(start=startstr, end=endstr, freq='B') close = close.reindex(weekdays)# For any NaN's, propogate last valid value forward (and remove first value) z = close.fillna(method='ffill').values.T[:,2:]# Make sure to always detrend and normalise your data, otherwise most statistics will give spurious results. returndetrend(zscore(z,ddof=1,nan_policy='omit',axis=1))# The FAANG tickers (Facebook/Meta, Amazon, Apple, Netflix, Google)stocks = ['META','AMZN','AAPL','NFLX','GOOGL']# We'll download 140 days of data (corresponding to ~100 observations from business days)ndays =140# Set a recent(ish) starting date for the periodstart_datetime = datetime.datetime.strptime('2014-01-01', '%Y-%m-%d')# Earliest date we will sampleprint('Begin data download.')z =download(stocks,start_datetime,ndays)print(f'Done. Obtained MTS of size {z.shape}')
2. Inspecting the MTS
Now that we've obtained our data, we can inspect it to make sure everything looks okay. Let's begin by visualising the MTS in a single heatmap:
import matplotlib.pyplot as pltfrom scipy.stats import zscoreimport seaborn as snsdefplot_data(z,labels): plt.subplots() plt.pcolormesh(z,vmin=-1.5,vmax=1.5,cmap=sns.color_palette('icefire_r',as_cmap=True)) plt.colorbar() ticks = [t+0.5for t inrange(len(labels))] plt.yticks(ticks=ticks, labels=labels) plt.xlabel('Time') plt.ylabel('Symbol') plt.show()plot_data(z,stocks)
You should obtain a plot similar to the following:
3. Applying pyspi
Next, we instantiate a Calculator object using our data and compute all pairwise interactions. Then, we can inspect the results table:
from pyspi.calculator import Calculator# These two lines show the main usage of the calculator: simply instantiate and compute.calc =Calculator(z)calc.compute()# We can now inspect the results table, which includes hundreds of pairwise interactions.print(calc.table)
As an example, below we will examine how covariance, dynamic time warping, and Granger causality differs when computing the relationship between the FAANG stock-market data.
import matplotlib.pyplot as pltfrom matplotlib import colorsdefplot_mpi(S,identifier,labels,ax=None):""" Plot a given matrix of pairwise interactions, annotating the process labels and identifier """if ax isNone: _, ax = plt.subplots() plt.sca(ax)# Use a diverging cmap if our statistic goes negative (and a sequential cmap otherwise)if np.nanmin(S)<0.: maxabsval =max(abs(np.nanmin(S)),abs(np.nanmax(S))) norm = colors.Normalize(vmin=-maxabsval, vmax=maxabsval) plt.imshow(S,cmap='coolwarm',norm=norm)else: plt.imshow(S,cmap='Reds',vmin=0) plt.xticks(ticks=range(len(stocks)),labels=labels,rotation=45) plt.yticks(ticks=range(len(stocks)),labels=labels) plt.xlabel('Symbol') plt.ylabel('Symbol') plt.title(identifier) plt.colorbar()# Iterate through the four methods (covariance, dynamic time warping, Granger causality, and convergent cross-mapping), extract and plot their matricesspis = ['cov_EmpiricalCovariance','dtw_constraint-itakura','gc_gaussian_k-max-10_tau-max-2','ccm_E-None_max']for identifier in spis:# Simply index an SPI in the output table, which will give you an MxM dataframe of pairwise interactions (where M is the number of processes) S = calc.table[identifier]# Plot this dataframeplot_mpi(S,identifier,stocks)
You should obtain four separate plots, each corresponding to one of the four methods. Here we show an example of the 5x5 matrix of pairwise interactions (MPI) for the convergent cross-mapping method (identifier ccm):
4. Classifying MTS using SPIs
Now that you know how to compute the hundreds of pairwise interactions from data, let's put them to the test!
This part of the tutorial illustrates how to use sklearn to classify between two types of time-series using a comprehensive representation of their pairwise interactions.
Here, we will try to delineate the stock-market data of earlier from foreign exchange-rate data. We will start by downloading several instances of the data type for training/testing our classifier.
# Add in some more symbols for the market ratesforex = ['USDJPY=X','USDEUR=X','CNYUSD=X','GBP=X','CADUSD=X']ninstances =20# Dict to store our results (given by the key (type,starting date))database ={}for i inrange(ninstances):# Iterate the start datetime by ndays for each instance start_datetime = start_datetime + datetime.timedelta(days=ndays)print(f'[{i}/{ninstances-1}] Downloading data for the 140 period starting at {start_datetime}.')# Download the stock data (and plot) database[('stocks',start_datetime)]=download(stocks,start_datetime,ndays)plot_data(database[('stocks',start_datetime)],stocks)# Download the forex data (and plot) database[('forex',start_datetime)]=download(forex,start_datetime,ndays)plot_data(database[('forex',start_datetime)],forex)print(f'Successfully downloaded {ninstances} datasets of both types of data.')
Now, compute all pairwise interactions for all datasets (storing the results). For the purposes of demonstration, we will use the 'fast' subset to speed up calculations.
from copy import deepcopy# First, let's create one calculator and copy it for each MTS so that we don't need to re-initialise each timecalc =Calculator(subset='fast')results ={}for key in database:# Copy the top-level calculator mycalc =deepcopy(calc)# Name the calculator after the type of data (stock/forex) plus the starting date mycalc.name = key[0]+'_'+ key[1].strftime('%Y-%m-%d')# Ensure we have no NaNs in our dataset (preferably using a more clever way of data imputation than this one) dataset = np.nan_to_num(database[key])# Load the dataset mycalc.load_dataset(dataset)# Compute all pairwise interactions mycalc.compute()# Store our results results[key]= mycalc.table
Note: This computation will take a few minutes to run. If you would like to proceed using a pre-computed results file, simply download the following .pkl file and save it in the same location as your Jupyter notebook:
[OPTIONAL]: Load pre-computed results
To load the downloaded results into your notebook, ensure it has been saved in the same location as the notebook .ipynb file and use the following code:
import dill # pip install dill if you do not already have it installedwithopen('psypi_finance_results.pkl', 'rb')as f: results = dill.load(f)
Let's see if we can tell whether our data is Forex or stock-market data based on the covariance matrix. We will train a support vector machine (SVM) to perform the classification task.
from sklearn import svmfrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import balanced_accuracy_scorefrom sklearn.preprocessing import StandardScalerscaler =StandardScaler()defget_data_matrix(table,spi):""" For a given SPI, extract the off-diagonals as feature vectors """# For each dataset in the results table...for k in table:# ...Extract the matrix corresponding to this SPI... A = table[k][spi].values# ...Vectorise and join the upper and lower triangles to form a feature vector... x = np.concatenate([A[np.triu_indices(A.shape[0],1)], A[np.tril_indices(A.shape[0],-1)]]) x = np.atleast_2d(x)# ...and concatenate each feature vector together to form a data matrixtry: X = np.concatenate([X,x], axis=0)exceptNameError: X = xreturn Xdefget_accuracy(clf,X,y): nrepeats =5 score = np.full(nrepeats,np.nan)for r inrange(nrepeats):# Convert the data into half-half train/test split X_train, X_test, y_train, y_test =train_test_split(X, y, test_size=0.5, random_state=r)# Normalise the features so that the SVM behaves well X_train = scaler.fit_transform(X_train)# Fit the classifier clf.fit(X_train,y_train)# Scale the test data (with the same scaler as above) X_test = scaler.transform(X_test)# Predict the output y_pred = clf.predict(X_test)# Get the accuracy for this train/test split (should be balanced) score[r]=balanced_accuracy_score(y_test, y_pred)# Return the average scorereturn np.nanmean(score)# First check with the standard covariance matrixspi ='cov_EmpiricalCovariance'X =get_data_matrix(results,spi)# Set up the dataset labels for classification ("forex" == 0, "stocks" == 1)y = np.array([int(key[0] =='stocks') for key in results])# Linear SVM classifier using the default settingsclf = svm.SVC(kernel='linear')score =get_accuracy(clf,X,y)print(f'Average accuracy for {spi} in delineating ForEx from FAANG data: {score}')
You should obtain an average accuracy of around 98%.
Now that we have tried classifying data using a single SPI, let's try this out for every SPI and see how they all perform and plot the results:
from sklearn.model_selection import train_test_splitimport seaborn as sns# Repeat last cell for each SPIscores ={}spis = mycalc.spis.keys()for spi in spis: X =get_data_matrix(results,spi)try: scores[spi]=get_accuracy(clf, X, y)print(f'Average score for {spi}: {scores[spi]}')exceptValueErroras err:print(f'Issue with SPI {spi}: {err}')# Get a histogram sns.histplot(scores)plt.xlabel('Average score')plt.ylabel('Number of SPIs')plt.show()
You should obtain a histogram like the following:
We can also inspect the average accuracy for each SPI individually:
# Load into a pandas series to pretty-print the resultsser = pd.Series(scores)with pd.option_context('display.max_rows', None, 'display.max_columns', None):print(ser.dropna().sort_values())plt.show()
This should produce the following output with 'cov_GraphicalLassoCV' being the top performing SPI for distinguishing forex from stock market data:
We can also inspect the MPIs to see what the distinguishing feature for our top performing SPI was:
# Get our top-performing SPItopspi = ser.index[ser.argmax()]print(f'Inspecting top SPI ({topspi}) in more detail...')# Unique time periods and datatypesperiods = np.unique([k[1] for k in results])dtypes = ['stocks','forex']# Set up our axesfig, axs = plt.subplots(len(periods),2,figsize=(15,80))for (d, p) in results:# Get the MPI for this dataset S = results[(d,p)][topspi]# The axis position i = [_i for _i, _p inenumerate(periods)if _p == p][0] j = [_j for _j, _d inenumerate(dtypes)if _d == d][0]# stocks or forex labels labels =globals()[dtypes[j]]# Plot the MPIplot_mpi(S,topspi,labels,ax=axs[i,j]) plt.subplots_adjust(hspace=1.)plt.tight_layout()plt.show()
This should produce the following output (here we show a small snippet of the full output):