Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
FlexPool/simulator_v3.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
457 lines (375 sloc)
18.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# In[ ]: | |
import pdb | |
import numpy as np | |
import pandas as pd | |
from pathgenerator import PathGenerator | |
import geohelper as gh | |
import geohash2 as Geohash | |
# ---------------------------------------------------------------------------
# Simulation constants
# ---------------------------------------------------------------------------
TIMESTEP = 60  # seconds advanced by one simulation tick
GEOHASH_PRECISION = 7  # geohash length used for vehicle zones
REJECT_DISTANCE = 7000  # meters; a request farther than this from every car is rejected
MIN_TRIPTIME = 1.0  # minutes (compared against the ETA model output; not a distance)
ASSIGNMENT_SPEED = 50  # km/h (great-circle distance)

######~~REWARD_PARAMETERS~~######
# Reward credited to a vehicle when it starts a service (see Vehicle.start_service):
#   RIDE_REWARD * num_pass      (B1 * btn)
# - TRIP_REWARD * trip_time     (B2 * ctn)
# - WAIT_COST   * wait_time     (B3 * dtnl)
# - HOP_REWARD  * num_hops      (B5 * Ht)
# ORIGINALLY IN CODE: RIDE_REWARD = 10.0  # 5,6,7,8,9,10
RIDE_REWARD = 5  # As in paper
TRIP_REWARD = 1.0  # As in paper
# ORIGINALLY IN CODE: WAIT_COST = 0.05
WAIT_COST = 3.5  # As in paper
# ORIGINALLY IN CODE: HOP_REWARD = 3.5
HOP_REWARD = 2  # As in paper
######~~REWARD_PARAMETERS~~######
def index_pull(rows, trips_test_2):
    """Return the index labels of all trips that share a pickup location.

    Args:
        rows: A row-like object exposing ``'plat'`` and ``'plon'``. It may be
            a pandas Series (one row) or a one-row DataFrame.
        trips_test_2: DataFrame of trips with ``plat`` and ``plon`` columns.

    Returns:
        numpy.ndarray of the index labels in ``trips_test_2`` whose pickup
        latitude and longitude both equal those of ``rows``.
    """
    def _scalar(value):
        # NumPy scalars and one-element Series expose .item(); plain Python
        # numbers (e.g. from an object-dtype row) do not and are used as-is.
        return value.item() if hasattr(value, 'item') else value

    plat = _scalar(rows['plat'])
    plon = _scalar(rows['plon'])
    same_pickup = (trips_test_2['plat'] == plat) & (trips_test_2['plon'] == plon)
    return trips_test_2[same_pickup].index.values
class FleetSimulator(object):
    """Environment simulating fleet movement, dispatch and passenger service.

    Init variables:
        router: (PathGenerator) shortest-path helper built from the graph ``G``.
        eta_model: model exposing ``predict(X)``; used by ``dispatch`` to
            estimate travel time for a dispatch action.
        cycle: (int) length of one ``step`` call; ``step`` runs
            ``cycle * 60 / TIMESTEP`` ticks.
        max_action_time: (int) upper bound applied to a dispatch ETA.

    Methods:
        reset(num_vehicles, dataset, dayofweek, minofday)
        update_time()
        step(actions=None)            ** main running method
        get_requests(num_steps, offset=0)
        get_vehicles_dataframe()      ** calls Vehicle.get_state
        get_vehicles_location()       ** calls Vehicle.get_location
        get_vehicles_score()          ** calls Vehicle.get_score
        match(resources, tasks)
        assign(assignments)
        dispatch(actions)
    """

    def __init__(self, G, eta_model, cycle, max_action_time=15):
        self.router = PathGenerator(G)
        self.eta_model = eta_model
        self.cycle = cycle
        self.max_action_time = max_action_time

    def reset(self, num_vehicles, dataset, dayofweek, minofday):
        """Initialise the environment for a new episode.

        Args:
            num_vehicles: (int) number of Vehicle objects to create.
            dataset: (pd.DataFrame) requests; this method reads the ``plat``
                and ``plon`` columns, other methods read further columns.
            dayofweek: (int) starting day of week.
            minofday: (int) starting minute of day.
        """
        self.requests = dataset
        self.current_time = 0
        self.minofday = minofday
        self.dayofweek = dayofweek
        # Vehicles start at the pickup locations of the first N requests,
        # wrapping around when there are more vehicles than requests.
        init_locations = self.requests[['plat', 'plon']].values[
            np.arange(num_vehicles) % len(self.requests)]
        self.vehicles = [Vehicle(i, init_locations[i]) for i in range(num_vehicles)]

    def update_time(self):
        """Advance the clock by one TIMESTEP, rolling over at midnight."""
        self.current_time += TIMESTEP
        self.minofday += int(TIMESTEP / 60.0)
        if self.minofday >= 1440:
            self.minofday -= 1440
            self.dayofweek = (self.dayofweek + 1) % 7

    def step(self, actions=None):
        """Advance the environment by one cycle.

        Applies the optional dispatch ``actions``, then for every tick moves
        all vehicles, matches the requests falling in that tick to available
        vehicles and starts the resulting services.

        Args:
            actions: optional iterable of ``(vehicle_id, target_location)``.

        Returns:
            Tuple ``(vehicles, requests, wait, reject, gas, request_hop_zero)``:
            the vehicle state DataFrame, the requests of this cycle, the
            summed wait time, the number of unserved requests, the summed
            movement cost and the number of served requests whose
            ``hop_flag`` is 0.
        """
        num_steps = int(self.cycle * 60.0 / TIMESTEP)
        if actions:
            self.dispatch(actions)
        requests = self.get_requests(num_steps)
        wait, reject, gas, request_hop_zero = 0, 0, 0, 0
        for _ in range(num_steps):
            for vehicle in self.vehicles:
                # transition() returns the movement cost of this tick
                gas += vehicle.transition()
            X = self.get_vehicles_location()
            W = requests[(requests.second >= self.current_time)
                         & (requests.second < self.current_time + TIMESTEP)]
            if len(W) == 0:
                # An empty window is a normal situation: report it and move
                # on instead of halting the run in an interactive debugger.
                print("WE GOT NO REQUESTS")
            else:
                assignments = self.match(X, W)
                wait_, _, num_passengers, request_hop_zero_ = self.assign(assignments)
                wait += wait_
                reject += len(W) - num_passengers
                request_hop_zero += request_hop_zero_
            self.update_time()
        vehicles = self.get_vehicles_dataframe()
        return vehicles, requests, wait, reject, gas, request_hop_zero

    def get_requests(self, num_steps, offset=0):
        """Return the requests whose ``second`` lies in the next ``num_steps`` ticks.

        The window is ``[current_time + offset * TIMESTEP,
        current_time + (num_steps + offset) * TIMESTEP)``.
        """
        requests = self.requests[(self.requests.second >= self.current_time + offset * TIMESTEP)
                                 & (self.requests.second < self.current_time + TIMESTEP * (num_steps + offset))]
        return requests

    def get_vehicles_dataframe(self):
        """Return one row of full state per vehicle (see Vehicle.get_state)."""
        vehicles = [vehicle.get_state() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(vehicles, columns=['id', 'available', 'geohash', 'dest_geohash',
                                                   'eta', 'status', 'reward', 'lat', 'lon', 'idle',
                                                   'eff_dist', 'act_dist', 'curr_passengers'])
        return vehicles

    def get_vehicles_location(self):
        """Return id, position and availability of every vehicle."""
        vehicles = [vehicle.get_location() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(vehicles, columns=['id', 'lat', 'lon', 'available'])
        return vehicles

    def get_vehicles_score(self):
        """Return id, reward, service time and idle time of every vehicle."""
        vehicles = [vehicle.get_score() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(vehicles, columns=['id', 'reward', 'service_time', 'idle_time'])
        return vehicles

    def match(self, resources, tasks):
        """Greedily match available vehicles to pickup locations.

        Requests sharing the same ``(plat, plon)`` are pooled into one task.
        Tasks are visited in sorted ``(plat, plon)`` order; each takes the
        nearest still-unassigned vehicle unless that vehicle is farther than
        REJECT_DISTANCE. Only the first ``min(#tasks, #vehicles)`` tasks are
        considered.

        Args:
            resources: DataFrame with columns ``id, lat, lon, available``.
            tasks: DataFrame of requests with ``plat`` and ``plon`` columns.

        Returns:
            List of ``(request_index_labels, vehicle_id)`` tuples, where
            ``request_index_labels`` is an array of ``tasks`` index labels.
        """
        # Only vehicles that are currently available can be matched.
        R = resources[resources.available == 1]
        if len(tasks) == 0 or len(R) == 0:
            return []

        # One pass over the grouped requests yields each unique pickup and
        # the index labels of the requests pooled at it.
        pickup_lats, pickup_lons, members = [], [], []
        for (plat, plon), group in tasks.groupby(['plat', 'plon']):
            pickup_lats.append(plat)
            pickup_lons.append(plon)
            members.append(group.index.values)
        if not members:
            return []
        pickup_lats = np.asarray(pickup_lats, dtype=float)
        pickup_lons = np.asarray(pickup_lons, dtype=float)

        # d[i, j] = distance from pickup i to available vehicle j
        d = gh.distance_in_meters(R.lat.values,
                                  R.lon.values,
                                  pickup_lats[:, None],
                                  pickup_lons[:, None])

        # With too few cars in the area the remaining pickups are rejected.
        N = min(len(members), len(R))
        vids = np.zeros(N, dtype=int)
        for i in range(N):
            vid = d[i].argmin()
            if d[i, vid] > REJECT_DISTANCE:
                vids[i] = -1  # -1 marks a rejection
            else:
                vids[i] = vid
                # A matched vehicle must not be picked by any later task.
                d[:, vid] = float('inf')

        accepted = np.flatnonzero(vids >= 0)
        vehicle_ids = R['id'].iloc[vids[accepted]]
        return [(members[i], v) for i, v in zip(accepted, vehicle_ids)]

    def assign(self, assignments):
        """Start service on every vehicle paired with requests by ``match``.

        For each ``(request_index_labels, vehicle_id)`` pair the pooled
        requests are ordered by ``trip_time``; the vehicle picks up at the
        first request's pickup and ends at the last request's drop-off.

        Args:
            assignments: list of tuples as returned by ``match``.

        Returns:
            Tuple ``(wait, num_vehicles, num_passengers, request_hop_zero)``:
            summed wait time, number of assignments, number of requests
            served and number of served requests with ``hop_flag == 0``.
        """
        num_vehicles = len(assignments)
        request_hop_zero = 0
        num_passengers = 0  # requests served across all assignments
        wait = 0
        for r, v in assignments:
            vehicle = self.vehicles[v]
            request = self.requests.loc[r]
            num_present_vehicle_pass = len(request)
            request_hop_zero += sum(request['hop_flag'] == 0)
            num_passengers += num_present_vehicle_pass

            request_sort = request.sort_values(by=['trip_time'])
            first_row = request_sort.iloc[0]
            last_row = request_sort.iloc[-1]
            ploc = (first_row.plat, first_row.plon)
            dloc = (last_row.dlat, last_row.dlon)
            vloc = vehicle.location

            d = gh.distance_in_meters(vloc[0], vloc[1], ploc[0], ploc[1])
            # Straight-line meters scaled by 2/1.414 and divided by the
            # speed in meters per minute.
            # NOTE(review): the factor presumably approximates road distance - confirm.
            wait_time = (d * 2 / 1.414) / (ASSIGNMENT_SPEED * 1000 / 60)

            # Effective distance: what the passengers would travel alone.
            eff_distance = sum(request.trip_distance)

            if num_present_vehicle_pass > 1:
                # Shared ride: first trip plus the hops between consecutive
                # drop-offs. Taken straight from the coordinate columns so a
                # NaN in an unrelated column cannot silently drop a hop.
                dlat = request_sort.dlat.values
                dlon = request_sort.dlon.values
                hops = gh.distance_in_meters(dlat[:-1], dlon[:-1], dlat[1:], dlon[1:])
                # hops are meters; /1000 matches trip_distance
                # (assumes trip_distance is in km - TODO confirm).
                actual_distance = first_row.trip_distance + sum(hops) / 1000
                trip_time = (actual_distance / ASSIGNMENT_SPEED) * 60
            else:
                actual_distance = first_row.trip_distance
                trip_time = first_row.trip_time

            vehicle.start_service(dloc, wait_time, trip_time, actual_distance,
                                  eff_distance, num_present_vehicle_pass)
            wait += wait_time
        return wait, num_vehicles, num_passengers, request_hop_zero

    def dispatch(self, actions):
        """Send vehicles towards target locations chosen by the agent.

        The ETA of each move comes from ``eta_model``; moves not longer than
        MIN_TRIPTIME are ignored and the ETA is capped at
        ``max_action_time``.

        Args:
            actions: iterable of ``(vehicle_id, target_location)`` where the
                target is indexable as ``(lat, lon)``.
        """
        if not actions:
            return
        vids, targets = zip(*actions)
        vlocs = [self.vehicles[vid].location for vid in vids]
        distances = []
        for vloc, tloc in zip(vlocs, targets):
            try:
                _, d, _, _ = self.router.map_matching_shortest_path(vloc, tloc)
            except Exception:
                # Routing failed: fall back to the straight-line distance.
                # `Exception` (not a bare except) so that KeyboardInterrupt
                # and SystemExit still stop the run.
                d = gh.distance_in_meters(vloc[0], vloc[1], tloc[0], tloc[1])
            distances.append(d)

        # Feature rows: dayofweek, hour of day, origin, target, distance
        N = len(vids)
        X = np.zeros((N, 7))
        X[:, 0] = self.dayofweek
        X[:, 1] = self.minofday / 60.0
        X[:, 2:4] = vlocs
        X[:, 4:6] = targets
        X[:, 6] = distances
        trip_times = self.eta_model.predict(X)

        for i, vid in enumerate(vids):
            if trip_times[i] > MIN_TRIPTIME:
                eta = min(trip_times[i], self.max_action_time)
                # No explicit path is generated; only the ETA is tracked.
                self.vehicles[vid].route([], eta)
        return
class Vehicle(object):
    """A single simulated car.

    Status           available   location   eta   storage_id
    WT: waiting          1         real       0       0
    MV: moving           1         real      >0       0
    SV: serving          0         future    >0       0
    ST: stored           0         real       0      >0
    CO: carry-out        0         real      >0     r>0

    Only WT, MV and SV are assigned by the methods of this class.
    """

    def __init__(self, vehicle_id, location):
        self.id = vehicle_id
        self.location = location
        self.zone = Geohash.encode(location[0], location[1], precision=GEOHASH_PRECISION)
        # lifecycle
        self.status = 'WT'
        self.available = True
        self.trajectory = []
        self.eta = 0
        # accounting
        self.idle = 0
        self.total_idle = 0
        self.total_service = 0
        self.reward = 0
        self.effective_d = 0
        self.actual_d = 0
        # pooling
        self.passengers = 0
        self.cap = 3

    def update_location(self, location):
        """Move the vehicle to ``location`` and refresh its geohash zone."""
        new_lat, new_lon = location
        self.zone = Geohash.encode(new_lat, new_lon, precision=GEOHASH_PRECISION)
        self.location = (new_lat, new_lon)

    def transition(self):
        """Advance the vehicle by one tick and return the movement cost.

        The cost is the time spent in status 'MV' during this tick and 0
        otherwise; it is also subtracted from the vehicle's reward.
        """
        tick = TIMESTEP/60.0
        spent = 0

        # Any state other than serving accumulates idle time.
        if self.status != 'SV':
            self.idle += tick

        if self.eta > 0:
            elapsed = min(tick, self.eta)
            self.eta -= elapsed
            if self.status == 'MV':
                spent = elapsed
                self.reward -= spent

        # On arrival both serving and moving vehicles go back to waiting;
        # a serving vehicle also becomes available again.
        # NOTE(review): self.passengers is left unchanged here - confirm
        # whether it should be cleared when a service ends.
        if self.eta <= 0 and self.status in ('SV', 'MV'):
            if self.status == 'SV':
                self.available = True
            self.status = 'WT'

        return spent

    def start_service(self, destination, wait_time, trip_time,actual_distance,eff_distance,num_pass):
        """Begin serving ``num_pass`` pooled requests.

        The vehicle's location is set to ``destination`` immediately, its ETA
        becomes ``wait_time + trip_time`` and the service reward is credited.

        Returns:
            True when the service was started, False when the vehicle was
            not available (nothing is changed in that case).
        """
        if not self.available:
            print ("The vehicle #%d is not available for service." % self.id)
            return False

        # Hop penalty scales with effective over actual distance; a zero
        # actual distance yields zero hops instead of a division by zero.
        num_hops = actual_distance if actual_distance == 0 else eff_distance / actual_distance

        ride_gain = RIDE_REWARD*num_pass
        trip_cost = TRIP_REWARD * trip_time
        wait_cost = WAIT_COST * wait_time
        hop_cost = HOP_REWARD*num_hops

        self.available = False
        self.update_location(destination)

        self.total_idle += self.idle + wait_time
        self.idle = 0
        self.eta = wait_time + trip_time
        self.total_service += trip_time
        self.passengers = num_pass
        self.reward += ride_gain - trip_cost - wait_cost - hop_cost

        self.trajectory = []
        self.status = 'SV'
        self.effective_d += eff_distance
        self.actual_d += actual_distance
        return True

    def route(self, path, trip_time):
        """Send the vehicle along ``path`` with an ETA of ``trip_time``.

        Returns:
            True when the vehicle was put in status 'MV', False when it was
            not available (nothing is changed in that case).
        """
        if self.available:
            self.eta = trip_time
            self.trajectory = path
            self.status = 'MV'
            return True
        print ("The vehicle #%d is not available for service." % self.id)
        return False

    def get_state(self):
        """Return the full state tuple consumed by the vehicles DataFrame.

        Order: id, available, zone, destination zone, eta, status, reward,
        lat, lon, idle, effective distance, actual distance, passengers.
        """
        # Destination zone is the end of the trajectory when one exists,
        # otherwise the zone the vehicle is already in.
        dest_zone = self.zone
        if self.trajectory:
            end_lat, end_lon = self.trajectory[-1]
            dest_zone = Geohash.encode(end_lat, end_lon, precision=GEOHASH_PRECISION)
        lat, lon = self.location
        return (
            self.id,
            int(self.available),
            self.zone,
            dest_zone,
            self.eta,
            self.status,
            self.reward,
            lat,
            lon,
            self.idle,
            self.effective_d,
            self.actual_d,
            self.passengers,
        )

    def get_location(self):
        """Return ``(id, lat, lon, available)`` with availability as 0/1."""
        cur_lat, cur_lon = self.location
        return (self.id, cur_lat, cur_lon, int(self.available))

    def get_score(self):
        """Return ``(id, reward, total service time, total idle time)``."""
        return (self.id, self.reward, self.total_service, self.total_idle)