An example not from the "Gym"


Formulation of the problem

Reinforcement learning is a young and rapidly growing discipline, which is why there is still very little material on it in Russian, especially material that takes an object-oriented approach and tackles practical problems outside the standard Gym arsenal.

Here I present the solution of a simple problem which, I hope, will spare you a few of the bumps along this interesting path.

Imagine a task in which a nano-robot carrying an antibiotic must approach a cluster of pathogenic bacteria in order to destroy them.

First, install the Keras Reinforcement Learning (keras-rl2) and animation libraries.

!pip install keras-rl2
!pip install celluloid
# Basic modules
import time    # time-related operations
import random
import numpy as np

# Keras modules
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Activation, Flatten, Input, Concatenate
from tensorflow.keras.optimizers import Adam

# Keras-RL2 modules
import rl.core as krl
from rl.agents import DDPGAgent
from rl.memory import SequentialMemory
from rl.random import OrnsteinUhlenbeckProcess

# Visualization modules
from celluloid import Camera
import matplotlib.pyplot as plt     
from matplotlib import rc
rc('animation', html="jshtml")
%matplotlib inline

Environment

Reinforcement learning requires an environment and an agent.

In our case the environment will be pathogens migrating through tissue; their movement follows swarming behavior.

Swarming behavior is described by the Vicsek model (1995), which can be used to imitate a cluster of bacteria, the behavior of a flock of birds or a school of fish, and to observe how self-ordered motion emerges from simple rules.

Let's take the description of the model from the article "Create your own simulation of active matter in Python" and rewrite it using an object-oriented approach. What follows assumes that you are already familiar with OOP in Python.
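In compact form, the update rule that the Colony class below implements is (using the same symbols as in the code: constant speed v0, interaction radius R, noise amplitude eta, time step dt, periodic box size L):

θ_i(t + Δt) = ⟨θ_j(t)⟩_{|r_j − r_i| < R} + η (u_i − 1/2),   u_i ~ U[0, 1)
x_i(t + Δt) = (x_i(t) + v0 cos θ_i(t) Δt) mod L
y_i(t + Δt) = (y_i(t) + v0 sin θ_i(t) Δt) mod L

where ⟨·⟩ denotes the circular mean of the neighbors' angles, computed through arctan2 of the summed sines and cosines.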

# Swarm behavior simulation
class Colony:
  # particle positions
  x : np.ndarray
  y : np.ndarray
  # particle heading angle
  theta : np.ndarray
  # particle velocity components
  vx : np.ndarray
  vy : np.ndarray

  # Constructor
  def __init__(self,N):
    self.reset(N)

  # place N particles on an LxL area
  def reset(self,N):
    # particle positions
    self.x = np.random.rand(N,1)*L
    self.y = np.random.rand(N,1)*L
    # heading and axis velocities of the particles relative to
    # the constant linear speed v0
    self.theta = 2 * np.pi * np.random.rand(N,1)
    self.vx = v0 * np.cos(self.theta)
    self.vy = v0 * np.sin(self.theta)

  # Simulation step
  def step(self):
    # motion
    self.x += self.vx*dt
    self.y += self.vy*dt
    # apply periodic boundary conditions
    self.x = self.x % L
    self.y = self.y % L
    # find the mean angle of the neighbors within radius R
    mean_theta = self.theta.copy()  # copy so the angles are not overwritten mid-loop
    for b in range(N):
        neighbors = (self.x-self.x[b])**2+(self.y-self.y[b])**2 < R**2
        sx = np.sum(np.cos(self.theta[neighbors]))
        sy = np.sum(np.sin(self.theta[neighbors]))
        mean_theta[b] = np.arctan2(sy, sx)
    # add random noise
    self.theta = mean_theta + eta*(np.random.rand(N,1)-0.5)
    # update the velocities
    self.vx = v0 * np.cos(self.theta)
    self.vy = v0 * np.sin(self.theta)
    return self.theta

  # Get the list of particles within radius r of the coordinates x,y
  def observe(self,x,y,r):
    return (self.x-x)**2+(self.y-y)**2 < r**2
  # Print the coordinates of particle i
  def print(self,i):
    return print(self.x[i],self.y[i])
  # Get the particle coordinates
  def get_bacteria(self):
    return self.x, self.y
  # Get the array of particle headings
  def get_theta(self):
    return self.theta

The class described above will represent the state of the environment. Do not confuse observation with state: the observation is only what the agent sees, while the state describes the entire environment, i.e. all of our bacteria.

For everything to work automatically, the class attributes action_space and observation_space must describe the admissible agent actions (action) and the environment representation (observation).

Both must inherit from the rl.Space class. For action_space you need to override the following methods:

  • sample() – returns a random valid action; in our case, a number from the range [-1, 1)

  • contains(x) – checks whether x is an admissible value.

The shape attribute of these classes will store the shape of the values.

# action is a scalar from -1 to 1
class actionSpace(krl.Space):
  def __init__(self):
    self.shape = (1,)
  def sample(self, seed=None):
    if seed: random.seed(seed)
    return random.triangular(-1,1)
  def contains(self, x):
    return abs(x) <= 1

# observation is an array;
# the admissible values do not need to be described
class observationSpace(krl.Space):
  def __init__(self):
    self.shape = (5,)
  def sample(self, seed=None): pass
  def contains(self, x): pass

To build the environment we need to create a class that inherits from the base environment class rl.Env provided by Keras-RL. It is an abstract class, and we must implement its methods according to the environment we have in mind:

  • reset() – the "creation of the world"

  • step(action) – how the world changes at each step in response to an action

  • render() – output of any information about the state of the world at the current step

  • close() – shutting down the class instance

In the environment class we must describe the state, the observation, and the reward.

The observation will consist of 5 variables:

  1. The number of "captured" bacteria within the radius R

  2. The average heading angle of the bacteria within R

  3. The bearing toward the center of the bacteria within R

  4. The bearing toward the center of the bacteria within the ring between R and 1.5R

  5. The current heading of the nano-robot

    The reward is where your attention should be focused most: it must match the task. We will penalize the loss of bacteria severely, and the fewer bacteria remain within the visibility radius R, the harsher the penalty. We will also reward acquiring new bacteria and keeping those already in view (see the condensed sketch below).

    The action is the heading angle of the nano-robot. All angle variables are normalized by dividing by Pi.
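To make the shaping concrete, here is the same rule as a small standalone sketch. The helper name compute_reward is hypothetical; inside the environment class below the identical logic lives directly in step().

# A condensed sketch of the reward shaping described above.
# n_now / n_prev are the numbers of visible bacteria now and at the previous step.
def compute_reward(n_now, n_prev):
    delta = n_now - n_prev
    if delta < 0:                 # bacteria lost: the fewer were visible, the harsher the penalty
        return 50 * delta / n_prev
    if delta > 0 and n_prev:      # gained bacteria while already tracking some
        return 1 + delta
    if n_now > 0:                 # still holding the swarm in view
        return 1
    return 0                      # nothing in sight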

# наша "чашечка Петри"
class Cure(krl.Env):
  # имитируемая колония
  bacteria : Colony
  # положение нано робота
  x: float
  y: float
  theta: float  # направление нано робота
  R: float  # область видимости бактерий нано роботом
  n_bacteria : int  # сохраняем предыдущее значение количества видимых бактерий для rewarda
  # конструктор
  def __init__(self):
    self.bacteria = Colony(N)
    self.reward_range = (-1,1) #(-np.inf, np.inf)
    self.action_space = actionSpace()
    self.observation_space = observationSpace()
    self.R = observation_R
    self.reset()

  #  Формирование вектора обзора observation.
  #  То что происходит в области видимости R от робота. 
  def observe_area(self):
    # получим список соседей в радиусе R
    observe_bacteria = self.bacteria.observe(self.x,self.y,self.R)
    # получим список соседей в радиусе R*1.5
    observe_far_bacteria = self.bacteria.observe(self.x,self.y,self.R*1.5)
    observe_far_bacteria=np.array(np.bitwise_and(observe_far_bacteria,np.invert (observe_bacteria)))

    observation = np.zeros(5)
    # подадим количество соседей    
    n_bacteria = np.sum(observe_bacteria)
    observation[0] = n_bacteria / 20
    # compute and feed in the average heading of the neighboring bacteria
    sx = np.sum(np.cos(self.bacteria.theta[observe_bacteria]))
    sy = np.sum(np.sin(self.bacteria.theta[observe_bacteria]))
    observation[1] = np.arctan2(sy, sx) / np.pi
    # compute and feed in the average direction from the robot to the nearby bacteria
    sx = np.sum(self.bacteria.x[observe_bacteria] - self.x)
    sy = np.sum(self.bacteria.y[observe_bacteria] - self.y)
    observation[2] = np.arctan2(sy, sx) / np.pi
    # compute and feed in the average direction from the robot to the distant bacteria
    sx = np.sum(self.bacteria.x[observe_far_bacteria] - self.x)
    sy = np.sum(self.bacteria.y[observe_far_bacteria] - self.y)
    observation[3] = np.arctan2(sy, sx) / np.pi
    if n_bacteria:
      observation[4] = self.theta / np.pi  # feed in the heading of the nano-robot
    return np.sum(observe_bacteria), observation

  # start of the simulation
  def reset(self):
    self.bacteria.reset(N)
    self.x = .5 * L
    self.y = .5 * L
    self.theta = actionSpace().sample()
    self.n_bacteria, observation = self.observe_area()
    return observation

  # simulation step
  def step(self, action):
    action = action * 3.2 # np.pi
    # To save time when the robot hits "clean water",
    # run the simulation forward without passing the steps to the network
    while True:
      # bacteria simulation step
      self.bacteria.step()
      # robot step
      self.theta = np.sum(action) #% (2 * np.pi)
      self.x = self.x + dt * v0 * np.cos(self.theta)
      self.y = self.y + dt * v0 * np.sin(self.theta)
      self.x = self.x % L
      self.y = self.y % L
      # inspect the surroundings
      nBacteria, observation = self.observe_area()
      if np.sum(observation) != 0: break
      if self.n_bacteria > 0: break
    delta = nBacteria - self.n_bacteria
    if delta < 0:
      reward = 50 * delta / self.n_bacteria
    elif delta > 0 and self.n_bacteria:
      reward = 1 + delta
    elif nBacteria > 0:
      reward = 1
    elif nBacteria == 0:
      reward = 0
    else:
      reward = nBacteria
    done = nBacteria > N / 7
    self.n_bacteria = nBacteria
    return observation, reward, done, {}

  # get the robot coordinates
  def get_position(self):
    return self.x, self.y, self.R
  # get the coordinates of all bacteria
  def get_bacteria(self):
    return self.bacteria.get_bacteria()
  # print debug information
  def render(self, mode="human", close=False):
    # print(self.n_bacteria)
    pass
  # end of the simulation
  def close(self):
    pass
    

At this point, let’s define the environment parameters and play random episodes.
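The concrete values below are only one workable configuration that I am assuming for illustration (tune them to your taste); the random episodes simply sample actions from actionSpace.

# Environment parameters (illustrative values, adjust as needed)
v0 = 3.0             # linear speed of bacteria and robot
N  = 500             # number of bacteria
L  = 100             # size of the square area
R  = 3               # interaction radius of the swarm
eta = 0.5            # noise amplitude of the swarm
dt = 0.2             # time step
observation_R = 2*R  # visibility radius of the robot
Epochs = 200         # maximum steps per episode

# Play a few random episodes to see what the robot is up against
env = Cure()
for episode in range(3):
    observation = env.reset()
    sum_reward = 0
    for i in range(Epochs):
        action = env.action_space.sample()      # random action in [-1, 1)
        observation, reward, done, _ = env.step(action)
        sum_reward += reward
        if done:
            break
    print('episode', episode, 'steps', i + 1, 'reward', sum_reward)
env.close()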

Playing episodes

Watching the episodes gives you a feel for the variety of situations our robot has to deal with. Based on that, we tune the number of bacteria, the size of the area, the speed, and the number of steps per episode.

You can do all of this in a Google Colab notebook.

Agent and training

The environment has been determined. It remains to create an agent.

Our agent is a nano-robot that moves at the same speed as the bacteria, and we control its heading angle via the action. The robot "sees" the neighboring bacteria and must follow them until it reaches the lesion.

To solve the problem we use the Deep Deterministic Policy Gradient (DDPG) method, which can be viewed as DQN for continuous action spaces. We alternately train two networks: the Actor (which chooses the action) and the Critic (which estimates the reward).

For training we use the keras-rl DDPGAgent class. It takes care of all the technical details, so we only need to write a few lines of code to get a result. OOP is a great power!


# Create the environment and extract the action space
env = Cure()
np.random.seed(123)
assert len(env.action_space.shape) == 1
nb_actions = env.action_space.shape[0]

# Build the actor model: it takes the observation and outputs an action
actor = Sequential()
actor.add(Flatten(input_shape=(1,) + env.observation_space.shape))
actor.add(Dense(4, use_bias=True))
actor.add(Activation('relu'))
actor.add(Dense(4, use_bias=True))
actor.add(Activation('relu'))
actor.add(Dense(nb_actions, use_bias=True))
actor.add(Activation('tanh'))
print(actor.summary())

# Build the critic model: it takes the observation and the action and outputs the expected reward
action_input = Input(shape=(nb_actions,), name="action_input")
observation_input = Input(shape=(1,) + env.observation_space.shape, name="observation_input")
flattened_observation = Flatten()(observation_input)
x = Concatenate()([action_input, flattened_observation])
x = Dense(8, use_bias=False)(x)
x = Activation('relu')(x)
x = Dense(5, use_bias=True)(x)
x = Activation('relu')(x)
x = Dense(1)(x)
x = Activation('linear')(x)
critic = Model(inputs=[action_input, observation_input], outputs=x)
print(critic.summary())

# Keras-RL provides the rl.memory.SequentialMemory class,
# which stores the agent's "experience":
memory = SequentialMemory(limit=100000, window_length=1)
# to avoid getting stuck in a local minimum, it is useful to "shake up" the model's actions
# with random noise generated by an Ornstein-Uhlenbeck process
random_process = OrnsteinUhlenbeckProcess(size=nb_actions, theta=.15, mu=0., sigma=.3)
# Create the agent from the DDPGAgent class
agent = DDPGAgent(nb_actions=nb_actions, actor=actor, critic=critic, critic_action_input=action_input,
                  memory=memory, nb_steps_warmup_critic=100, nb_steps_warmup_actor=100,
                  random_process=random_process, gamma=.99, target_model_update=1e-3)

agent.compile(Adam(learning_rate=.001, clipnorm=1.), metrics=['mae'])

# Train the agent for nb_steps steps;
# nb_max_episode_steps limits the number of steps in a single episode
agent.fit(env, nb_steps=100000, visualize=True, verbose=1, nb_max_episode_steps=Epochs)

# Test the trained network on 5 episodes
agent.test(env, nb_episodes=5, visualize=True, nb_max_episode_steps=Epochs)
env.close()
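After a run this long it is worth persisting the result. keras-rl agents can dump their weights to disk; the file name here is just an example.

# Save the trained weights (keras-rl stores separate actor/critic files)
agent.save_weights('ddpg_cure_weights.h5f', overwrite=True)
# Later they can be restored into an identically built agent:
# agent.load_weights('ddpg_cure_weights.h5f')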

Result

Let's take a look at the behavior of the trained nano-robot. For clarity, we change the environment parameters:

v0 = 4        # linear speed
N = 1000      # number of bacteria
Epochs =  500 # number of steps
L    = 300    # size of the area
R    = 5      # interaction radius
observation_R = 2*R # visibility radius of neighbors

fig = plt.figure()
camera = Camera(fig)
random.seed(123)
theCure = Cure()
observation = theCure.reset()

# info box
props = dict(boxstyle="round", facecolor="wheat", alpha=0.5)
sum_reward = 0
for i in range(200):
    action = np.sum(actor.predict(observation.reshape((1,1,5))))# % (2*np.pi)
    observation, reward, done, _ = theCure.step(action)
    sum_reward += reward
    if done:
      print('Victory at step', i, ', captured', observation[0]*20, 'bacteria. Reward', sum_reward)
      break
    # show the bacteria
    bacteria_x,bacteria_y = theCure.get_bacteria()
    plt.scatter(bacteria_x, bacteria_y, c="red")    # plot the data as points
    # show the robot
    x, y, r = theCure.get_position()
    plt.scatter(x, y, c="blue")
    fig = plt.gcf()
    ax = fig.gca()
    circle = plt.Circle((x, y), r, color="b", fill=False)
    ax.add_patch(circle)

    textstr = "\n".join((
    r'epoch=%d' % (i, ),
    r'points=%d' % (reward, ),
    ))

    ax.text(0.05, 0.95, textstr, transform=ax.transAxes, fontsize=14,
      verticalalignment="top", bbox=props)

    camera.snap()

print('Total reward', sum_reward)
theCure.close()
animation = camera.animate()
#animation.save('celluloid_minimal.gif', writer="imagemagick")
animation
Learning Outcome

Conclusions

Information on RL is scarce even in English: most sources explain the basics, show a couple of standard problems from the OpenAI Gym arsenal, and that's all. The Keras-RL documentation does not stand up to criticism.

Reinforcement learning has its own nuances. For example, long training of 0.5-1 million steps seems to lead to overfitting: the network starts producing the extreme values -1 or 1 without reacting to the environment at all.

When designing the actor for a continuous control range, it is better to clamp the last neuron with a sigmoid (0, 1) or tanh (-1, +1) activation instead of a linear one, and then expand the value to the required range inside the environment's step().
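As a minimal sketch of that second step (the helper name is mine, not from the code above): with a tanh output the actor always emits a value in (-1, 1), and the environment maps it back to a physical angle.

import numpy as np

def action_to_angle(action):
    # Map the actor's tanh output from (-1, 1) to a physical angle in (-pi, pi)
    return float(np.clip(action, -1.0, 1.0)) * np.pi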

It should also be noted that the data fed to the agent must be adequate for the task: you cannot teach an agent to drive without showing it the road. In our case we had to show the robot the bacteria slightly outside the radius R. Without this, the robot simply trailed after the last bacterium of the swarm, afraid of being punished and never understanding how to earn a reward.

The path to the stars lies through thorns. I will be glad if this helps someone get a grip on this interesting topic.
