메뉴 건너뛰기

목록
2022.11.23 20:59

확인용

profile
조회 수 12 댓글 2 예스잼 0 노잼 0

No Attached Image

import torch
import torch.optim as optim
import torch.nn as nn
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
 
import matplotlib.pyplot as plt
 
# Seed torch's global random number generator.
torch.manual_seed(0)

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Hyperparameters shared by the rest of the script.
seq_length = 7        # number of consecutive rows in one input window
data_dim = 8          # number of features per row (LSTM input size)
hidden_dim = 10       # LSTM hidden size
output_dim = 1        # size of the prediction produced per window
learning_rate = 0.01  # passed to the optimizer
epochs = 500          # number of training epochs requested
batch_size = 100      # number of windows per DataLoader batch
 
def build_dataset(data, seq_len):
    """Cut a 2-D array into sliding windows and their next-row targets.

    For every start row ``s`` in ``range(len(data) - seq_len)`` the sample is
    ``data[s:s + seq_len, :]`` and its target is the last column of row
    ``s + seq_len``, kept as a length-1 array.

    Args:
        data: 2-D NumPy array (rows x columns).
        seq_len: number of rows in each window.

    Returns:
        A pair ``(windows, targets)`` of NumPy arrays built from the
        collected samples; both are empty when ``data`` has no more than
        ``seq_len`` rows.
    """
    starts = range(len(data) - seq_len)
    windows = [data[s:s + seq_len, :] for s in starts]
    targets = [data[s + seq_len, [-1]] for s in starts]
    return np.array(windows), np.array(targets)
 
# Load the raw table.  The path is a placeholder that must be replaced with
# the real CSV location.
df = pd.read_csv('------------')

# Reverse the row order (presumably the file is stored newest-first and the
# model needs oldest-first -- confirm against the actual data).
df = df[::-1]
# Placeholder column list ("adjust to taste"): select the feature columns
# with the prediction target as the LAST column, since build_dataset and the
# scalers below treat the last column as the target.
df = df[['입맛에 맞게 고치세요']]

# Chronological 70/30 split.  The test set starts seq_length rows early so
# that its first window can predict the first row after the training range.
train_size = int(len(df)*0.7)
# astype(float) returns independent float frames, so the in-place scaling
# below neither writes into a view of df nor assigns floats into int columns.
train_set = df[0:train_size].astype(float)
test_set = df[train_size-seq_length:].astype(float)

# Scale the feature columns with statistics from the training set only.
scaler_x = MinMaxScaler()
scaler_x.fit(train_set.iloc[:, :-1])

train_set.iloc[:, :-1] = scaler_x.transform(train_set.iloc[:, :-1])
test_set.iloc[:, :-1] = scaler_x.transform(test_set.iloc[:, :-1])

# Scale the target column as well.  The predictions are later mapped back
# with scaler_y.inverse_transform, which is only meaningful if the model was
# trained on targets that went through scaler_y.transform.
scaler_y = MinMaxScaler()
scaler_y.fit(train_set.iloc[:, [-1]])

train_set.iloc[:, [-1]] = scaler_y.transform(train_set.iloc[:, [-1]])
test_set.iloc[:, [-1]] = scaler_y.transform(test_set.iloc[:, [-1]])

trainX, trainY = build_dataset(np.array(train_set), seq_length)
testX, testY = build_dataset(np.array(test_set), seq_length)

# The scaled data is floating point; an integer (Long) tensor would truncate
# it and is not accepted by nn.LSTM, so build float32 tensors.
trainX_tensor = torch.tensor(trainX, dtype=torch.float32).to(device)
trainY_tensor = torch.tensor(trainY, dtype=torch.float32).to(device)

testX_tensor = torch.tensor(testX, dtype=torch.float32).to(device)
testY_tensor = torch.tensor(testY, dtype=torch.float32).to(device)

dataset = TensorDataset(trainX_tensor, trainY_tensor)

# shuffle=False keeps the windows in chronological order; drop_last=True
# discards a final batch smaller than batch_size.
dataloader = DataLoader(dataset,
                        batch_size=batch_size,
                        shuffle=False,
                        drop_last=True)
 
class LSTM(nn.Module):
    """LSTM followed by a linear layer applied to the last time step.

    Args:
        input_dim: number of features per time step.
        hidden_dim: LSTM hidden size.
        seq_len: window length; only used to size the ``hidden`` buffer.
        output_dim: size of the output produced per input sequence.
        layers: number of stacked LSTM layers.
    """

    def __init__(self, input_dim, hidden_dim, seq_len, output_dim, layers):
        # Zero-argument super() does not look up the global name ``LSTM``,
        # so it keeps working even if that name is rebound later.
        super().__init__()
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len
        self.output_dim = output_dim
        self.layers = layers
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=layers,
                            batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim, bias=True)

    def reset_hidden_state(self):
        """Store a fresh all-zero ``(h, c)`` pair in ``self.hidden``.

        The tensors use the dtype and device of the model parameters.  The
        shape ``(layers, seq_len, hidden_dim)`` is kept for backward
        compatibility.

        NOTE: ``forward`` does not pass ``self.hidden`` to ``self.lstm``;
        the LSTM is called without an initial state on every forward pass.
        """
        ref = next(self.parameters())
        shape = (self.layers, self.seq_len, self.hidden_dim)
        self.hidden = (
            torch.zeros(shape, dtype=ref.dtype, device=ref.device),
            torch.zeros(shape, dtype=ref.dtype, device=ref.device),
        )

    def forward(self, x):
        """Return ``fc`` applied to the LSTM output of the last time step.

        ``x`` is indexed as ``[:, -1]`` after the batch-first LSTM, i.e. the
        second axis is treated as time.
        """
        out, _status = self.lstm(x)
        return self.fc(out[:, -1])
# Build the network and move it to the selected device.
# NOTE: this rebinds the name ``LSTM`` from the class to the model instance.
LSTM = LSTM(
    input_dim=data_dim,
    hidden_dim=hidden_dim,
    seq_len=seq_length,
    output_dim=output_dim,
    layers=1,
).to(device)
 
def train_model(mode, train_df, epochs=None, lr=None, verbose=10,
                patience=10):
    """Train a model with Adam on a mean-squared-error loss.

    Args:
        mode: the ``nn.Module`` to train (the parameter name is kept for
            backward compatibility).  It must provide
            ``reset_hidden_state()``, which is called before every batch.
        train_df: sized iterable of ``(inputs, targets)`` batches, e.g. a
            ``DataLoader``.
        epochs: maximum number of epochs; required.
        lr: learning rate for Adam.  When ``None`` the module-level
            ``learning_rate`` is used.
        verbose: print the running loss every ``verbose`` epochs; a falsy
            value disables printing.
        patience: every ``patience`` epochs (except epoch 0) the current
            loss is compared with the loss ``patience`` epochs earlier and
            training stops if it got worse; a falsy value disables the check.

    Returns:
        ``(model, train_hist)``: the model switched to eval mode and an
        array of length ``epochs`` with the mean batch loss per epoch
        (entries after an early stop stay 0).

    Raises:
        ValueError: if ``epochs`` is ``None`` or negative.
    """
    if epochs is None or epochs < 0:
        raise ValueError("epochs must be a non-negative integer")

    model = mode
    step_size = learning_rate if lr is None else lr

    # MSELoss has no parameters or buffers, so it needs no device placement.
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=step_size)

    train_hist = np.zeros(epochs)
    total_batch = len(train_df)
    model.train()

    for epoch in range(epochs):
        avg_cost = 0.0

        for x_train, y_train in train_df:
            model.reset_hidden_state()

            outputs = model(x_train)
            loss = criterion(outputs, y_train)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() detaches the value so the autograd graph of every
            # batch is not kept alive through the running average.
            avg_cost += loss.item() / total_batch
        train_hist[epoch] = avg_cost

        if verbose and epoch % verbose == 0:
            print('Epoch: ', '%04d' % (epoch),
                  'train loss : ', '{:.4f}'.format(avg_cost))

        # Skip epoch 0: there is no earlier loss to compare against.
        if patience and epoch and epoch % patience == 0:
            if train_hist[epoch - patience] < train_hist[epoch]:
                print('\n Early Stopping')
                break

    # Return only after the whole loop has finished (or stopped early).
    return model.eval(), train_hist
   
# Train the network; train_model returns it in eval mode together with the
# per-epoch loss history.
model, train_hist = train_model(LSTM, dataloader, epochs=epochs,
                                lr=learning_rate, verbose=20, patience=10)

# Predict every test window one at a time, without tracking gradients.
with torch.no_grad():
    pred = []
    for window in testX_tensor:
        model.reset_hidden_state()

        # Add a leading batch axis of size 1 for the single window.
        predicted = model(torch.unsqueeze(window, 0))
        pred.append(torch.flatten(predicted).item())

    # Map predictions and ground truth back through scaler_y.
    pred_inverse = scaler_y.inverse_transform(np.array(pred).reshape(-1,1))
    # Move the tensor to host memory first: a CUDA tensor cannot be converted
    # to a NumPy array directly.
    testY_inverse = scaler_y.inverse_transform(testY_tensor.cpu().numpy())
def MAE(true, pred):
    """Return the mean absolute error between ``true`` and ``pred``.

    The difference ``true - pred`` is taken element-wise, its absolute
    value is averaged over all elements, and a single number is returned.
    """
    residual = true - pred
    return np.mean(np.absolute(residual))
print('MAE SCORE: ',MAE(pred_inverse, testY_inverse))

# Take the last seq_length rows of the test set as the input window for one
# more prediction beyond the end of the data.
length = len(test_set)
target = np.array(test_set)[length-seq_length:]

# Float tensor on the model's device: an integer tensor would truncate the
# values and is not accepted by the LSTM.
target = torch.tensor(target, dtype=torch.float32).to(device)
target = target.reshape([1, seq_length, data_dim])

# Inference only, so no gradient tracking is needed.
with torch.no_grad():
    out = model(target)
pre = torch.flatten(out).item()

pre = round(pre, 8)
# Map the single prediction back through scaler_y; the result is a 1x1
# array, so flatten it and print its only element.
pre_inverse = scaler_y.inverse_transform(np.array(pre).reshape(-1,1))
print(pre_inverse.reshape(-1)[0])