"""A* path finder that walks an agent ('@') to a bowl of ramen ('%').

The board is a text file in which '#' marks a wall and every other character
is walkable.  Run as a script with two arguments: the board file name and the
heuristic name ("euclidean", "manhattan" or "made_up").
"""
__author__ = 'SJSU Students'

import heapq
import math
import sys

# Define constants.
# The command-line arguments are looked up defensively so that importing the
# module (for reuse or testing) does not fail when they are absent; main()
# rejects a missing argument before anything else happens.
ARG_FILENAME = sys.argv[1] if len(sys.argv) > 1 else None
ARG_HEURISTIC = sys.argv[2] if len(sys.argv) > 2 else None
CHAR_AGENT = '@'
CHAR_EMPTY = '.'
CHAR_WALL = '#'
CHAR_RAMEN = '%'
HEURISTIC_EUCLIDEAN = "euclidean"
HEURISTIC_MANHATTAN = "manhattan"
HEURISTIC_MADEUP = "made_up"
ACTION_COST = 1

# Node object indexes.
# A node is defined by a tuple containing:
# X position (X), Y position (Y), path cost from the start (C),
# and heuristic estimate to the ramen (H).
NODE_X = 0
NODE_Y = 1
NODE_C = 2
NODE_H = 3

# Moves tried from every node, as (dx, dy): UP, RIGHT, DOWN, LEFT.
_MOVES = ((0, -1), (1, 0), (0, 1), (-1, 0))

# Problem state shared by the functions below.  It is populated by
# _load_problem(), which both main() and solve() call.
boardList = []
boardHeight = 0
boardWidth = 0
agentPos = None
ramenPos = None
heuristic = None


def _board_width(rows):
    """Return the length of the longest row, or 0 for an empty board."""
    return max(len(row) for row in rows) if rows else 0


def read_board(filename=None):
    """
    Reads board data from file and interprets information.
    :param filename: Path of the board file; defaults to ARG_FILENAME.
    :return: List of lines, board height, board width.
    """
    if filename is None:
        filename = ARG_FILENAME
    # The context manager closes the file even if reading raises.
    with open(filename, "r") as boardfile:
        # Strip the line terminator (including a Windows '\r') so that it is
        # never mistaken for a walkable cell.
        rows = [line.rstrip('\r\n') for line in boardfile]
    return rows, len(rows), _board_width(rows)


def find_agent_position(blank=False):
    """
    Finds the X and Y position of agent in the global board.
    Prints a message and exits the program if there is no agent.
    :param blank: True if blank board without agent should be saved,
                  False otherwise.
    :return: Position X, position Y.
    """
    for posY, line in enumerate(boardList):
        posX = line.find(CHAR_AGENT)
        if posX != -1:
            if blank:
                boardList[posY] = line.replace(CHAR_AGENT, CHAR_EMPTY)
            return posX, posY
    # Reached only when no row contains the agent (including an empty board).
    print("Agent not found.")
    sys.exit(0)


def find_ramen_position(list):
    """
    Finds the X and Y position of ramen in given board.
    Prints a message and exits the program if there is no ramen.
    :param list: Board as a list of row strings.  (The name shadows the
                 builtin; it is kept so existing keyword callers still work.)
    :return: Position X, position Y.
    """
    for posY, line in enumerate(list):
        posX = line.find(CHAR_RAMEN)
        if posX != -1:
            return posX, posY
    # Reached only when no row contains the ramen (including an empty board).
    print("Ramen not found.")
    sys.exit(0)


def in_frontier(frontier, node):
    """
    Determines if given node's state exists in the frontier list.
    The state of a node is its board position only; cost and heuristic are
    ignored so that a cheaper route to a queued cell can be detected.
    :param frontier: Frontier list of (f-value, node) tuples.
    :param node: Current node.
    :return: Index of node in frontier, -1 if DNE.
    """
    for index, entry in enumerate(frontier):
        queued = entry[1]
        if (queued[NODE_X] == node[NODE_X]
                and queued[NODE_Y] == node[NODE_Y]):
            return index
    return -1


def is_valid_move(x, y):
    """
    Determines if board character denotes a valid move.
    :param x: X-coordinate.
    :param y: Y-coordinate.
    :return: True if valid move, False otherwise.
    """
    # Bounds are checked against the row itself so that boards with rows of
    # different lengths cannot cause an IndexError.
    return (0 <= y < boardHeight
            and 0 <= x < len(boardList[y])
            and boardList[y][x] != CHAR_WALL)


def is_adjacent(node1, node2):
    """
    Determines if two board positions are adjacent.
    :param node1: First node.
    :param node2: Second node.
    :return: True if adjacent, False otherwise.
    """
    # Exactly one step apart horizontally or vertically (never diagonally).
    return (abs(node1[NODE_X] - node2[NODE_X])
            + abs(node1[NODE_Y] - node2[NODE_Y])) == 1


def find_board_actions(node):
    """
    Finds all valid actions from provided node.
    :param node: Current node.
    :return: List of possible action nodes, in UP, RIGHT, DOWN, LEFT order.
    """
    # A child's path cost is the parent's cost plus one step; A* needs the
    # accumulated cost, not just the cost of the last move.
    childCost = node[NODE_C] + ACTION_COST
    actionList = []
    for dx, dy in _MOVES:
        x = node[NODE_X] + dx
        y = node[NODE_Y] + dy
        if is_valid_move(x, y):
            actionList.append(build_node(x, y, childCost))
    return actionList


def build_node(x, y, c):
    """
    Builds a node object. A node is defined by a tuple containing:
    X position (X), Y position (Y), cost (C), and heuristic (H).
    :param x: Node X-coordinate.
    :param y: Node Y-coordinate.
    :param c: Node cost.
    :return: Node object, form of tuple.
    """
    return x, y, c, heuristic(x, y, ramenPos[NODE_X], ramenPos[NODE_Y])


def distance_euclidean(x1, y1, x2, y2):
    """
    Calculates the Euclidean distance between two points.
    :param x1: X coordinate of Point 1.
    :param y1: Y coordinate of Point 1.
    :param x2: X coordinate of Point 2.
    :param y2: Y coordinate of Point 2.
    :return: Euclidean distance.
    """
    return math.hypot(x2 - x1, y2 - y1)


def distance_manhattan(x1, y1, x2, y2):
    """
    Calculates the Manhattan distance between two points.
    :param x1: X coordinate of Point 1.
    :param y1: Y coordinate of Point 1.
    :param x2: X coordinate of Point 2.
    :param y2: Y coordinate of Point 2.
    :return: Manhattan distance.
    """
    return math.fabs(x2 - x1) + math.fabs(y2 - y1)


def distance_average(x1, y1, x2, y2):
    """
    Calculates the average of Euclidean and Manhattan.
    :param x1: X coordinate of Point 1.
    :param y1: Y coordinate of Point 1.
    :param x2: X coordinate of Point 2.
    :param y2: Y coordinate of Point 2.
    :return: Distance.
    """
    return (distance_euclidean(x1, y1, x2, y2)
            + distance_manhattan(x1, y1, x2, y2)) / 2


# Maps each heuristic name accepted on the command line to its function.
HEURISTICS = {
    HEURISTIC_EUCLIDEAN: distance_euclidean,
    HEURISTIC_MANHATTAN: distance_manhattan,
    HEURISTIC_MADEUP: distance_average,
}


def _trace_solution(explored):
    """
    Rebuilds the start-to-goal route from the explored list.
    :param explored: Explored list whose last element is the goal node.
    :return: List of nodes from the agent's start position to the goal.
    :raises ValueError: If the explored nodes do not lead back to the agent.
    """
    current = explored[-1]
    route = [current]
    while (current[NODE_X] != agentPos[NODE_X]
           or current[NODE_Y] != agentPos[NODE_Y]):
        # The predecessor is an explored neighbour that is exactly one step
        # cheaper; following costs (rather than any neighbour) keeps the
        # route on the path the search actually found.
        wanted = current[NODE_C] - ACTION_COST
        for node in explored:
            if node[NODE_C] == wanted and is_adjacent(current, node):
                current = node
                break
        else:
            raise ValueError(
                "Explored list does not connect the goal to the agent.")
        route.append(current)
    # Get solution in start-to-finish order.
    route.reverse()
    return route


def output_solution(explored):
    """
    This function is called once the solution is found to format and
    print the steps taken.
    :param explored: Explored list, goal node last.
    """
    for step, node in enumerate(_trace_solution(explored)):
        header = "Initial:" if step == 0 else "Step " + str(step) + ":"
        print_board(header, node[NODE_X], node[NODE_Y])
    print("Problem Solved! I had some noodles!")


def print_board(text, posX, posY):
    """
    Prints the board with specified header text and agent position.
    :param text: Header text.
    :param posX: X coordinate of agent.
    :param posY: Y coordinate of agent.
    """
    print("\n" + text)
    for index, line in enumerate(boardList):
        if index == posY:
            # Draw the agent over whatever character occupies its cell.
            line = line[:posX] + CHAR_AGENT + line[posX + 1:]
        print(line)


def _load_problem(board, heuristicFunction):
    """
    Stores the board and heuristic in the module globals and locates the
    agent and the ramen.  The agent is blanked out of the stored board.
    :param board: Board as a list of row strings (it is copied, not mutated).
    :param heuristicFunction: Function (x1, y1, x2, y2) -> estimated distance.
    """
    global boardList, boardHeight, boardWidth, agentPos, ramenPos, heuristic
    boardList = [row for row in board]
    boardHeight = len(boardList)
    boardWidth = _board_width(boardList)
    heuristic = heuristicFunction
    agentPos = find_agent_position(True)
    ramenPos = find_ramen_position(boardList)


def a_star_search():
    """
    Runs A* from the agent to the ramen on the loaded problem.
    :return: Explored list with the goal node last, or None if the frontier
             empties before the ramen is reached.
    """
    # Current node is problem's initial state.
    startNode = build_node(agentPos[NODE_X], agentPos[NODE_Y], 0)
    # Frontier list contains tuples of the total cost and node tuple.
    frontierList = [(startNode[NODE_C] + startNode[NODE_H], startNode)]
    exploredList = []
    # Positions already expanded, for constant-time membership tests.
    exploredPositions = set()

    while frontierList:
        # Pop lowest f-value node in frontier,
        # index 1 to get actual node, not (f+node) tuple.
        currentNode = heapq.heappop(frontierList)[1]
        exploredList.append(currentNode)
        exploredPositions.add((currentNode[NODE_X], currentNode[NODE_Y]))

        # Perform goal test.
        if (currentNode[NODE_X] == ramenPos[NODE_X]
                and currentNode[NODE_Y] == ramenPos[NODE_Y]):
            return exploredList

        # For every possible child/action, check to add to frontier.
        for childNode in find_board_actions(currentNode):
            if (childNode[NODE_X], childNode[NODE_Y]) in exploredPositions:
                continue
            childF = childNode[NODE_C] + childNode[NODE_H]
            frontierIndexOfChild = in_frontier(frontierList, childNode)
            if frontierIndexOfChild < 0:
                heapq.heappush(frontierList, (childF, childNode))
            elif frontierList[frontierIndexOfChild][0] > childF:
                # A cheaper route to a queued cell replaces the old entry;
                # re-heapify to restore the priority queue invariant.
                frontierList[frontierIndexOfChild] = (childF, childNode)
                heapq.heapify(frontierList)

    # No solution if frontier list is empty.
    return None


def solve(board, heuristic_name):
    """
    Finds a route from the agent to the ramen without printing it.
    Like the command-line program, exits (after printing a message) when the
    board has no agent or no ramen.
    :param board: Board as a list of row strings.
    :param heuristic_name: One of the keys of HEURISTICS.
    :return: List of nodes from start to goal, or None if there is no route.
    :raises ValueError: If heuristic_name is not a known heuristic.
    """
    if heuristic_name not in HEURISTICS:
        raise ValueError("Invalid heuristic: " + repr(heuristic_name))
    _load_problem(board, HEURISTICS[heuristic_name])
    exploredList = a_star_search()
    if exploredList is None:
        return None
    return _trace_solution(exploredList)


def main():
    """
    Command-line entry point: loads the board and heuristic named by the
    arguments, runs the search and prints the result.
    """
    if ARG_FILENAME is None or ARG_HEURISTIC is None:
        print("Usage: " + sys.argv[0]
              + " <board file> <euclidean|manhattan|made_up>")
        sys.exit(1)

    # Load board data.
    board, _height, _width = read_board()

    # Load heuristic.
    if ARG_HEURISTIC not in HEURISTICS:
        print("Invalid heuristic!")
        sys.exit(0)

    # Get agent and ramen position.
    _load_problem(board, HEURISTICS[ARG_HEURISTIC])

    exploredList = a_star_search()
    if exploredList is None:
        print("No solution!")
    else:
        output_solution(exploredList)


if __name__ == "__main__":
    main()