AI/Optimization

[강화학습] Reinforcement learning for Process Control (Python, torch)

슈퍼짱짱 2023. 3. 15. 15:44
반응형

강화학습으로 공정 운전 조건 자동 제어 실습

 

강화학습은 실제 상황을 environment로 정의하고, 정의한 environment에 따라 직접 episode를 만들어 학습하기 때문에 게임과 같은 상황에 많이 사용된다.

 

게임은 현재 상황이 성공이지 실패인지, 다음 action은 어떤걸 취할 수 있는지, ~상황에서 ~action을 취하면 다음은 어떤 상황이 될 지 등 모든 environment 정의를 개발자가 직접 하면된다. 즉, simulation 할 수 있는 모든 가상의 상황을 정의할 수 있다.

 

그러나, 화학공정과 같은 상황에서는 모든 random한 상황에 대해 결과가 어떨지 직접 실험해보기는 불가능에 가깝다.

 

따라서 본 포스팅은 이러한 공정 운전 조건 최적화에 강화학습을 어떻게 적용하는지 알아보고자 한다.

단, 복잡한 실제 공정 상황이 아닌, 한 개의 y값을 특정 값으로 만들기 위한 최적의 x값을 구하는 아주 간략한 예시로 진행한다.

 

강화학습에 대한 이론보다는 실습위주로 진행한다.

코드는 Pytorch REINFORCEMENT LEARNING (DQN) TUTORIAL을 참고했으며, gym 게임 environment에 구현되어있는걸 본 과정에 맞게 수정했다.

 


1. 먼저 필요한 라이브러리를 불러온다.

 

# import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# GPU를 사용할 경우
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

 

 

2. 예제 데이터를 생성한다.

 

어떤 품질(Property, y)을 Setting한 Point에 맞추기 위해 온도(Temperature, x)를 어떻게 운전해야 하나? 를 최적화 하는 것을 목적으로 하는 상황이며, x와 y는 아주 강한 선형성을 띈다고 가정한다.

 

아래 생성한 데이터는 "그동안 temperature(x)를 이런식으로 운전해왔을 때 제품의 품질(y)이 이랬다" 에 대한 데이터라고 이해하면된다.

 

# 예제 데이터 생성
# property를 target point에 맞추기 위해 온도(temp)를 어떻게 조절해야 하는가?

import pandas as pd
temp = [random.randint(-10,100) for _ in range(100)]
property = [i*3.5 + 10  + random.random()*2 for i in temp]

data = pd.DataFrame({'x':temp,
                    'y':property})

 

 

 

3. Environment 정의

 

episode를 스스로 생성하기위한 state, reward, action 등 environment를 정의한다.

 

위에서 언급했듯, 게임과 같은 경우는 현재 상태가 ~할 때 성공이라던지, 실패라던지 하는 상황을 개발자가 원하는대로 정의하면되지만, 

 

공정의 경우는 모든 episode 상황에 대한 결과가 어떨지 실제로 실험해보기에는 현실적으로 불가능에 가깝다.

따라서 관심 있는 공정에서 x값이 ~할 때 y값이 어떨지를 추정할 수 있는 x와 y의 Model이 필요하다.

 

애초에 선형성이 강한 가상의 환경을 정의했으므로 Linear Regression 모델을 생성한다.

(당연히 x와 y의 Linear Regression 모델을 알고있다면 y값을 어떤 setting point로 만들기위해 x를 어떤 값으로 해야할지는 굳이 강화학습과 같은 최적화 알고리즘 없이도 너무나도 쉽게 계산이 가능하다. 하지만, 지금은 최대한 단순한 가상의 환경을 가정하여 진행한다.)

 

# simulation을 위한 model 생성
from sklearn.linear_model import LinearRegression

model = LinearRegression().fit(np.array(temp).reshape(-1, 1), np.array(property).reshape(-1, 1))

 

model의 coef를 확인해보면 3.5에 거의 근접했다.

애초에 x*3.5로 y를 만들었으므로 학습이 아주 잘 되었음을 확인할 수 있다.

 

 

 

이제 위에서 생성한 모델을 기반으로 Environment Class를 정의한다.

Environment Class에는 생성자 외에 새 개 function이 필요하다.

 

class Environment :
    def __init__(self, z, setPoint) :
        self.z = z # 현재 x값 
        self.setPoint = setPoint # 물성 target point
        self.terminated = False # 달성 여부 
        self.state = None
        
    def reset(self) :
        yPred = model.predict(np.array([self.z]).reshape(-1,1)).item()
        self.state = torch.tensor([self.setPoint - yPred])
        return self.state
    
    def step(self, action) :
        # 0 or 1이 action으로 들어옴 
        if action == 0 :
            self.z -= 1
        elif action == 1 :
            self.z +=1
        
        if self.z < -10 or self.z > 100 : # x의 범위를 벗어나면 끝
            reward = -100.
            self.terminated = True
            return None, torch.tensor([reward]), self.terminated
        
        self.state = torch.tensor([self.setPoint - model.predict(np.array([self.z]).reshape(-1,1)).item()])
        if abs(self.state) <= 5 : # 최적화 됐으면 성공
            self.terminated = True
        reward = 1/abs(self.state)*5
        return self.state, reward, self.terminated
    
    def render(self) :
        return self.z

 

 

먼저 생성자를 보면 z setPoint를 입력으로 받는데, z는 현재 x값을 의미하고 setPoint는 y값을 어떤 값으로 Setting하고 싶은지에 대한 값을 의미한다. 

 

reset() 은 매 episode 마다 맨 처음 state를 reset하는 역할을한다. 가능한 범위 내 random한 가상의 state 값을 return한다. 

state는 (setPoint - y)값으로 한다.

즉, 이 state를 최소화하는것이 목적이다.

 

step()은 action을 input으로 받아 현 state에서 action을 취한 다음 state, reward, 종료인지 아닌지에 대한 정보를 return한다.

 

본 과정에서 action은 0과 1로 정의한다.

action이 0이면 x를 1만큼 낮추고, 1이면 x를 1만큼 키운다.

 

reward는 1/abs(setPoint - y)로 정의한다.

setting한 값과 현재 y값의 차이가 작을수록 큰 reward를 받게된다.

 

y는 위에서 생성한 모델에 x를 넣어 prediction하여 계산한다. 이 과정을 위해 위에서 x와 y의 model을 만들었다고 이해하면된다.

 

abs(setPoint - y)값이 5보다 작으면 최적화 되었다고 판단하여 episode를 종료한다.

또한, action을 취한 후 x가 정의한 min~max range를 벗어나면 -100의 reward를 return하며 episode는 종료된다.

 

render()는 episode가 어떻게 진행되고 있는지 과정을 찍어보기 위한 function으로, 꼭 필요한 function은 아니며 원하는 부분을 자유롭게 구현하면 된다. 글쓴이는 x값을 return하도록 구현했다. 

 

이렇듯 state, action, reward, episode terminate 조건 등 모든 환경이 실제 상황과 최대한 유사하게, 효율적으로 모델이 최적화 할 수 있도록 정의되어야하며 이 과정에는 Biz.domain이 꼭 필요하다.

 

 

4. Reinforcement Model 정의

 

정의한 가상환경이 굉장히 단순하므로 Reinforecement Model도 아주 단순하게 구현했다.

현 state를 입력으로 받아 최적의 action을 return하는 역할을 한다.

 

class DQN(nn.Module):    
    # Constructor
    def __init__(self, outputs):
        # outputs : action 개수 
        super(DQN, self).__init__()
        self.linear1 = nn.Linear(1,16,bias=True)
        self.linear2 = nn.Linear(16,outputs,bias=True)

    def forward(self,x):
        x = x.to(device)
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return torch.unsqueeze(F.log_softmax(x, dim=0),0)

 

input layer, output layer 사이에 hidden layer(size 16) 하나만 더 추가했다. 

activation function은 relu를 사용했으며, 마지막에는 softmax 취해주었다.

 

output layer의 size는 2개로

0번째 값이 더 크면 x를 1 낮추라는 action이며, 1번째 값이 더 크면 x를 1 키우라는 action이다.

 

 

5. Select Action 정의

 

바로 위에서 정의한 reinforcement model에 기반하여 현 state를 입력으로 받아 다음 action을 return하는 function을 정의한다.

 

BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

 

n_actions = 2 # action 2개 
policy_net = DQN(n_actions).to(device)
target_net = DQN(n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

 

steps_done = 0
def select_action(state): # state : setPoint - y, action : x(o or 1)
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            return policy_net(state).max(1)[1]
    else:
        return torch.tensor([random.randrange(n_actions)], device=device, dtype=torch.long)

 

policy network에 의해서만 action을 취하다보면 local minimum에 빠질 수 있으므로 가끔은 random한 선택을 하기도 한다.

 

eps_threshold가 random한 어떤 값보다 작으면 policy network가 return한 action을 return하고, 

random한 어떤 값보다 크면 random한 action을 return한다.

 

 

6. Training

 

이제 위에서 만든 모든 것들을 조합하여 학습을 진행한다.

 

먼저 학습할 episode들을 저장할 memory를 정의한다.

 

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """transition 저장"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

 

optimize function도 정의한다.

(이부분은 강화학습 Q-Learning 을 이해하고있어야 이해할 수 있다.)

 

def optimize_model():
    
    if len(memory) < BATCH_SIZE:
        return
    
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    
    state_batch = torch.stack(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s_t, a) 계산
    state_action_values = policy_net(state_batch).squeeze().gather(1, action_batch.unsqueeze(1))

    # 모든 다음 상태를 위한 V(s_{t+1}) 계산
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states.reshape(non_final_next_states.size()[0],1)).squeeze().max(1)[0].detach()
    
    # 기대 Q 값 계산
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber 손실 계산
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # 모델 최적화
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

 

이제 실제 episode를 진행하면서 강화학습 모델을 학습한다.

 

def Action(x) :
    if x.item() == 0 :
        return "Down"
    else :
        return "Up"
        
num_episodes = 100 
for i_episode in range(num_episodes):
    epMemory = list()
    
    z = random.randrange(-10,100)
    setPoint = random.randrange(math.floor(min(data['y'])), math.ceil(max(data['y'])))
    env = Environment(z = z, setPoint = setPoint)
    state = env.reset()
    print( "setPoint", setPoint, "을 맞추기 위해")
    for t in count():
        # 행동 선택과 수행
        action = select_action(torch.tensor([state]).float())
        next_state, reward, done = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        print(t,"- 현재 x", env.render(),"에서", Action(action),"하면 'setPoint - y'값은", next_state,"reward는", round(reward.item(),2))

        # 메모리에 변이 저장
        epMemory.append([state, action, next_state, reward])

        # 다음 상태로 이동
        state = next_state

        optimize_model()
        if done:
            if env.render() >= -10 and env.render() <= 100 :
                _ = [memory.push(epMemory[i][0],epMemory[i][1],epMemory[i][2],epMemory[i][3]) for i in range(len(epMemory))]
                print("성공!")
            else :
                print("실패!")
            print("")
            break
        
        if t >= 10000 :
            print("중단!")
            break

    # 목표 네트워크 업데이트, 모든 웨이트와 바이어스 복사
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')

 

episode를 수행하기 위해 현재 x(코드에서는 z)와 setPoint 모두 random하게 생성하고, 

현 state에서 reward를 최대화하기 위해 가장 좋은 다음 action을 취하는 방법을 학습한다.

 

성공한 episode만 memory에 저장하여 학습했다. (Pytorch TUTORIAL에서는 모든 episode를 다 저장한다.)

또한, 한 episode에서 step이 10,000번이 넘어가면 중단하고 다음 episode를 새롭게 시작하도록 구현했다.

 

참고로 마지막 episode에 대한 print 결과는 다음과 같다.

 

 

(print가 좀 잘못됐는데.. 현재 x가 아니라 action을 취한 다음 x이다. )

 

 

7. Simulation

 

이제 이 학습된 모델을 실제 환경에 적용한다.

 

# if 현재 x값이 3.0
# 그럼 실제 y값은 21.5482244; model.predict(np.array([z]).reshape(-1,1))
z = random.randrange(-10,100)
setPoint = random.randrange(math.floor(min(data['y'])), math.ceil(max(data['y'])))

env = Environment(z = z, setPoint = setPoint)

state = torch.tensor([setPoint - model.predict(np.array([z]).reshape(-1,1)).item()]) # 지금 y값 
# state에서 setPoint로 하기 위해서 action을 어떻게 해라

with torch.no_grad() :
    print( "setPoint", setPoint, "을 맞추기 위해")
    for t in count():
        # 행동 선택과 수행
        action = select_action(torch.tensor([state]).float())
        next_state, reward, done = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        print(t,"- 현재 x", env.render(),"에서", Action(action),"하면 'setPoint - y'값은", next_state,"reward는", round(reward.item(),2))


        # 다음 상태로 이동
        state = next_state

        if done:
            if env.render() >= -10 and env.render() <= 100 :
                print("성공!")
            else :
                print("실패!")
            print("")
            break

 

현재 x가 아니라 action을 취한 다음 x임

 

 

실제상황이 아니므로 역시 random한 값을 입력한다.

print 결과를 보면 성공한 것을 볼 수 있고,

맨 처음 만든 x와 y 모델로 강화학습 모델이 추천해준 x값대로 운전하면 setting한 y값을 만들 수 있는지도 검증해 볼 수 있다.

 

반응형