#!/usr/bin/env python
# coding: utf-8
import numpy as np
import random
from collections import deque


class Map():
    EMPTY = 0
    OBSTACLE = 1
    AGENT = 2
    GOAL = 3


class Action():
    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

    @staticmethod
    def get_name(num):
        if num == Action.LEFT:
            return "LEFT"
        elif num == Action.RIGHT:
            return "RIGHT"
        elif num == Action.UP:
            return "UP"
        elif num == Action.DOWN:
            return "DOWN"
        else:
            raise Exception(f"Invalid action number: {num}")


class EnvGrid():
    def __init__(self, config):
        self.config = config
        if config.obs_type == "3D":
            self.state_dim = config.height*config.width*3
        elif config.obs_type == "2D":
            self.state_dim = config.height*config.width
        elif config.obs_type == "1D":
            self.state_dim = 4
        else:
            raise Exception(f"Invalid obs_type: {config.obs_type}")
        self.action_dim = 4
        self.map_size = config.height * config.width
        self.timeout = config.timeout
        self.obstacle_list = config.obstacle_list
        # self.default_reward = -0.1
        self.obstacle_reward = -1.0
        self.default_reward = config.default_reward
        self.goal_reward = self.map_size

    def view(self):
        print(self.obstacle_map*Map.OBSTACLE + self.agent_map*Map.AGENT + self.goal_map*Map.GOAL)

    def get_observation(self):
        if self.config.obs_type == "3D":
            return np.array([self.obstacle_map, self.agent_map, self.goal_map])
        elif self.config.obs_type == "2D":
            return np.array(self.obstacle_map*Map.OBSTACLE + self.agent_map*Map.AGENT + self.goal_map*Map.GOAL)
        elif self.config.obs_type == "1D":
            # return np.concatenate([self.agent_position, self.goal_position])
            return np.array(self.agent_position)

    def get_optimal_score(self):
        """
        Search backwards from the goal towards the agent, filling in the score of
        the best route through each cell. The score at the agent's position is the
        cumulative reward of the best policy.
        """
        score_map = np.zeros((self.config.height, self.config.width))
        # An agent adjacent to the goal only receives the goal reward, not the default reward.
        # Thus, if the value of the goal position is goal_reward-default_reward, the value of
        # the adjacent positions will be goal_reward.
        score_map[self.goal_position] = self.goal_reward-self.default_reward
        exp_queue = deque([self.goal_position])
        # Start exploring from the goal position
        while exp_queue:
            pos = exp_queue.popleft()
            for direction in [(-1, 0), (0, -1), (1, 0), (0, 1)]:
                # explore in the four directions
                next_pos, score = self.get_next_score(score_map, pos, direction)
                if score is not None:
                    # the next position is a valid move
                    if self.agent_map[next_pos] > 0:
                        # the next position is the agent, so this route is complete
                        score_map[next_pos] = score
                    else:
                        # otherwise keep exploring from this position
                        score_map[next_pos] = score
                        exp_queue.append(next_pos)
        return score_map[self.agent_position]

    def get_next_score(self, score_map, pos, direction):
        next_pos = (pos[0]+direction[0], pos[1]+direction[1])
        if next_pos[0] < 0 or next_pos[1] < 0:
            # out of map
            return None, None
        elif next_pos[0] >= self.config.height or next_pos[1] >= self.config.width:
            # out of map
            return None, None
        elif score_map[next_pos] >= score_map[pos]:
            # not the best route (there is a better one)
            return None, None
        elif self.obstacle_map[next_pos] > 0:
            # encountered obstacle
            return next_pos, score_map[pos]+self.obstacle_reward
        return next_pos, score_map[pos]+self.default_reward

    def reset(self, move_obstacle=False):
        self.done = False
        self.truncated = False
        self.elapsed = 0
        # Initialize the maps
        self.obstacle_map = np.zeros((self.config.height, self.config.width), int)
        self.agent_map = np.zeros((self.config.height, self.config.width), int)
        self.goal_map = np.zeros((self.config.height, self.config.width), int)
        # Place the goal
        self.goal_position = (self.config.height-1, self.config.width-1)
        self.goal_map[self.goal_position] = 1
        # place_goal = False
        # while not place_goal:
        #     self.goal_position = (random.randint(0, self.config.height-1), random.randint(0, self.config.width-1))
        #     if self.obstacle_map[self.goal_position] == 0 and self.agent_map[self.goal_position] == 0:
        #         place_goal = True
        #         self.goal_map[self.goal_position] = 1
        # Place the agent
        self.agent_position = (0, 0)
        self.agent_map[self.agent_position] = 1
        # place_agent = False
        # while not place_agent:
        #     self.agent_position = (random.randint(0, self.config.height-1), random.randint(0, self.config.width-1))
        #     if self.obstacle_map[self.agent_position] == 0:
        #         place_agent = True
        #         self.agent_map[self.agent_position] = 1
        # Place the predefined obstacles, optionally shifting each one step in a random direction
        new_obstacle_list = []
        for obstacle in self.obstacle_list:
            new_obstacle = obstacle
            if move_obstacle or random.random() < self.config.obstacle_stochasticity:
                direction = random.randint(0, 3)
                if direction == Action.LEFT:
                    if obstacle[1] > 0:
                        new_obstacle = (obstacle[0], obstacle[1]-1)
                elif direction == Action.RIGHT:
                    if obstacle[1] < self.config.width-1:
                        new_obstacle = (obstacle[0], obstacle[1]+1)
                elif direction == Action.UP:
                    if obstacle[0] > 0:
                        new_obstacle = (obstacle[0]-1, obstacle[1])
                elif direction == Action.DOWN:
                    if obstacle[0] < self.config.height-1:
                        new_obstacle = (obstacle[0]+1, obstacle[1])
            if new_obstacle[0] <= 1 and new_obstacle[1] <= 0:
                # Too near the starting point; keep the obstacle where it was
                self.obstacle_map[obstacle] = 1
                new_obstacle_list.append(obstacle)
            elif new_obstacle[0] >= 3 and new_obstacle[1] >= 3:
                # Too near the goal point; keep the obstacle where it was
                self.obstacle_map[obstacle] = 1
                new_obstacle_list.append(obstacle)
            elif self.goal_map[new_obstacle] == 0 and self.agent_map[new_obstacle] == 0:
                self.obstacle_map[new_obstacle] = 1
                new_obstacle_list.append(new_obstacle)
            else:
                self.obstacle_map[obstacle] = 1
                new_obstacle_list.append(obstacle)
        self.obstacle_list = new_obstacle_list
        return self.get_observation(), {}

    def save_map(self):
        return [self.obstacle_list, self.agent_position, self.goal_position]

    def load_map(self, preset):
        self.done = False
        self.truncated = False
        self.elapsed = 0
        # Initializing map
        self.obstacle_map = np.zeros((self.config.height, self.config.width), int)
        self.agent_map = np.zeros((self.config.height, self.config.width), int)
        self.goal_map = np.zeros((self.config.height, self.config.width), int)
        # Place predefined obstacles
        for obstacle in preset[0]:
            self.obstacle_map[obstacle] = 1
        # Place agent
        self.agent_position = preset[1]
        self.agent_map[self.agent_position] = 1
        # Place goal
        self.goal_position = preset[2]
        self.goal_map[self.goal_position] = 1
        return self.get_observation(), {}

    def get_reward(self, state, action):
        """Reward the agent would receive for taking `action` from `state`, without modifying the environment."""
        agent_position = (state[0], state[1])
        if action == Action.LEFT:
            if agent_position[1] < 1:
                # Already at the left edge: the agent stays in place
                if self.obstacle_map[agent_position] == 1:
                    return self.obstacle_reward
                elif self.goal_map[agent_position] == 1:
                    return self.goal_reward
                else:
                    return self.default_reward
            if self.obstacle_map[(agent_position[0], agent_position[1]-1)] == 1:
                return self.obstacle_reward
            elif self.goal_map[(agent_position[0], agent_position[1]-1)] == 1:
                return self.goal_reward
            else:
                return self.default_reward
        elif action == Action.RIGHT:
            if agent_position[1] > self.config.width-2:
                # Already at the right edge: the agent stays in place
                if self.obstacle_map[agent_position] == 1:
                    return self.obstacle_reward
                elif self.goal_map[agent_position] == 1:
                    return self.goal_reward
                else:
                    return self.default_reward
            if self.obstacle_map[(agent_position[0], agent_position[1]+1)] == 1:
                return self.obstacle_reward
            elif self.goal_map[(agent_position[0], agent_position[1]+1)] == 1:
                return self.goal_reward
            else:
                return self.default_reward
        elif action == Action.UP:
            if agent_position[0] < 1:
                # Already at the top edge: the agent stays in place
                if self.obstacle_map[agent_position] == 1:
                    return self.obstacle_reward
                elif self.goal_map[agent_position] == 1:
                    return self.goal_reward
                else:
                    return self.default_reward
            if self.obstacle_map[(agent_position[0]-1, agent_position[1])] == 1:
                return self.obstacle_reward
            elif self.goal_map[(agent_position[0]-1, agent_position[1])] == 1:
                return self.goal_reward
            else:
                return self.default_reward
        elif action == Action.DOWN:
            if agent_position[0] > self.config.height-2:
                # Already at the bottom edge: the agent stays in place
                if self.obstacle_map[agent_position] == 1:
                    return self.obstacle_reward
                elif self.goal_map[agent_position] == 1:
                    return self.goal_reward
                else:
                    return self.default_reward
            if self.obstacle_map[(agent_position[0]+1, agent_position[1])] == 1:
                return self.obstacle_reward
            elif self.goal_map[(agent_position[0]+1, agent_position[1])] == 1:
                return self.goal_reward
            else:
                return self.default_reward
        else:
            raise Exception(f"Unsupported action type: {action}")

    def step(self, action):
        if self.done or self.truncated:
            print("Please start a new episode")
            return None, None, self.done, self.truncated, {}
        self.elapsed += 1
        if self.elapsed > self.timeout-1:
            # self.view()
            # print(f"[Timeout] action: {Action.get_name(action)}")
            self.truncated = True
        if action == Action.LEFT:
            if self.agent_position[1] < 1:
                # Already at the left edge: the agent stays in place
                if self.obstacle_map[self.agent_position] == 0:
                    return self.get_observation(), self.default_reward, self.done, self.truncated, {}
                else:
                    return self.get_observation(), self.obstacle_reward, self.done, self.truncated, {}
            elif self.goal_map[(self.agent_position[0], self.agent_position[1]-1)] > 0:
                self.done = True
                return self.get_observation(), self.goal_reward, self.done, self.truncated, {}
            else:
                self.agent_map[self.agent_position] = 0
                self.agent_position = (self.agent_position[0], self.agent_position[1]-1)
                self.agent_map[self.agent_position] = 1
        elif action == Action.RIGHT:
            if self.agent_position[1] > self.config.width-2:
                # Already at the right edge: the agent stays in place
                if self.obstacle_map[self.agent_position] == 0:
                    return self.get_observation(), self.default_reward, self.done, self.truncated, {}
                else:
                    return self.get_observation(), self.obstacle_reward, self.done, self.truncated, {}
            elif self.goal_map[(self.agent_position[0], self.agent_position[1]+1)] > 0:
                self.done = True
                return self.get_observation(), self.goal_reward, self.done, self.truncated, {}
            else:
                self.agent_map[self.agent_position] = 0
                self.agent_position = (self.agent_position[0], self.agent_position[1]+1)
                self.agent_map[self.agent_position] = 1
        elif action == Action.UP:
            if self.agent_position[0] < 1:
                # Already at the top edge: the agent stays in place
                if self.obstacle_map[self.agent_position] == 0:
                    return self.get_observation(), self.default_reward, self.done, self.truncated, {}
                else:
                    return self.get_observation(), self.obstacle_reward, self.done, self.truncated, {}
            elif self.goal_map[(self.agent_position[0]-1, self.agent_position[1])] > 0:
                self.done = True
                return self.get_observation(), self.goal_reward, self.done, self.truncated, {}
            else:
                self.agent_map[self.agent_position] = 0
                self.agent_position = (self.agent_position[0]-1, self.agent_position[1])
                self.agent_map[self.agent_position] = 1
        elif action == Action.DOWN:
            if self.agent_position[0] > self.config.height-2:
                # Already at the bottom edge: the agent stays in place
                if self.obstacle_map[self.agent_position] == 0:
                    return self.get_observation(), self.default_reward, self.done, self.truncated, {}
                else:
                    return self.get_observation(), self.obstacle_reward, self.done, self.truncated, {}
            elif self.goal_map[(self.agent_position[0]+1, self.agent_position[1])] > 0:
                self.done = True
                return self.get_observation(), self.goal_reward, self.done, self.truncated, {}
            else:
                self.agent_map[self.agent_position] = 0
                self.agent_position = (self.agent_position[0]+1, self.agent_position[1])
                self.agent_map[self.agent_position] = 1
        else:
            raise Exception(f"Unsupported action type: {action}")
        if self.obstacle_map[self.agent_position] == 0:
            return self.get_observation(), self.default_reward, self.done, self.truncated, {}
        else:
            return self.get_observation(), self.obstacle_reward, self.done, self.truncated, {}

    def stop(self):
        pass
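

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original project): it shows how EnvGrid
    # might be instantiated and stepped with a random policy. The config fields below
    # are assumptions inferred from the attributes EnvGrid reads (obs_type, height,
    # width, timeout, obstacle_list, default_reward, obstacle_stochasticity); the
    # project's real config object may be defined elsewhere with different values.
    from types import SimpleNamespace

    config = SimpleNamespace(
        obs_type="2D",
        height=4,
        width=4,
        timeout=50,
        obstacle_list=[(1, 2), (2, 1)],
        default_reward=-0.1,
        obstacle_stochasticity=0.0,
    )
    env = EnvGrid(config)
    obs, info = env.reset()
    env.view()
    print("Optimal score:", env.get_optimal_score())

    # Roll out one episode with a uniformly random policy.
    done, truncated, episode_return = False, False, 0.0
    while not (done or truncated):
        action = random.randint(0, env.action_dim - 1)
        obs, reward, done, truncated, info = env.step(action)
        episode_return += reward
    print("Episode return:", episode_return)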