# -*- coding: utf-8 -*-
from accelbrainbase.samplabledata.policy_sampler import PolicySampler
import mxnet.ndarray as nd
import mxnet as mx
import numpy as np
import random
class MazeMultiAgentPolicy(PolicySampler):
    '''
    Policy sampler for multi-agent deep Q-learning that evaluates the value of each action.
    '''
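    # Cell encodings used throughout the map array.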
SPACE = 1
WALL = -1
START = 0
GOAL = 3
START_POS = (1, 1)
__memory_num = 4
END_STATE = "running"
__inferencing_mode = False
    def get_inferencing_mode(self):
        ''' getter '''
        return self.__inferencing_mode
    def set_inferencing_mode(self, value):
        ''' setter '''
        self.__inferencing_mode = value
    inferencing_mode = property(get_inferencing_mode, set_inferencing_mode)
def __init__(
self,
batch_size=25,
map_size=(50, 50),
moving_max_dist=3,
possible_n=10,
memory_num=3,
repeating_penalty=0.5,
enemy_num=2,
enemy_init_dist=5,
enemy_moving_max_dist=1,
ctx=mx.gpu(),
):
        '''
        Init.
        Args:
            batch_size:             Batch size.
            map_size:               Size of the map.
            moving_max_dist:        Maximum distance the agent can move in one step.
            possible_n:             The number of candidate actions drawn per step.
            memory_num:             The number of steps kept in the agent's route memory.
            repeating_penalty:      The penalty applied when the agent revisits a cell.
            enemy_num:              The number of enemies.
            enemy_init_dist:        Minimum Euclidean distance between the initial positions of the agent and the enemies.
            enemy_moving_max_dist:  Maximum distance an enemy can move in one step.
            ctx:                    `mx.cpu()` or `mx.gpu()`.
        '''
self.__batch_size = batch_size
self.__map_arr = self.__create_map(map_size)
self.__agent_pos_arr = np.array(
[
self.START_POS
] * self.__batch_size
)
self.__possible_n = possible_n
self.__repeating_penalty = repeating_penalty
self.__ctx = ctx
self.__enemy_num = enemy_num
self.__enemy_init_dist = enemy_init_dist
self.__enemy_pos_list = [None] * enemy_num
self.__enemy_moving_max_dist = enemy_moving_max_dist
self.__create_enemy(self.__map_arr)
self.__state_arr = self.extract_now_state()
self.__route_memory_list = [[] for _ in range(self.__batch_size)]
self.__memory_num = memory_num
self.__moving_max_dist = moving_max_dist
    def reset_agent_pos(self):
        '''
        Reset the agent to the start position and re-initialize the enemies.
        '''
self.__agent_pos_arr = np.array(
[
self.START_POS
] * self.__batch_size
)
self.__state_arr = self.extract_now_state()
self.__create_enemy(self.__map_arr)
    def create_enemy(self):
'''
Create enemies.
'''
self.__create_enemy(self.__map_arr)
def __create_map(self, map_size):
'''
Create map.
References:
- https://qiita.com/kusano_t/items/487eec15d42aace7d685
'''
        from itertools import product
news = ['n', 'e', 'w', 's']
m, n = map_size
m = m // 2
n = n // 2
SPACE = self.SPACE
WALL = self.WALL
START = self.START
GOAL = self.GOAL
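        # `memo` labels each cell of the reduced (m, n) grid with its own cluster id;
        # knocking down a wall merges the two clusters it separates.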
        memo = np.arange(m * n).reshape(m, n)
maze = [[SPACE for _ in range(2 * n + 1)] for _ in range(2 * m + 1)]
maze[self.START_POS[0]][self.START_POS[1]] = START
self.__goal_pos = (2 * m - 1, 2 * n - 1)
maze[2 * m - 1][2 * n - 1] = GOAL
for i, j in product(range(2 * m + 1), range(2 * n + 1)):
if i % 2 == 0 or j % 2 == 0:
maze[i][j] = WALL
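        # Remove walls between cells of different clusters until the maze is fully
        # connected, i.e. until every cell carries the same cluster label, 0.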
while (memo != 0).any():
x1 = random.choice(range(m))
y1 = random.choice(range(n))
direction = random.choice(news)
if direction == 'e':
x2, y2 = x1, y1 + 1
elif direction == 'w':
x2, y2 = x1, y1 - 1
elif direction == 'n':
x2, y2 = x1 - 1, y1
elif direction == 's':
x2, y2 = x1 + 1, y1
if (x2 < 0) or (x2 >= m) or (y2 < 0) or (y2 >= n):
continue
if memo[x1, y1] != memo[x2, y2]:
tmp_min = min(memo[x1, y1], memo[x2, y2])
tmp_max = max(memo[x1, y1], memo[x2, y2])
memo[memo == tmp_max] = tmp_min
maze[x1 + x2 + 1][y1 + y2 + 1] = SPACE
maze_arr = np.array(maze)
return maze_arr
def __create_enemy(self, maze_arr):
        '''
        Scatter enemies over free cells that are at least `enemy_init_dist`
        away from the start position, avoiding duplicate positions.
        '''
x_arr, y_arr = np.where(maze_arr == self.SPACE)
key_arr = np.arange(x_arr.shape[0])
np.random.shuffle(key_arr)
dup_list = []
for i in range(self.__enemy_num):
for j in range(key_arr.shape[0]):
key = key_arr[j]
                dist = np.sqrt(((x_arr[key] - self.START_POS[0]) ** 2) + ((y_arr[key] - self.START_POS[1]) ** 2))
if dist >= self.__enemy_init_dist and (x_arr[key], y_arr[key]) not in dup_list:
self.__enemy_pos_list[i] = (x_arr[key], y_arr[key])
dup_list.append((x_arr[key], y_arr[key]))
break
def __move_enemy(self, state_arr):
opt_list = []
        # `+ 1` makes the range inclusive of `enemy_moving_max_dist`; with the
        # original exclusive bound, the default (`enemy_moving_max_dist=1`)
        # produced no candidate moves and the enemies never moved.
        for dist in range(1, self.__enemy_moving_max_dist + 1):
for x in [-1, 0, 1]:
for y in [-1, 0, 1]:
if x == 0 and y == 0:
continue
opt_list.append((x * dist, y * dist))
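        # Each enemy tries the candidate moves in random order and takes the first
        # one that stays on the map, crosses no wall, and does not land on a wall.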
for e in range(self.__enemy_num):
random.shuffle(opt_list)
for e_x, e_y in opt_list:
next_e_x = self.__enemy_pos_list[e][0] + e_x
if next_e_x < 0 or next_e_x >= self.__map_arr.shape[1]:
continue
next_e_y = self.__enemy_pos_list[e][1] + e_y
if next_e_y < 0 or next_e_y >= self.__map_arr.shape[0]:
continue
wall_flag = False
if e_x > 0:
for add_x in range(1, e_x):
if self.__map_arr[self.__enemy_pos_list[e][0] + add_x, next_e_y] == self.WALL:
wall_flag = True
elif e_x < 0:
for add_x in range(e_x, 0):
if self.__map_arr[self.__enemy_pos_list[e][0] + add_x, next_e_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if e_y > 0:
for add_y in range(1, e_y):
if self.__map_arr[next_e_x, self.__enemy_pos_list[e][1] + add_y] == self.WALL:
wall_flag = True
elif e_y < 0:
for add_y in range(e_y, 0):
if self.__map_arr[next_e_x, self.__enemy_pos_list[e][1] + add_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if self.__map_arr[next_e_x, next_e_y] == self.WALL:
continue
self.__enemy_pos_list[e] = (next_e_x, next_e_y)
break
    def draw(self):
        '''
        Draw samples from distributions.
        Returns:
            `Tuple` of `mx.nd.array`s.
        '''
agent_num = self.__state_arr.shape[1]
if agent_num == 2 + self.__enemy_num:
possible_action_arr = np.zeros((
self.__state_arr.shape[0],
self.__possible_n,
self.__state_arr.shape[1],
self.__state_arr.shape[2],
self.__state_arr.shape[3],
))
else:
possible_action_arr = np.zeros((
self.__state_arr.shape[0],
self.__possible_n,
self.__state_arr.shape[1] + self.__enemy_num,
self.__state_arr.shape[2],
self.__state_arr.shape[3],
))
if self.inferencing_mode is True:
self.__move_enemy(self.__state_arr)
for batch in range(self.__batch_size):
state_arr = self.__state_arr[batch, 0].asnumpy()
agent_x, agent_y = np.where(state_arr == 1)
agent_x, agent_y = agent_x[0], agent_y[0]
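            # In training mode, re-sample the agent's position uniformly over the
            # free cells and re-seed the enemies, so that values are learned over
            # the whole map rather than along a single trajectory.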
if self.inferencing_mode is False:
agent_x_arr, agent_y_arr = np.where(self.__map_arr == self.SPACE)
key = np.random.randint(low=0, high=agent_x_arr.shape[0])
random_agent_x = agent_x_arr[key]
random_agent_y = agent_y_arr[key]
state_arr[agent_x, agent_y] = 0
state_arr[random_agent_x, random_agent_y] = 1
                self.__state_arr[batch, 0] = nd.array(
                    state_arr,
                    ctx=self.__ctx
                )
self.__create_enemy(self.__map_arr)
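            # Candidate displacements: the eight neighbourhood directions scaled by
            # each step size up to `moving_max_dist`.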
x_y_list = []
            # `+ 1` makes the range inclusive of `moving_max_dist`, mirroring the
            # enemy movement loop above.
            for dist in range(1, self.__moving_max_dist + 1):
for x in [-1, 0, 1]:
for y in [-1, 0, 1]:
if x == 0 and y == 0:
continue
x_y_list.append((x * dist, y * dist))
x_y_arr = np.array(x_y_list)
_possible_action_arr = None
for i in range(x_y_arr.shape[0]):
x = x_y_arr[i][0]
y = x_y_arr[i][1]
next_x = agent_x + x
if next_x < 0 or next_x >= state_arr.shape[1]:
continue
next_y = agent_y + y
if next_y < 0 or next_y >= state_arr.shape[0]:
continue
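                # During inference, skip cells that are still in the route memory
                # to discourage revisiting.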
if self.inferencing_mode is True:
if (next_x, next_y) in self.__route_memory_list[batch]:
continue
wall_flag = False
if x > 0:
for add_x in range(1, x):
if self.__map_arr[agent_x + add_x, next_y] == self.WALL:
wall_flag = True
elif x < 0:
for add_x in range(x, 0):
if self.__map_arr[agent_x + add_x, next_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if y > 0:
for add_y in range(1, y):
if self.__map_arr[next_x, agent_y + add_y] == self.WALL:
wall_flag = True
elif y < 0:
for add_y in range(y, 0):
if self.__map_arr[next_x, agent_y + add_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if self.__map_arr[next_x, next_y] == self.WALL:
continue
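                # If the straight path to the candidate cell crosses the goal,
                # snap the action onto the goal cell itself.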
goal_flag = False
if x > 0:
for add_x in range(1, x):
if self.__map_arr[agent_x + add_x, next_y] == self.GOAL:
goal_flag = True
elif x < 0:
for add_x in range(x, 0):
if self.__map_arr[agent_x + add_x, next_y] == self.GOAL:
goal_flag = True
if goal_flag is False:
if y > 0:
for add_y in range(1, y):
if self.__map_arr[next_x, agent_y + add_y] == self.GOAL:
goal_flag = True
elif y < 0:
for add_y in range(y, 0):
if self.__map_arr[next_x, agent_y + add_y] == self.GOAL:
goal_flag = True
if goal_flag is True:
next_x = self.__goal_pos[0]
next_y = self.__goal_pos[1]
next_action_arr = np.zeros((state_arr.shape[0], state_arr.shape[1]))
next_action_arr[next_x, next_y] = 1
next_action_arr = np.expand_dims(next_action_arr, axis=0)
if _possible_action_arr is None:
_possible_action_arr = next_action_arr
else:
_possible_action_arr = np.concatenate(
[
_possible_action_arr,
next_action_arr
],
axis=0
)
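            # Fallback: every candidate was filtered out by the route memory,
            # so re-scan the same moves while ignoring visited cells.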
if _possible_action_arr is None:
for i in range(x_y_arr.shape[0]):
x = x_y_arr[i][0]
y = x_y_arr[i][1]
next_x = agent_x + x
if next_x < 0 or next_x >= state_arr.shape[1]:
continue
next_y = agent_y + y
if next_y < 0 or next_y >= state_arr.shape[0]:
continue
wall_flag = False
if x > 0:
for add_x in range(1, x):
if self.__map_arr[agent_x + add_x, next_y] == self.WALL:
wall_flag = True
elif x < 0:
for add_x in range(x, 0):
if self.__map_arr[agent_x + add_x, next_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if y > 0:
for add_y in range(1, y):
if self.__map_arr[next_x, agent_y + add_y] == self.WALL:
wall_flag = True
elif y < 0:
for add_y in range(y, 0):
if self.__map_arr[next_x, agent_y + add_y] == self.WALL:
wall_flag = True
if wall_flag is True:
continue
if self.__map_arr[next_x, next_y] == self.WALL:
continue
goal_flag = False
if x > 0:
for add_x in range(1, x):
if self.__map_arr[agent_x + add_x, next_y] == self.GOAL:
goal_flag = True
elif x < 0:
for add_x in range(x, 0):
if self.__map_arr[agent_x + add_x, next_y] == self.GOAL:
goal_flag = True
if goal_flag is False:
if y > 0:
for add_y in range(1, y):
if self.__map_arr[next_x, agent_y + add_y] == self.GOAL:
goal_flag = True
elif y < 0:
for add_y in range(y, 0):
if self.__map_arr[next_x, agent_y + add_y] == self.GOAL:
goal_flag = True
if goal_flag is True:
next_x = self.__goal_pos[0]
next_y = self.__goal_pos[1]
next_action_arr = np.zeros((state_arr.shape[0], state_arr.shape[1]))
next_action_arr[next_x, next_y] = 1
next_action_arr = np.expand_dims(next_action_arr, axis=0)
if _possible_action_arr is None:
_possible_action_arr = next_action_arr
else:
_possible_action_arr = np.concatenate(
[
_possible_action_arr,
next_action_arr
],
axis=0
)
if _possible_action_arr is None:
                raise ValueError("No possible action was found. Try lowering `memory_num`.")
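            # Pad by repeating rows, or subsample at random, so that exactly
            # `possible_n` candidates are returned for each batch element.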
if _possible_action_arr.shape[0] < self.__possible_n:
row_diff = self.__possible_n - _possible_action_arr.shape[0]
while _possible_action_arr.shape[0] < self.__possible_n:
_possible_action_arr = np.concatenate(
[
_possible_action_arr,
_possible_action_arr[:row_diff]
],
axis=0
)
if _possible_action_arr.shape[0] > self.__possible_n:
key_arr = np.arange(_possible_action_arr.shape[0])
np.random.shuffle(key_arr)
_possible_action_arr = _possible_action_arr[key_arr[:self.__possible_n]]
            # Forget the oldest steps so the route memory keeps at most `memory_num` entries.
while len(self.__route_memory_list[batch]) > self.__memory_num:
self.__route_memory_list[batch] = self.__route_memory_list[batch][1:]
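            # Channel layout: 0 = candidate agent position, 1 = the maze map,
            # 2.. = one-hot enemy positions.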
possible_action_arr[batch, :, 0] = _possible_action_arr
possible_action_arr[batch, :, 1] = self.__map_arr
for e in range(self.__enemy_num):
enemy_state_arr = np.zeros(state_arr.shape)
enemy_state_arr[
self.__enemy_pos_list[e][0],
self.__enemy_pos_list[e][1]
] = 1
possible_action_arr[batch, :, 2 + e] = enemy_state_arr
        possible_action_arr = nd.array(possible_action_arr, ctx=self.__ctx)
return possible_action_arr, None
    def observe_state(self, state_arr, meta_data_arr):
        '''
        Observe the states of agents in the last epoch.
        Args:
            state_arr:      Tensor of state.
            meta_data_arr:  Meta data of the state.
        '''
self.__state_arr = state_arr
self.__state_meta_data_arr = meta_data_arr
    def observe_reward_value(
self,
state_arr,
action_arr,
meta_data_arr=None,
):
'''
Compute the reward value.
Args:
state_arr: Tensor of state.
action_arr: Tensor of action.
meta_data_arr: Meta data of actions.
Returns:
Reward value.
'''
reward_arr = self.__check_goal_flag(action_arr)
for i in range(action_arr.shape[0]):
_action_arr = action_arr[i, 0].asnumpy()
x, y = np.where(_action_arr == 1)
x, y = x[0], y[0]
e_dist_sum = 0.0
for e in range(self.__enemy_num):
e_dist = np.sqrt(
((x - self.__enemy_pos_list[e][0]) ** 2) + ((y - self.__enemy_pos_list[e][1]) ** 2)
)
e_dist_sum += e_dist
e_dist_penalty = e_dist_sum / self.__enemy_num
goal_x, goal_y = self.__goal_pos
if x == goal_x and y == goal_y:
distance = 0.0
else:
distance = np.sqrt(((x - goal_x) ** 2) + (y - goal_y) ** 2)
if self.inferencing_mode is False:
state_arr = self.__state_arr
if state_arr is not None:
_state_arr = state_arr[i, 0].asnumpy()
                pre_x, pre_y = np.where(_state_arr == 1)
                pre_x, pre_y = pre_x[0], pre_y[0]
                if pre_x == goal_x and pre_y == goal_y:
                    pre_distance = 0.0
                else:
                    pre_distance = np.sqrt(((pre_x - goal_x) ** 2) + ((pre_y - goal_y) ** 2))
                distance_penalty = distance - pre_distance
                if distance_penalty == 0:
                    # No progress toward the goal is penalized.
                    distance_penalty = 1
else:
distance_penalty = 0
            # Squared distance from the origin to the goal, used as a normalizer.
            max_distance = (goal_x ** 2) + (goal_y ** 2)
reward_arr[i] = reward_arr[i] + (max_distance - distance) - distance_penalty + e_dist_penalty
        reward_arr = nd.array(reward_arr, ctx=self.__ctx)
reward_arr = nd.sigmoid(reward_arr / max_distance)
return reward_arr
    def update_state(
        self,
        action_arr,
        meta_data_arr=None
    ):
        '''
        Update state.
        This method can be overridden for concrete use cases.
        Args:
            action_arr:     Action in `self.t`.
            meta_data_arr:  Meta data of the action.
        Returns:
            Tuple data.
            - State in `self.t+1`.
            - Meta data of the state.
        '''
action_arr = action_arr.asnumpy()
for i in range(action_arr.shape[0]):
            # Index with `i` (not `0`) so each batch element reads its own action.
            x, y = np.where(action_arr[i, 0] == 1)
self.__agent_pos_arr[i] = np.array([x[0], y[0]])
if self.inferencing_mode is True:
self.__route_memory_list[i].append((x[0], y[0]))
return self.extract_now_state(), meta_data_arr
def __check_goal_flag(self, state_arr):
goal_arr = np.zeros((state_arr.shape[0]))
state_arr = state_arr.asnumpy()
self.END_STATE_list = []
for i in range(state_arr.shape[0]):
x, y = np.where(state_arr[i, 0] == 1)
goal_x, goal_y = self.__goal_pos
if x[0] == goal_x and y[0] == goal_y:
goal_arr[i] = 1
self.END_STATE_list.append("Goal")
else:
goal_arr[i] = 0
self.END_STATE_list.append("Not goal")
return goal_arr
def __check_crash_flag(self, state_arr):
crash_arr = np.zeros((state_arr.shape[0]))
state_arr = state_arr.asnumpy()
self.END_STATE_list = []
for i in range(state_arr.shape[0]):
x, y = np.where(state_arr[i, 0] == 1)
x, y = x[0], y[0]
flag = False
for e in range(self.__enemy_num):
if x == self.__enemy_pos_list[e][0] and y == self.__enemy_pos_list[e][1]:
flag = True
break
crash_arr[i] = int(flag)
if flag is True:
self.END_STATE_list.append("Crash")
else:
self.END_STATE_list.append("Not crash")
return crash_arr
    def check_the_end_flag(self, state_arr, meta_data_arr=None):
        '''
        Check the end flag.
        If the return value is `True`, the learning ends.
        As a rule, the learning cannot be stopped.
        This method should be overridden for concrete use cases.
        Args:
            state_arr:      State in `self.t`.
            meta_data_arr:  Meta data of the state.
        Returns:
            bool
        '''
if state_arr is None:
return False
crash_arr = self.__check_crash_flag(state_arr)
goal_arr = self.__check_goal_flag(state_arr)
        return bool(goal_arr.sum() > 0 or crash_arr.sum() > 0)
    def set_readonly(self, value):
''' setter '''
raise TypeError("This property must be read-only.")
    def get_map_arr(self):
''' getter '''
return self.__map_arr
map_arr = property(get_map_arr, set_readonly)
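# A minimal usage sketch (illustrative only; the parameter values below are
# assumptions, not recommendations from the library). It only calls methods
# defined in this module:
#
#     policy = MazeMultiAgentPolicy(
#         batch_size=4,
#         map_size=(11, 11),
#         ctx=mx.cpu(),   # use mx.gpu() if a GPU context is available
#     )
#     possible_action_arr, _ = policy.draw()
#     # Shape: (batch_size, possible_n, 2 + enemy_num, map height, map width).
#     action_arr = possible_action_arr[:, 0]
#     reward_arr = policy.observe_reward_value(None, action_arr)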