import torch
import torch.optim as optim
import torch.nn as nn
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
# Seed PyTorch's global RNG so weight initialisation is repeatable across runs.
torch.manual_seed(0)

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# --- Hyperparameters --------------------------------------------------------
# seq_length: number of consecutive rows in each input window.
# data_dim:   number of columns per row fed to the network.
seq_length, data_dim = 7, 8
# hidden_dim: LSTM hidden size; output_dim: values predicted per window.
hidden_dim, output_dim = 10, 1
learning_rate = 0.01
epochs = 500
batch_size = 100
def build_dataset(data, seq_len):
    """Cut a 2-D series into sliding input windows and next-step targets.

    Window ``i`` holds rows ``i .. i + seq_len - 1`` (all columns); its target
    is the last column of row ``i + seq_len``.

    Args:
        data: 2-D array-like of shape ``(n_rows, n_columns)``.
        seq_len: number of consecutive rows per window.

    Returns:
        A tuple ``(dataX, dataY)`` of NumPy arrays with shapes
        ``(n_windows, seq_len, n_columns)`` and ``(n_windows, 1)``, where
        ``n_windows = max(n_rows - seq_len, 0)``.
    """
    data = np.asarray(data)
    n_windows = len(data) - seq_len

    if n_windows <= 0:
        # Too few rows for even one window: return correctly shaped empty
        # arrays so downstream tensor code still sees the expected rank.
        empty_x = np.empty((0, seq_len) + data.shape[1:], dtype=data.dtype)
        empty_y = np.empty((0, 1), dtype=data.dtype)
        return empty_x, empty_y

    dataX = np.stack([data[start:start + seq_len, :]
                      for start in range(n_windows)])
    # Every row from index seq_len onwards is the target of exactly one
    # window, so the targets can be taken with a single slice.
    dataY = data[seq_len:, [-1]]
    return dataX, dataY
# --- Data loading and preprocessing -----------------------------------------
# NOTE: the CSV path and the column list below are placeholders (the column
# string is Korean for "adjust to taste") and must be edited before running.
# The LAST selected column is used as the prediction target, and the number
# of selected columns has to equal data_dim.
df = pd.read_csv('------------')
# Reverse the row order (presumably the file is stored newest-first and this
# makes it chronological -- confirm against the actual data).
df = df[::-1]
df = df[['입맛에 맞게 고치세요']]

# 70/30 split by position. astype() returns new frames, so the scaled values
# written below never touch `df`, and the float dtype lets them be stored
# without a dtype change. The test split starts seq_length rows early so the
# first test window has a full history.
train_size = int(len(df) * 0.7)
train_set = df[0:train_size].astype("float64")
test_set = df[train_size - seq_length:].astype("float64")

# Scale the feature columns using statistics from the training split only.
scaler_x = MinMaxScaler()
scaler_x.fit(train_set.iloc[:, :-1])
train_set.iloc[:, :-1] = scaler_x.transform(train_set.iloc[:, :-1])
test_set.iloc[:, :-1] = scaler_x.transform(test_set.iloc[:, :-1])

# Scale the target column with its own scaler. The transform has to be applied
# here (not just fitted) because predictions are later mapped back to original
# units with scaler_y.inverse_transform.
scaler_y = MinMaxScaler()
scaler_y.fit(train_set.iloc[:, [-1]])
train_set.iloc[:, [-1]] = scaler_y.transform(train_set.iloc[:, [-1]])
test_set.iloc[:, [-1]] = scaler_y.transform(test_set.iloc[:, [-1]])

# Turn both splits into (window, next-step target) pairs.
trainX, trainY = build_dataset(np.array(train_set), seq_length)
testX, testY = build_dataset(np.array(test_set), seq_length)

# Float tensors: the values are scaled reals, and nn.LSTM / MSELoss operate on
# floating-point data (an integer tensor would truncate them).
trainX_tensor = torch.FloatTensor(trainX).to(device)
trainY_tensor = torch.FloatTensor(trainY).to(device)
testX_tensor = torch.FloatTensor(testX).to(device)
testY_tensor = torch.FloatTensor(testY).to(device)

# Batches are served in their original order (shuffle=False); the last,
# incomplete batch is dropped (drop_last=True).
dataset = TensorDataset(trainX_tensor, trainY_tensor)
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=False,
    drop_last=True,
)
class LSTM(nn.Module):
    """LSTM regressor that predicts from the last time step of a sequence.

    The input is run through ``nn.LSTM`` (``batch_first=True``) and the output
    at the final time step is passed through a linear layer.

    Args:
        input_dim: number of features per time step.
        hidden_dim: size of the LSTM hidden state.
        seq_len: sequence length; only used to size the tensors created by
            ``reset_hidden_state``.
        output_dim: number of values predicted per sequence.
        layers: number of stacked LSTM layers.
    """

    def __init__(self, input_dim, hidden_dim, seq_len, output_dim, layers):
        # Zero-argument super() does not look up the module-level name
        # ``LSTM``, so it keeps working even if that name is rebound.
        super().__init__()
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len
        self.output_dim = output_dim
        self.layers = layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=layers,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim, bias=True)

    def reset_hidden_state(self):
        """Store a fresh pair of zero tensors in ``self.hidden``.

        The tensors have shape ``(layers, seq_len, hidden_dim)`` and live on
        the same device as the module's parameters. ``forward`` does not pass
        ``self.hidden`` to the LSTM, so every forward call starts from
        ``nn.LSTM``'s default zero state regardless of this method.
        """
        device = next(self.parameters()).device
        shape = (self.layers, self.seq_len, self.hidden_dim)
        self.hidden = (
            torch.zeros(*shape, device=device),
            torch.zeros(*shape, device=device),
        )

    def forward(self, x):
        """Map ``x`` of shape ``(batch, seq, input_dim)`` to ``(batch, output_dim)``."""
        x, _status = self.lstm(x)
        # Keep only the output of the final time step.
        return self.fc(x[:, -1])
# Build the single-layer network and move it to the selected device.
# NOTE: this rebinds the name ``LSTM`` from the class to the model instance;
# the class can no longer be instantiated through that name afterwards.
LSTM = LSTM(
    input_dim=data_dim,
    hidden_dim=hidden_dim,
    seq_len=seq_length,
    output_dim=output_dim,
    layers=1,
).to(device)
def train_model(mode, train_df, epochs=None, lr=None, verbose=10,
                patience=10):
    """Train a model with Adam on an MSE loss and return it in eval mode.

    Args:
        mode: the model to train (the parameter name is kept as-is for
            backward compatibility). It must provide ``parameters()``,
            ``reset_hidden_state()``, ``train()`` and ``eval()``, and be
            callable on a batch of inputs.
        train_df: sized iterable of ``(inputs, targets)`` batches, e.g. a
            ``DataLoader``. It is iterated once per epoch.
        epochs: number of passes over ``train_df``. Required.
        lr: Adam learning rate. When ``None`` the module-level
            ``learning_rate`` is used.
        verbose: print the average loss every ``verbose`` epochs; ``0`` or
            ``None`` disables printing.
        patience: every ``patience`` epochs (never at epoch 0) compare the
            current average loss with the one ``patience`` epochs earlier and
            stop if it got worse; ``0`` or ``None`` disables early stopping.

    Returns:
        ``(model, train_hist)`` where ``model`` has been switched to eval mode
        and ``train_hist`` is a NumPy array of length ``epochs`` holding the
        average loss per epoch (entries after an early stop stay ``0``).

    Raises:
        TypeError: if ``epochs`` is ``None``.
    """
    if epochs is None:
        raise TypeError("train_model() requires an integer 'epochs'")

    model = mode
    step_size = learning_rate if lr is None else lr

    # MSELoss holds no parameters or buffers, so it needs no device placement.
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=step_size)
    train_hist = np.zeros(epochs)
    total_batch = len(train_df)

    # Make sure training-mode behaviour is active even if the model was
    # previously put into eval mode (as this function does on return).
    model.train()

    for epoch in range(epochs):
        avg_cost = 0.0
        for x_train, y_train in train_df:
            model.reset_hidden_state()
            outputs = model(x_train)
            loss = criterion(outputs, y_train)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() detaches the value so the autograd graph of every batch
            # is not kept alive by the running average.
            avg_cost += loss.item() / total_batch

        train_hist[epoch] = avg_cost

        if verbose and epoch % verbose == 0:
            print('Epoch: ', '%04d' % (epoch),
                  'train loss : ', '{:.4f}'.format(avg_cost))

        # Epoch 0 has no earlier loss to compare against, so it is skipped.
        if patience and epoch != 0 and epoch % patience == 0:
            if train_hist[epoch - patience] < train_hist[epoch]:
                print('\n Early Stopping')
                break

    return model.eval(), train_hist
# Train the network; train_model returns it already switched to eval mode.
model, train_hist = train_model(LSTM, dataloader, epochs=epochs,
                                lr=learning_rate, verbose=20, patience=10)

# Predict every test window, one window per forward pass, without building
# autograd graphs.
with torch.no_grad():
    pred = []
    for window in testX_tensor:
        model.reset_hidden_state()
        # unsqueeze adds the batch dimension the model expects.
        predicted = model(torch.unsqueeze(window, 0))
        pred.append(torch.flatten(predicted).item())

# Map predictions and targets through the inverse of scaler_y. The tensor is
# moved to host memory and converted to NumPy first, because scikit-learn
# cannot consume a CUDA tensor.
pred_inverse = scaler_y.inverse_transform(np.array(pred).reshape(-1, 1))
testY_inverse = scaler_y.inverse_transform(testY_tensor.cpu().numpy())
def MAE(true, pred):
    """Return the mean absolute error between ``true`` and ``pred``.

    Both arguments may be any array-like (lists, tuples, NumPy arrays); they
    are converted with ``np.asarray`` and combined with NumPy broadcasting,
    so callers should pass inputs of matching shape.
    """
    true = np.asarray(true)
    pred = np.asarray(pred)
    return np.mean(np.abs(true - pred))
# Report the test-set error; arguments follow MAE's (true, pred) order.
print('MAE SCORE: ', MAE(testY_inverse, pred_inverse))

# Forecast the step that follows the data, using the last seq_length rows of
# the test split as the input window.
length = len(test_set)
target = np.array(test_set)[length - seq_length:]
# Float tensor on the model's device: the model works on floating-point input.
target = torch.FloatTensor(target).to(device)
target = target.reshape([1, seq_length, data_dim])

with torch.no_grad():
    out = model(target)

pre = torch.flatten(out).item()
pre = round(pre, 8)

# Map the single prediction through the inverse of scaler_y and print it.
pre_inverse = scaler_y.inverse_transform(np.array(pre).reshape(-1, 1))
print(pre_inverse.reshape(-1)[0])