-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontosocabs_env.py
169 lines (140 loc) · 7.39 KB
/
contosocabs_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import gym
from gym.utils import seeding
import os
from os import stat
import numpy as np
import random
from gym import spaces
from numpy.lib.function_base import select
from itertools import permutations
from gym.utils import seeding
from gym import spaces
class ContosoCabs_v0(gym.Env):
    """Gym environment modelling a cab driver accepting ride requests.

    Observation (state): a tuple ``(location, hour_of_day, day_of_week)`` with
    ``location in [0, m)``, ``hour_of_day in [0, t)``, ``day_of_week in [0, d)``.

    Action: a flat index into ``action_space_values`` encoding a
    ``(pickup, dropoff)`` city pair. The space is flattened to ``Discrete(m*m)``
    because APEX (Ray RLlib) does not support tuple action spaces.
    A pair with ``pickup == dropoff`` is treated as "stay idle for one hour".

    Reward: per-hour revenue ``R`` for the passenger leg, minus per-hour
    cost ``C`` for every hour driven (including the empty leg to the pickup).
    """

    # Defining hyperparameters
    m = 5   # number of cities, locations 0 .. m-1
    t = 24  # number of hours, 0 .. t-1
    d = 7   # number of days, 0 .. d-1
    C = 5   # per-hour fuel and other costs
    R = 9   # per-hour revenue from a passenger

    metadata = {
        "render.modes": ["human"]
    }

    def tm(self):
        """Return a random travel-time matrix.

        Shape is ``(m, m, t, d)``: entry ``[src][dst][hour][day]`` is the trip
        duration in hours (1..23) from ``src`` to ``dst`` starting at that
        hour/day. NOTE(review): durations are random stand-ins, not real data.
        """
        return np.random.randint(1, 24, (5, 5, 24, 7))

    def __init__(self, config):
        """Initialise state, action space and observation space.

        Args:
            config: unused; accepted for RLlib's ``env_config`` convention.
        """
        # Random initial position in (day, hour, city) space.
        self.day = np.random.choice(np.arange(0, self.d))
        self.time = np.random.choice(np.arange(0, self.t))
        self.location = np.random.choice(np.arange(0, self.m))
        # APEX does not support tuple action spaces, so (pickup, dropoff)
        # pairs are flattened into a single Discrete index.
        self.action_space = spaces.Discrete(self.m * self.m)
        self.action_space_values = [(p, q) for p in range(self.m) for q in range(self.m)]
        self.observation_space = spaces.Tuple(
            (spaces.Discrete(self.m), spaces.Discrete(self.t), spaces.Discrete(self.d))
        )
        self.state_init = (self.location, self.time, self.day)
        self.state = self.state_init
        self.time_matrix = self.tm()
        self.episode_length = 24 * 30  # terminate after 30 days' worth of driving hours
        # Start the first round
        self.reset()

    def get_updated_time(self, time, day):
        """Normalize a (time, day) pair so 0 <= time < t and 0 <= day < d.

        BUG FIX: the original advanced ``day`` by exactly one whenever
        ``time > 23``, which is wrong for trips spanning 48+ hours (a pickup
        leg plus a ride leg can exceed two days). Advance one day per full
        24 hours elapsed instead.
        """
        if time > 23:
            day = day + time // 24
            time = time % 24
        if day >= 7:
            day = day % 7
        return time, day

    def step(self, action_index):
        """Apply one action and return ``(state, reward, done, info)``.

        Args:
            action_index: flat index into ``action_space_values`` selecting
                a ``(pickup, dropoff)`` pair.

        Returns:
            Tuple of (next state, reward, done flag, info dict); ``info``
            accumulates ``"hours_of_trip"`` across the episode.
        """
        # Example: driver is at A at 10 AM on Monday and receives request (B, C).
        action = self.action_space_values[action_index]
        current_location = self.state[0]       # A
        start_location = action[0]             # B (pickup)
        end_location = action[1]               # C (dropoff)
        current_hour_of_day = self.state[1]    # 10
        current_day_of_week = self.state[2]    # 1

        if start_location == end_location:
            # Degenerate request: treat as one idle hour at the current city,
            # paying the hourly cost with no revenue.
            next_state_location = current_location
            next_state_hour_of_day, next_state_day_of_week = self.get_updated_time(
                current_hour_of_day + 1, current_day_of_week
            )
            hours_of_trip = 1  # 1 hour the driver is idle
            reward = -self.C
        elif start_location == current_location:
            # Driver is already at the pickup city: only the passenger leg.
            ride_hours = int(
                self.time_matrix[start_location][end_location][current_hour_of_day][current_day_of_week]
            )
            next_state_location = end_location
            # The trip may roll past midnight (or multiple days), hence normalize.
            next_state_hour_of_day, next_state_day_of_week = self.get_updated_time(
                current_hour_of_day + ride_hours, current_day_of_week
            )
            hours_of_trip = ride_hours
            reward = self.R * ride_hours - self.C * hours_of_trip
        else:
            # Empty leg to the pickup city first (A -> B), then the ride (B -> C).
            pickup_hours = int(
                self.time_matrix[current_location][start_location][current_hour_of_day][current_day_of_week]
            )
            # Time of arrival at the pickup city, normalized to valid hour/day.
            start_hour_of_day, start_day_of_week = self.get_updated_time(
                current_hour_of_day + pickup_hours, current_day_of_week
            )
            ride_hours = int(
                self.time_matrix[start_location][end_location][start_hour_of_day][start_day_of_week]
            )
            next_state_location = end_location
            # BUG FIX: advance from the already-normalized pickup time. The
            # original re-added both legs to the raw current hour while pairing
            # it with the already-advanced start day, double-counting rollovers.
            next_state_hour_of_day, next_state_day_of_week = self.get_updated_time(
                start_hour_of_day + ride_hours, start_day_of_week
            )
            hours_of_trip = pickup_hours + ride_hours
            # Revenue only for the passenger leg; cost for every hour driven.
            reward = self.R * ride_hours - self.C * hours_of_trip

        self.reward = reward
        self.state = (next_state_location, next_state_hour_of_day, next_state_day_of_week)
        self.info["hours_of_trip"] += hours_of_trip  # hours driven so far this episode
        # Terminate once the driver has accumulated a full month of hours.
        self.done = self.info["hours_of_trip"] >= self.episode_length
        # Gym convention is a 4-tuple; unpacking callers are unaffected.
        return self.state, self.reward, self.done, self.info

    def reset(self):
        """Reset to the initial (location, time, day) and clear counters."""
        self.state = (self.location, self.time, self.day)
        self.hoursoftrip = 0  # legacy attribute, kept for compatibility
        self.reward = 0
        self.done = False
        self.info = {"hours_of_trip": 0}
        return self.state

    def render(self, mode="human"):
        """Print the current state, last reward and info dict."""
        print("state: {} reward: {:2d} info: {}".format(self.state, self.reward, self.info))

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator(s).

        Note:
            Some environments use multiple pseudorandom number generators.
            We want to capture all such seeds used in order to ensure that
            there aren't accidental correlations between multiple generators.

        Returns:
            list<bigint>: Returns the list of seeds used in this env's random
              number generators. The first value in the list should be the
              "main" seed, or the value which a reproducer should pass to
              'seed'. Often, the main seed equals the provided 'seed', but
              this won't be true if seed=None, for example.
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def close(self):
        """Override close in your subclass to perform any necessary cleanup.

        Environments will automatically close() themselves when
        garbage collected or when the program exits.
        """
        pass