{ "cells": [ { "cell_type": "code", "execution_count": 3, "metadata": { "id": "AXX58hnRW3Xs" }, "outputs": [ { "ename": "ModuleNotFoundError", "evalue": "No module named 'numpy'", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)", "Cell \u001b[1;32mIn[3], line 1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mnumpy\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mnp\u001b[39;00m\n", "\u001b[1;31mModuleNotFoundError\u001b[0m: No module named 'numpy'" ] } ], "source": [ "import numpy as np" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "lb8WtTWGJx_R" }, "outputs": [], "source": [ "class ReinforcementGridAgent():\n", " def __init__(self,rows,cols,actions,action_to_steps,gamma,terminal_states,obstacles,REWARD_NORMAL):\n", " self.rows=rows\n", " self.cols=cols\n", " self.actions=actions\n", " self.action_to_steps=action_to_steps\n", " self.gamma=gamma\n", " self.terminal_states=terminal_states\n", " self.REWARD_NORMAL = REWARD_NORMAL\n", " self.obstacles=obstacles\n", " self.INTENDED_PROB=0.8\n", " self.PERPENDICULAR_PROB=0.1\n", "\n", " def initialize_grid(self):\n", " rewards = np.full((self.rows, self.cols),self.REWARD_NORMAL)\n", " for (r, c), reward in self.terminal_states.items():\n", " rewards[r, c] = reward\n", " values = np.zeros((self.rows, self.cols))\n", " return rewards, values\n", "\n", " def is_terminal(self,state):\n", " return state in self.terminal_states\n", "\n", " def is_obstacle(self,state):\n", " return state in self.obstacles\n", "\n", " def next_state(self,state, action):\n", " delta = self.action_to_steps[action]\n", " next_r, next_c = state[0] + delta[0], state[1] + delta[1]\n", " if 0 <= next_r < self.rows and 0 <= next_c < self.cols and not self.is_obstacle((next_r, next_c)):\n", " return (next_r, next_c)\n", " return state\n", "\n", "\n", " def get_transitions(self,state, action):\n", " if self.is_terminal(state):\n", " return [(1.0, state)]\n", " transitions = []\n", " for act, prob in [(action, self.INTENDED_PROB),\n", " (self.left_turn(action), self.PERPENDICULAR_PROB),\n", " (self.right_turn(action), self.PERPENDICULAR_PROB)]:\n", " transitions.append((prob, self.next_state(state, act)))\n", " return transitions\n", "\n", "\n", " def left_turn(self,action):\n", " return {'U': 'L', 'L': 'D', 'D': 'R', 'R': 'U'}[action]\n", " def right_turn(self,action):\n", " return {'U': 'R', 'R': 'D', 'D': 'L', 'L': 'U'}[action]\n", "\n", "\n", " def policy_evaluation(self,policy, rewards, values, theta=1e-3):\n", " while True:\n", " delta = 0\n", " for r in range(self.rows):\n", " for c in range(self.cols):\n", " state = (r, c)\n", " if self.is_terminal(state) or self.is_obstacle(state):\n", " continue\n", " v = values[r, c]\n", "\n", " new_value = sum(prob * (rewards[s_next] + self.gamma * values[s_next])\n", " for prob, s_next in self.get_transitions(state, policy[state]))\n", " values[r, c] = new_value\n", " delta = max(delta, abs(v - new_value))\n", " if delta < theta:\n", " break\n", "\n", "\n", "\n", " def policy_improvement(self,policy, rewards, values):\n", " stable = True\n", " for r in range(self.rows):\n", " for c in range(self.cols):\n", " state = (r, c)\n", " if self.is_terminal(state) or self.is_obstacle(state):\n", " continue\n", " old_action = policy[state]\n", " new_action = max(self.actions, key=lambda a: sum(prob * (rewards[s_next] + self.gamma * values[s_next])\n", " for prob, s_next in self.get_transitions(state, a)))\n", " policy[state] = new_action\n", " if old_action != new_action:\n", " stable = False\n", " return stable\n", "\n", " def policy_iteration(self):\n", " rewards, values = self.initialize_grid()\n", " policy = { (r, c): np.random.choice(self.actions) for r in range(self.rows) for c in range(self.cols)\n", " if not self.is_terminal((r, c)) and not self.is_obstacle((r, c)) }\n", " policy.update({s: None for s in self.terminal_states})\n", " while True:\n", " self.policy_evaluation(policy, rewards, values)\n", " if self.policy_improvement(policy, rewards, values):\n", " break\n", " return policy, values\n", "\n", "\n", " def value_iteration(self,theta=1e-3):\n", " rewards, values = self.initialize_grid()\n", " while True:\n", " delta = 0\n", " for r in range(self.rows):\n", " for c in range(self.cols):\n", " state = (r, c)\n", " if self.is_terminal(state) or self.is_obstacle(state):\n", " continue\n", " v = values[r, c]\n", " values[r, c] = max(sum(prob * (rewards[s_next] + self.gamma * values[s_next])\n", " for prob, s_next in self.get_transitions(state, a)) for a in self.actions)\n", " delta = max(delta, abs(v - values[r, c]))\n", " if delta < theta:\n", " break\n", "\n", " policy={}\n", "\n", " for r in range(self.rows):\n", " for c in range(self.cols):\n", " state = (r, c)\n", " if self.is_terminal(state) or self.is_obstacle(state):\n", " policy[state] = None\n", " else:\n", " policy[state] = max(self.actions, key=lambda a: sum(prob * (rewards[s_next] + self.gamma * values[s_next])\n", " for prob, s_next in self.get_transitions(state, a)))\n", " return policy,values" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NjW9QI_tFUeo" }, "outputs": [], "source": [ "\n", "ROWS, COLS = 3, 4\n", "REWARD_NORMAL = -0.01\n", "REWARD_POSITIVE = 1.0\n", "REWARD_NEGATIVE = -1.0\n", "TERMINAL_STATES = {(2, 3): 1.0, (1, 3): -1.0}\n", "OBSTACLES = {(1, 1):None}\n", "GAMMA = 0.7\n", "INTENDED_PROB = 0.8\n", "PERPENDICULAR_PROB = 0.1\n", "ACTIONS = ['U', 'D', 'L', 'R']\n", "ACTION_TO_Changes = {'U': (1, 0), 'D': (-1, 0), 'L': (0, -1), 'R': (0, 1)}" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "FxmV55b_W1uQ" }, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "7DoqTqbdZTom", "outputId": "4abd956f-5be0-4f9d-f019-dc1bb68f93c6" }, "outputs": [], "source": [ "model = ReinforcementGridAgent(ROWS,COLS,ACTIONS,ACTION_TO_Changes,GAMMA,TERMINAL_STATES,OBSTACLES,-0.01)\n", "policy,values=model.policy_iteration()\n", "state_names=[\"s1\",\"s2\",\"s3\",\"s4\",\"s5\",\"s6\",\"s7\",\"s8\",\"s9\",\"s10\",\"s11\",\"s12\"]\n", "states={}\n", "k=0\n", "for (r,c) in policy:\n", " states[state_names[k]]=policy[(r,c)]\n", " k+=1\n", "print(states)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "2fiCNb4HZXF0", "outputId": "a50844d4-5a73-4218-805d-0f65da2c5b93" }, "outputs": [], "source": [ "policy1,values1=model.value_iteration()\n", "\n", "k=0\n", "for (r,c) in policy:\n", " states[state_names[k]]=policy[(r,c)]\n", " k+=1\n", "print(states)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "apjoPCe6W-2R" }, "outputs": [], "source": [] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.11" } }, "nbformat": 4, "nbformat_minor": 0 }