
Keras in Practice: Stock Price Prediction with LSTM (Highly Recommended for Beginners)

    Hi, this is your hard-working blogger. I have recently been working with bus transit data, and my model results have been far from ideal. Along the way I studied an LSTM-based financial forecasting project left behind by a senior labmate, built with the Keras framework, and this post organizes it. It walks through the key steps of data preprocessing, model building, hyperparameter tuning, and model evaluation, which makes it a good starting point for beginners. The origin of the senior's Jupyter notebook is unknown.

Contents

1 Preparation

2 Building the Model

3 Model Optimization

1 Preparation

1.1 Import the required libraries

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math, time
import itertools
from sklearn import preprocessing
import datetime
from sklearn.metrics import mean_squared_error
from math import sqrt
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.recurrent import LSTM
from keras.models import load_model
import keras
import pandas_datareader.data as web
import h5py
from keras.callbacks import EarlyStopping, ModelCheckpoint

1.2 Define the hyperparameters

stock_name = '^GSPC'        # S&P 500 index ticker on Yahoo Finance
seq_len = 22                # sliding-window length in trading days
d = 0.2                     # dropout rate
shape = [4, seq_len, 1]     # [features, window, output]
neurons = [128, 128, 32, 1] # units in LSTM 1, LSTM 2, Dense, output
epochs = 300

2 Building the Model

2.1 Download and normalize the data

    As mentioned with the imports, pandas_datareader.data gives access to a variety of financial data, and that is where we download the data for this demo: daily records from 1971 to the present. Inside get_stock_data we also use scikit-learn's MinMaxScaler() to normalize the data.

def get_stock_data(stock_name, normalize=True):
    start = datetime.datetime(1971, 1, 1)
    end = datetime.date.today()
    df = web.DataReader(stock_name, "yahoo", start, end)
    df.drop(['Volume', 'Close'], axis=1, inplace=True)

    if normalize:
        # min-max scale each column independently into [0, 1]
        min_max_scaler = preprocessing.MinMaxScaler()
        df['Open'] = min_max_scaler.fit_transform(df.Open.values.reshape(-1,1))
        df['High'] = min_max_scaler.fit_transform(df.High.values.reshape(-1,1))
        df['Low'] = min_max_scaler.fit_transform(df.Low.values.reshape(-1,1))
        df['Adj Close'] = min_max_scaler.fit_transform(df['Adj Close'].values.reshape(-1,1))
    return df

df = get_stock_data(stock_name, normalize=True)

    In a Jupyter notebook you can call df.head() to inspect the normalized data. Here, Adj Close is our label column: the last column is what we predict.

2.2 Plot the normalized label column

def plot_stock(stock_name):
    df = get_stock_data(stock_name, normalize=True)
    print(df.head())
    plt.plot(df['Adj Close'], color='red', label='Adj Close')
    plt.legend(loc='best')
    plt.show()
plot_stock(stock_name)

    The output of plot_stock looks like the figure below:

2.3 Use the last day's Adjusted Close as the y value

    This is where the sliding window comes in. We set seq_len to 22 at the start, so each window holds 22 days of data (all four features, Adjusted Close included), and the model predicts the Adjusted Close of day 23. In load_data we build the windows, collecting them in the result list, and then split the data into a training set and a test set.

def load_data(stock, seq_len):
    amount_of_features = len(stock.columns)
    data = stock.values
    sequence_length = seq_len + 1 # 22-day window plus the day to predict
    result = []

    for index in range(len(data) - sequence_length): # each window must leave one day to predict
        result.append(data[index: index + sequence_length]) # days index .. index + 22

    result = np.array(result)
    row = round(0.9 * result.shape[0]) # 90% / 10% train-test split

    train = result[:int(row), :]
    X_train = train[:, :-1] # first 22 days of each window
    y_train = train[:, -1][:,-1] # day 23's adjusted close price

    X_test = result[int(row):, :-1]
    y_test = result[int(row):, -1][:,-1]

    X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], amount_of_features))
    X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], amount_of_features))

    return [X_train, y_train, X_test, y_test]

X_train, y_train, X_test, y_test = load_data(df, seq_len)
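    As a quick sanity check, the shapes should line up with the window and feature sizes (the sample counts depend on the download date, so treat the comments below as illustrative):

print(X_train.shape) # (samples, 22, 4): 22-day window x 4 features
print(y_train.shape) # (samples,)
print(X_test.shape)
print(y_test.shape)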

2.4 Build the neural network

def build_model2(layers, neurons, d):
    model = Sequential()

    model.add(LSTM(neurons[0], input_shape=(layers[1], layers[0]), return_sequences=True))
    model.add(Dropout(d))

    model.add(LSTM(neurons[1], input_shape=(layers[1], layers[0]), return_sequences=False))
    model.add(Dropout(d))

    model.add(Dense(neurons[2], kernel_initializer="uniform", activation='relu'))
    model.add(Dense(neurons[3], kernel_initializer="uniform", activation='linear'))
    # model = load_model('my_LSTM_stock_model1000.h5')
    # default Adam optimizer; decay is introduced later in build_model3 (section 3.2.3)
    model.compile(loss='mse', optimizer='adam', metrics=['accuracy'])
    model.summary()
    return model

model = build_model2(shape, neurons, d)

    Here, model.summary() prints the structure of the network we built. When constructing the first LSTM layer, the input_shape argument takes two values: the window size and the number of features, here 22 and 4 respectively. Our model structure is shown in the figure below.

    For how the parameter counts are computed, see the two articles LSTM的神經元個數 and LSTM引數個數計算, which give fairly detailed derivations. The number of parameters in an LSTM layer is $4 \times \left( (x_{dim} + y_{dim}) \times y_{dim} + y_{dim} \right)$; in words, parameters = 4 * ((input features + output units) * output units + output units).

    Taking the first layer as an example: it has 4 input features and 128 output units, so plugging into the formula gives 4 * ((4 + 128) * 128 + 128) = 68096 parameters.
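    To verify these numbers against the Param # column of model.summary(), here is a minimal sketch (the layer sizes follow neurons = [128, 128, 32, 1] with 4 input features):

def lstm_params(x_dim, y_dim):
    # four gates, each with weights over [input; hidden] plus a bias
    return 4 * ((x_dim + y_dim) * y_dim + y_dim)

def dense_params(x_dim, y_dim):
    return x_dim * y_dim + y_dim

print(lstm_params(4, 128))   # LSTM 1: 68096
print(lstm_params(128, 128)) # LSTM 2: 131584
print(dense_params(128, 32)) # Dense:  4128
print(dense_params(32, 1))   # Output: 33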

2.5 Train the model

history=model.fit(
    X_train,
    y_train,
    batch_size=512,
    epochs=epochs,
    validation_split=0.1,
    verbose=1)
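    Incidentally, with epochs set to 300, the EarlyStopping and ModelCheckpoint callbacks imported at the top (but unused in the original notebook) can stop training once the validation loss stalls and keep the best weights. A minimal sketch; the patience value and filename are placeholders:

callbacks = [
    EarlyStopping(monitor='val_loss', patience=10), # stop after 10 epochs without improvement
    ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True) # keep best weights
]
history = model.fit(X_train, y_train, batch_size=512, epochs=epochs,
                    validation_split=0.1, verbose=1, callbacks=callbacks)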

    One small caveat: Keras sits on top of TensorFlow, and if the GPU build of TensorFlow is installed, running this code directly can raise an error.

    The error happens when GPU memory allocation fails; one explanation is that other programs on the machine (games and the like) are already occupying GPU memory. See the linked thread for the discussion. A safe workaround is to reserve GPU memory for TensorFlow or Keras before training. For plain TensorFlow, create the session like this:

import tensorflow as tf
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.333)  
sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)) 

    For Keras, set the options right after importing:

import tensorflow as tf
from keras.backend.tensorflow_backend import set_session
config = tf.ConfigProto()
config.gpu_options.allocator_type = 'BFC' #A "Best-fit with coalescing" algorithm, simplified from a version of dlmalloc.
config.gpu_options.per_process_gpu_memory_fraction = 0.3
config.gpu_options.allow_growth = True
set_session(tf.Session(config=config))

    After that you can watch it train, epoch after epoch.

    We can also visualize the train and test loss over the course of training.

import matplotlib.pyplot as plt
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.legend()
plt.show()

    This shows how the loss on the train and test splits evolves.

2.6 Results on the training and test sets

    The metrics used for evaluation are MSE and RMSE.

def model_score(model, X_train, y_train, X_test, y_test):
    trainScore = model.evaluate(X_train, y_train, verbose=0)
    print('Train Score: %.5f MSE (%.2f RMSE)' % (trainScore[0], math.sqrt(trainScore[0])))

    testScore = model.evaluate(X_test, y_test, verbose=0)
    print('Test Score: %.5f MSE (%.2f RMSE)' % (testScore[0], math.sqrt(testScore[0])))
    return trainScore[0], testScore[0]

model_score(model, X_train, y_train, X_test, y_test)

    Here are the results:

    Two things are worth noting here. First, these errors are computed on the normalized data, so they look better than errors in real price units would. Second, careful readers will notice that the evaluation above calls model.evaluate() directly. Is that the same as calling model.predict() and computing the error from the predictions? Let's run a quick experiment.

def model_score(model, X_train, y_train, X_test, y_test):
    # Only X_test and y_test are actually needed; the other arguments are kept
    # so the signature matches the version above.
    y_hat = model.predict(X_test)
    y_t = y_test.reshape(-1, 1)

    temp = pd.DataFrame({'yhat': y_hat.ravel(), 'y': y_t.ravel()})
    temp_rmse = sqrt(mean_squared_error(temp.y, temp.yhat))
    temp_mse = mean_squared_error(temp.y, temp.yhat)
    print('TEMP RMSE: %.3f' % temp_rmse)
    print('TEMP MSE: %.3f' % temp_mse)
    return temp_rmse, temp_mse

model_score(model, X_train, y_train, X_test, y_test)

    Running it produces the following output; both numbers are computed on the test set.

    As we can see, the result matches the one above. So are model.evaluate() and model.predict() really the same thing? The following is based on a StackOverflow answer.

    model.evaluate() returns the value of the loss, computed batch by batch, while model.predict() returns the actual predictions for every sample. So even on the same data there is a difference: the value of the loss function is almost never the same thing as the predicted values. They are two different things.

2.7 Visualize the predictions

    Here we plot the predictions. Because the data was normalized earlier, we first invert the normalization to map the values back to the original price scale.

def denormalize(stock_name, normalized_value):
    # The date range must match get_stock_data, so the scaler is re-fitted
    # on exactly the same series that was used for normalization.
    start = datetime.datetime(1971, 1, 1)
    end = datetime.date.today()
    df = web.DataReader(stock_name, "yahoo", start, end)

    df = df['Adj Close'].values.reshape(-1,1)
    normalized_value = normalized_value.reshape(-1,1)

    min_max_scaler = preprocessing.MinMaxScaler()
    min_max_scaler.fit(df)
    new = min_max_scaler.inverse_transform(normalized_value)
    return new

    Then we can plot:

def plot_result(stock_name, normalized_value_p, normalized_value_y_test):
    newp = denormalize(stock_name, normalized_value_p)
    newy_test = denormalize(stock_name, normalized_value_y_test)
    plt.plot(newp, color='red', label='Prediction')
    plt.plot(newy_test, color='blue', label='Actual')
    plt.legend(loc='best')
    plt.title('The test result for {}'.format(stock_name))
    plt.xlabel('Days')
    plt.ylabel('Adjusted Close')
    plt.show()

p = model.predict(X_test)
plot_result(stock_name, p, y_test)

    The result looks like this:

2.8 Save the model

model.save('LSTM_Stock_prediction-20170429.h5')
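    The saved model can be restored later with load_model, which was already imported above:

model = load_model('LSTM_Stock_prediction-20170429.h5')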

3 Model Optimization

3.1 Define a helper function for tuning

def quick_measure(stock_name, seq_len, d, shape, neurons, epochs):
    df = get_stock_data(stock_name)
    X_train, y_train, X_test, y_test = load_data(df, seq_len)
    model = build_model2(shape, neurons, d)
    model.fit(X_train, y_train, batch_size=512, epochs=epochs, validation_split=0.1, verbose=1)
    # model.save('LSTM_Stock_prediction-20170429.h5')
    trainScore, testScore = model_score(model, X_train, y_train, X_test, y_test)
    return trainScore, testScore

3.2 Tune the hyperparameters

3.2.1 Tune the dropout rate

dlist = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
dropout_result = {}

for d in dlist:
    trainScore, testScore = quick_measure(stock_name, seq_len, d, shape, neurons, epochs)
    dropout_result[d] = testScore

min_val = min(dropout_result.values())
min_val_key = [k for k, v in dropout_result.items() if v == min_val]
print(dropout_result)
print(min_val_key)

    This yields a dict mapping each dropout value to its test error, as shown below.

    Next we can plot it to see how the error changes with the dropout rate:

lists = sorted(dropout_result.items())
x,y = zip(*lists)
plt.plot(x,y)
plt.title('Finding the best hyperparameter')
plt.xlabel('Dropout')
plt.ylabel('Mean Square Error')
plt.show()

    Which produces the figure below:

3.2.2 Tune the number of units

stock_name = '^GSPC'
seq_len = 22
shape = [4, seq_len, 1] # feature, window, output
epochs = 90
d = 0.3 # dropout rate chosen in 3.2.1
neuronlist1 = [32, 64, 128, 256, 512]
neuronlist2 = [16, 32, 64]
neurons_result = {}

for neuron_lstm in neuronlist1:
    neurons = [neuron_lstm, neuron_lstm]
    for activation in neuronlist2:
        neurons.append(activation)
        neurons.append(1)
        trainScore, testScore = quick_measure(stock_name, seq_len, d, shape, neurons, epochs)
        neurons_result[str(neurons)] = testScore
        neurons = neurons[:2]
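    As an aside, since itertools is already imported, the same grid can be written without the append-and-truncate dance; an equivalent sketch:

for neuron_lstm, activation in itertools.product(neuronlist1, neuronlist2):
    neurons = [neuron_lstm, neuron_lstm, activation, 1]
    trainScore, testScore = quick_measure(stock_name, seq_len, d, shape, neurons, epochs)
    neurons_result[str(neurons)] = testScore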

    Plot the results:

lists = sorted(neurons_result.items())
x,y = zip(*lists)

plt.title('Finding the best hyperparameter')
plt.xlabel('neurons')
plt.ylabel('Mean Square Error')

plt.bar(range(len(lists)), y, align='center')
plt.xticks(range(len(lists)), x)
plt.xticks(rotation=90)

plt.show()
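    To read off the best configuration directly instead of eyeballing the bar chart, take the key with the smallest test error (a one-line sketch):

best_neurons = min(neurons_result, key=neurons_result.get)
print(best_neurons, neurons_result[best_neurons])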

3.2.3 Tune the decay value

    Since decay was never actually applied above (build_model2 compiles with the default Adam), we redefine the model with decay as an explicit hyperparameter.

def build_model3(layers, neurons, d, decay):
    model = Sequential()

    model.add(LSTM(neurons[0], input_shape=(layers[1], layers[0]), return_sequences=True))
    model.add(Dropout(d))

    model.add(LSTM(neurons[1], input_shape=(layers[1], layers[0]), return_sequences=False))
    model.add(Dropout(d))

    model.add(Dense(neurons[2], kernel_initializer="uniform", activation='relu'))
    model.add(Dense(neurons[3], kernel_initializer="uniform", activation='linear'))
    adam = keras.optimizers.Adam(decay=decay)
    model.compile(loss='mse', optimizer=adam, metrics=['accuracy']) # pass the Adam instance so decay takes effect
    model.summary()
    return model

def quick_measure(stock_name, seq_len, d, shape, neurons, epochs, decay):
    df = get_stock_data(stock_name)
    X_train, y_train, X_test, y_test = load_data(df, seq_len)
    model = build_model3(shape, neurons, d, decay)
    model.fit(X_train, y_train, batch_size=512, epochs=epochs, validation_split=0.1, verbose=1)
    trainScore, testScore = model_score(model, X_train, y_train, X_test, y_test)
    return trainScore, testScore

    Now we can tune decay:

stock_name = '^GSPC'
seq_len = 22
shape = [4, seq_len, 1] # feature, window, output
neurons = [256, 256, 32, 1]
epochs = 90
decaylist = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]

decay_result = {}

for decay in decaylist:    
    trainScore, testScore = quick_measure(stock_name, seq_len, d, shape, neurons, epochs, decay)
    decay_result[decay] = testScore

lists = sorted(decay_result.items())
x,y = zip(*lists)
plt.plot(x,y)
plt.title('Finding the best hyperparameter')
plt.xlabel('Decay')
plt.ylabel('Mean Square Error')
plt.show()

    The remaining hyperparameters can be tuned the same way later on...