2303.transformer180D
2024-06-19
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import math
from sklearn.metrics import mean_squared_error
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Flatten, LayerNormalization, MultiHeadAttention, Dropout
# Load data
df = pd.read_csv('/content/2303_stock_data.csv')
# Check the column names
print(df.columns)
# Adjust column names if necessary
if 'date' not in df.columns:
    # Normalise headers such as " Date" / "Close" so the lookups below succeed.
    df.columns = [col.strip().lower() for col in df.columns]
    print(df.columns)
# Ensure 'date' column is in datetime format
df['date'] = pd.to_datetime(df['date'])
# NOTE(review): rows are assumed to be in chronological order - confirm against the CSV.
# .copy() gives an independent frame so the feature columns below are not
# written onto a selection of `df`.
df2 = df.reset_index()[['date', 'close']].copy()
# Create more features
df2['Year'] = df2['date'].dt.year
df2['Month'] = df2['date'].dt.month
df2['Weekday'] = df2['date'].dt.weekday
df2['MA5'] = df2['close'].rolling(window=5).mean()
df2['MA10'] = df2['close'].rolling(window=10).mean()
df2['MA20'] = df2['close'].rolling(window=20).mean()
# The rolling means are NaN until their window is full; drop those leading rows.
df2 = df2.dropna()
feature_frame = df2[['close', 'Year', 'Month', 'Weekday', 'MA5', 'MA10', 'MA20']]
# Create train and test sets (first 65% of the rows train, the rest test)
train_size = int(len(feature_frame) * 0.65)
test_size = len(feature_frame) - train_size
# Data normalization
# The scaler is fitted on the training rows only, so the min/max of the test
# period do not leak into training. Test values may therefore fall outside
# [0, 1]; inverse_transform still maps them back to the original units.
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(feature_frame.iloc[:train_size])
df2_scaled = scaler.transform(feature_frame)
train_data, test_data = df2_scaled[:train_size, :], df2_scaled[train_size:, :]
# Create dataset
def create_dataset(dataset, time_step=1):
    """Build sliding-window samples and next-step targets from a 2-D series.

    Args:
        dataset: array-like of shape (rows, features). Column 0 is the value
            to predict ('close').
        time_step: number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)``. ``X`` has shape (n, time_step, features) and
        ``y`` has shape (n,), where ``y[i]`` is column 0 of the row that
        immediately follows window ``X[i]``.

    Note:
        ``n`` is ``len(dataset) - time_step - 1``, so the last possible window
        is not emitted. This count is kept unchanged because the plotting
        offsets elsewhere in this script are derived from it.
    """
    data = np.asarray(dataset)  # also accepts nested lists
    n_windows = max(len(data) - time_step - 1, 0)
    if n_windows == 0:
        # Too few rows for a single window: return empties that still carry
        # the expected trailing dimensions instead of rank-1 arrays.
        empty_x = np.empty((0, time_step) + data.shape[1:], dtype=data.dtype)
        return empty_x, np.empty((0,), dtype=data.dtype)
    windows = np.stack([data[start:start + time_step, :] for start in range(n_windows)])
    # Predict 'close' column: the row right after each window.
    targets = data[time_step:time_step + n_windows, 0].copy()
    return windows, targets
time_step = 100  # look-back window length, in rows
# Window the training split first, then the test split.
(X_train, Y_train), (X_test, Y_test) = (
    create_dataset(split, time_step) for split in (train_data, test_data)
)
# Reshape data for Transformer: (samples, time_step, features)
X_train = np.reshape(X_train, (X_train.shape[0], time_step, -1))
X_test = np.reshape(X_test, (X_test.shape[0], time_step, -1))
for label, windows in (("X_train shape:", X_train), ("X_test shape:", X_test)):
    print(label, windows.shape)
# Positional Encoding
class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal position encoding to a sequence input.

    The encoding table has shape (1, position, d_model) and is computed once
    at construction. In ``call`` it is cut to the input's length along axis 1
    and added to the input.

    Args:
        position: number of positions (sequence length) to precompute.
        d_model: size of the last (feature) axis of the input.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, position, d_model, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call.
        self.position = position
        self.d_model = d_model
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, pos, i, d_model):
        """Return the angle (radians) for every (position, feature index) pair."""
        # Feature indices 2k and 2k+1 share one frequency: 1 / 10000^(2k / d_model).
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return pos * angle_rates

    def positional_encoding(self, position, d_model):
        """Build the (1, position, d_model) float32 encoding table."""
        angle_rads = self.get_angles(
            np.arange(position)[:, np.newaxis],
            np.arange(d_model)[np.newaxis, :],
            d_model,
        )
        # Sine on even feature indices, cosine on odd ones.
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        # Only the first `sequence length` rows of the table are used.
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"position": self.position, "d_model": self.d_model})
        return config
# Transformer block
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention followed by a two-layer feed-forward network.

    Each sub-layer's output goes through dropout, is added back to that
    sub-layer's input (residual connection) and is then layer-normalised.

    Args:
        embed_dim: size of the input's last axis; also passed as ``key_dim``
            to the attention layer and used as the feed-forward output width.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        rate: dropout rate applied after each sub-layer.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None):
        # `training` defaults to None so the layer can also be called directly
        # without the flag; it only controls the two dropout layers here.
        attn_output = self.att(inputs, inputs)  # query and value are both `inputs`
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config
# Build Transformer model
def build_transformer_model(input_shape, num_heads, ff_dim):
    """Assemble and compile the single-block Transformer regressor.

    Args:
        input_shape: ``(sequence length, feature count)`` of one sample.
        num_heads: attention heads for the Transformer block.
        ff_dim: hidden width of the block's feed-forward network.

    Returns:
        A compiled Keras ``Model`` with one output unit, using Adam
        (learning rate 0.001) and mean-squared-error loss.
    """
    seq_len = input_shape[0]
    n_features = input_shape[1]

    inputs = Input(shape=input_shape)
    encoded = PositionalEncoding(seq_len, n_features)(inputs)
    block = TransformerBlock(n_features, num_heads, ff_dim)
    attended = block(encoded)
    flat = Flatten()(attended)
    outputs = Dense(1)(flat)

    model = Model(inputs=inputs, outputs=outputs)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss="mean_squared_error")
    return model
input_shape = (time_step, X_train.shape[2])
num_heads = 4
ff_dim = 128
model = build_transformer_model(input_shape, num_heads, ff_dim)
model.summary()
# Train model
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
model.fit(X_train, Y_train, validation_data=(X_test, Y_test), epochs=120, batch_size=64, verbose=1, callbacks=[early_stop])
# Predict
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)


def _close_to_price(scaled_close):
    """Convert scaled 'close' values back to price units.

    The scaler was fitted on every feature column, so the values are padded
    with zero columns up to the scaler's width before inverse_transform; only
    column 0 ('close') of the result is meaningful and returned.
    """
    column = np.asarray(scaled_close).reshape(-1, 1)
    padding = np.zeros((column.shape[0], df2_scaled.shape[1] - 1))
    return scaler.inverse_transform(np.concatenate((column, padding), axis=1))[:, 0]


# Inverse transform predictions
train_predict = _close_to_price(train_predict)
test_predict = _close_to_price(test_predict)
# The targets are still in scaled units; convert them as well so that both
# sides of the RMSE are in price units. Y_train / Y_test themselves are left
# untouched.
Y_train_price = _close_to_price(Y_train)
Y_test_price = _close_to_price(Y_test)
# Calculate RMSE
rmse_train = math.sqrt(mean_squared_error(Y_train_price, train_predict))
print("訓練數據的RMSE:", rmse_train)
rmse_test = math.sqrt(mean_squared_error(Y_test_price, test_predict))
print("測試數據的RMSE:", rmse_test)
# Plot predictions
look_back = time_step
close_column = df2_scaled[:, 0]

# Training predictions start `look_back` rows in, because the first window
# needs that many rows of history.
trainPredictPlot = np.full_like(close_column, np.nan)
train_stop = len(train_predict) + look_back
trainPredictPlot[look_back:train_stop] = train_predict

# Test predictions are shifted past the training span plus their own look-back.
testPredictPlot = np.full_like(close_column, np.nan)
test_start = len(train_predict) + (look_back * 2) + 1
testPredictPlot[test_start:len(df2_scaled) - 1] = test_predict

actual_close = scaler.inverse_transform(df2_scaled)[:, 0]
plt.plot(actual_close, color='blue', label='ACT')
plt.plot(trainPredictPlot, color='orange', label='Training')
plt.plot(testPredictPlot, color='green', label='Testing')
plt.title('Transformer1 Predict')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
![](https://thepearl.ghost.io/content/images/2024/06/transformer_2303.png)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import math
from sklearn.metrics import mean_squared_error
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Flatten, LayerNormalization, MultiHeadAttention, Dropout
# Load data
df = pd.read_csv('/content/2303_stock_data.csv')
# Check the column names
print(df.columns)
# Adjust column names if necessary
if 'date' not in df.columns:
    # Normalise headers such as " Date" / "Close" so the lookups below succeed.
    df.columns = [col.strip().lower() for col in df.columns]
    print(df.columns)
# Ensure 'date' column is in datetime format
df['date'] = pd.to_datetime(df['date'])
# NOTE(review): rows are assumed to be in chronological order - confirm against the CSV.
# .copy() gives an independent frame so the feature columns below are not
# written onto a selection of `df`.
df2 = df.reset_index()[['date', 'close']].copy()
# Create more features
df2['Year'] = df2['date'].dt.year
df2['Month'] = df2['date'].dt.month
df2['Weekday'] = df2['date'].dt.weekday
df2['MA5'] = df2['close'].rolling(window=5).mean()
df2['MA10'] = df2['close'].rolling(window=10).mean()
df2['MA20'] = df2['close'].rolling(window=20).mean()
# The rolling means are NaN until their window is full; drop those leading rows.
df2 = df2.dropna()
feature_frame = df2[['close', 'Year', 'Month', 'Weekday', 'MA5', 'MA10', 'MA20']]
# Create train and test sets (first 65% of the rows train, the rest test)
train_size = int(len(feature_frame) * 0.65)
test_size = len(feature_frame) - train_size
# Data normalization
# The scaler is fitted on the training rows only, so the min/max of the test
# period do not leak into training. Test values may therefore fall outside
# [0, 1]; inverse_transform still maps them back to the original units.
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(feature_frame.iloc[:train_size])
df2_scaled = scaler.transform(feature_frame)
train_data, test_data = df2_scaled[:train_size, :], df2_scaled[train_size:, :]
# Create dataset
def create_dataset(dataset, time_step=1):
    """Build sliding-window samples and next-step targets from a 2-D series.

    Args:
        dataset: array-like of shape (rows, features). Column 0 is the value
            to predict ('close').
        time_step: number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)``. ``X`` has shape (n, time_step, features) and
        ``y`` has shape (n,), where ``y[i]`` is column 0 of the row that
        immediately follows window ``X[i]``.

    Note:
        ``n`` is ``len(dataset) - time_step - 1``, so the last possible window
        is not emitted. This count is kept unchanged because the plotting
        offsets elsewhere in this script are derived from it.
    """
    data = np.asarray(dataset)  # also accepts nested lists
    n_windows = max(len(data) - time_step - 1, 0)
    if n_windows == 0:
        # Too few rows for a single window: return empties that still carry
        # the expected trailing dimensions instead of rank-1 arrays.
        empty_x = np.empty((0, time_step) + data.shape[1:], dtype=data.dtype)
        return empty_x, np.empty((0,), dtype=data.dtype)
    windows = np.stack([data[start:start + time_step, :] for start in range(n_windows)])
    # Predict 'close' column: the row right after each window.
    targets = data[time_step:time_step + n_windows, 0].copy()
    return windows, targets
time_step = 100  # look-back window length, in rows
# Window the training split first, then the test split.
(X_train, Y_train), (X_test, Y_test) = (
    create_dataset(split, time_step) for split in (train_data, test_data)
)
# Reshape data for Transformer: (samples, time_step, features)
X_train = np.reshape(X_train, (X_train.shape[0], time_step, -1))
X_test = np.reshape(X_test, (X_test.shape[0], time_step, -1))
for label, windows in (("X_train shape:", X_train), ("X_test shape:", X_test)):
    print(label, windows.shape)
# Positional Encoding
class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal position encoding to a sequence input.

    The encoding table has shape (1, position, d_model) and is computed once
    at construction. In ``call`` it is cut to the input's length along axis 1
    and added to the input.

    Args:
        position: number of positions (sequence length) to precompute.
        d_model: size of the last (feature) axis of the input.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, position, d_model, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call.
        self.position = position
        self.d_model = d_model
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, pos, i, d_model):
        """Return the angle (radians) for every (position, feature index) pair."""
        # Feature indices 2k and 2k+1 share one frequency: 1 / 10000^(2k / d_model).
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return pos * angle_rates

    def positional_encoding(self, position, d_model):
        """Build the (1, position, d_model) float32 encoding table."""
        angle_rads = self.get_angles(
            np.arange(position)[:, np.newaxis],
            np.arange(d_model)[np.newaxis, :],
            d_model,
        )
        # Sine on even feature indices, cosine on odd ones.
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        # Only the first `sequence length` rows of the table are used.
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"position": self.position, "d_model": self.d_model})
        return config
# Transformer block
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention followed by a two-layer feed-forward network.

    Each sub-layer's output goes through dropout, is added back to that
    sub-layer's input (residual connection) and is then layer-normalised.

    Args:
        embed_dim: size of the input's last axis; also passed as ``key_dim``
            to the attention layer and used as the feed-forward output width.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        rate: dropout rate applied after each sub-layer.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can reproduce the constructor call.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None):
        # `training` defaults to None so the layer can also be called directly
        # without the flag; it only controls the two dropout layers here.
        attn_output = self.att(inputs, inputs)  # query and value are both `inputs`
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config
# Build Transformer model
def build_transformer_model(input_shape, num_heads, ff_dim):
    """Assemble and compile the single-block Transformer regressor.

    Args:
        input_shape: ``(sequence length, feature count)`` of one sample.
        num_heads: attention heads for the Transformer block.
        ff_dim: hidden width of the block's feed-forward network.

    Returns:
        A compiled Keras ``Model`` with one output unit, using Adam
        (learning rate 0.001) and mean-squared-error loss.
    """
    seq_len = input_shape[0]
    n_features = input_shape[1]

    inputs = Input(shape=input_shape)
    encoded = PositionalEncoding(seq_len, n_features)(inputs)
    block = TransformerBlock(n_features, num_heads, ff_dim)
    attended = block(encoded)
    flat = Flatten()(attended)
    outputs = Dense(1)(flat)

    model = Model(inputs=inputs, outputs=outputs)
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss="mean_squared_error")
    return model
input_shape = (time_step, X_train.shape[2])
num_heads = 4
ff_dim = 128
model = build_transformer_model(input_shape, num_heads, ff_dim)
model.summary()
# Train model
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
model.fit(X_train, Y_train, validation_data=(X_test, Y_test), epochs=120, batch_size=64, verbose=1, callbacks=[early_stop])
# Predict
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)


def _close_to_price(scaled_close):
    """Convert scaled 'close' values back to price units.

    The scaler was fitted on every feature column, so the values are padded
    with zero columns up to the scaler's width before inverse_transform; only
    column 0 ('close') of the result is meaningful and returned.
    """
    column = np.asarray(scaled_close).reshape(-1, 1)
    padding = np.zeros((column.shape[0], df2_scaled.shape[1] - 1))
    return scaler.inverse_transform(np.concatenate((column, padding), axis=1))[:, 0]


# Inverse transform predictions
train_predict = _close_to_price(train_predict)
test_predict = _close_to_price(test_predict)
# The targets are still in scaled units; convert them as well so that both
# sides of the RMSE are in price units. Y_train / Y_test themselves are left
# untouched.
Y_train_price = _close_to_price(Y_train)
Y_test_price = _close_to_price(Y_test)
# Calculate RMSE
rmse_train = math.sqrt(mean_squared_error(Y_train_price, train_predict))
print("訓練數據的RMSE:", rmse_train)
rmse_test = math.sqrt(mean_squared_error(Y_test_price, test_predict))
print("測試數據的RMSE:", rmse_test)
# Plot predictions
look_back = time_step
close_column = df2_scaled[:, 0]

# Training predictions start `look_back` rows in, because the first window
# needs that many rows of history.
trainPredictPlot = np.full_like(close_column, np.nan)
train_stop = len(train_predict) + look_back
trainPredictPlot[look_back:train_stop] = train_predict

# Test predictions are shifted past the training span plus their own look-back.
testPredictPlot = np.full_like(close_column, np.nan)
test_start = len(train_predict) + (look_back * 2) + 1
testPredictPlot[test_start:len(df2_scaled) - 1] = test_predict

actual_close = scaler.inverse_transform(df2_scaled)[:, 0]
plt.plot(actual_close, color='blue', label='ACT')
plt.plot(trainPredictPlot, color='orange', label='Training')
plt.plot(testPredictPlot, color='green', label='Testing')
# Future prediction
# Recursive multi-step forecast: each predicted close is turned into a full
# feature row and appended to the window used for the next prediction.
future_steps = 50
feature_names = ['close', 'Year', 'Month', 'Weekday', 'MA5', 'MA10', 'MA20']
# Rolling input window, kept 2-D (time_step, features) between iterations;
# the batch axis is added only for the predict call.
last_data = test_data[-time_step:].copy()
# Unscaled close history and last known date, needed to rebuild the
# moving-average and calendar features of each forecast row.
close_history = df2['close'].tolist()
next_date = df2['date'].iloc[-1]
future_predictions = []
for _ in range(future_steps):
    # verbose=0 avoids printing one progress bar per forecast step.
    prediction = model.predict(last_data[np.newaxis, ...], verbose=0)
    scaled_close = float(prediction[0, 0])
    future_predictions.append(scaled_close)
    # Convert the scaled close back to a price (zero-pad to the scaler's width).
    padded = np.zeros((1, df2_scaled.shape[1]))
    padded[0, 0] = scaled_close
    close_history.append(scaler.inverse_transform(padded)[0, 0])
    # NOTE(review): assumes the next trading day is the next business day;
    # exchange holidays are not accounted for - confirm this is acceptable.
    next_date = next_date + pd.offsets.BDay(1)
    next_row = pd.DataFrame(
        [[
            close_history[-1],
            next_date.year,
            next_date.month,
            next_date.weekday(),
            np.mean(close_history[-5:]),
            np.mean(close_history[-10:]),
            np.mean(close_history[-20:]),
        ]],
        columns=feature_names,
    )
    next_scaled = scaler.transform(next_row)
    # Keep the close feature exactly equal to the model output rather than
    # its inverse/forward round trip.
    next_scaled[0, 0] = scaled_close
    # Slide the window: drop the oldest row, append the new one.
    last_data = np.concatenate((last_data[1:], next_scaled), axis=0)
future_predictions = scaler.inverse_transform(np.concatenate((np.array(future_predictions).reshape(-1, 1), np.zeros((future_steps, df2_scaled.shape[1]-1))), axis=1))[:, 0]
# The forecast lies beyond the observed series, so the plot array must be
# `future_steps` longer than the data; the first forecast sits at index
# len(df2_scaled), right after the last observed row.
futurePredictPlot = np.full(len(df2_scaled) + future_steps, np.nan)
futurePredictPlot[len(df2_scaled):] = future_predictions
plt.plot(futurePredictPlot, color='red', label='Future Prediction')
plt.title('Transformer Predict')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
![](https://thepearl.ghost.io/content/images/2024/06/tran2.png)