Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
FlexPool/simulator_MHRS.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
550 lines (459 sloc)
22.5 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
#### VERSION 4 | |
# In[ ]: | |
import pdb | |
import numpy as np | |
import pandas as pd | |
from pathgenerator import PathGenerator | |
import geohelper as gh | |
import geohash2 as Geohash | |
import sys | |
sys.path.append('/home/wenqi/Kaushik') | |
from utils.utils import Params | |
# SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time | |
def index_pull(rows,trips_test_2): | |
val= trips_test_2[(trips_test_2['plat']==rows['plat'].item()) & (trips_test_2['plon']==rows['plon'].item())].index.values | |
return val | |
class FleetSimulator(object): | |
""" | |
FleetSimulator is an environment in which fleets mobility, dispatch | |
and passenger pickup / dropoff are simulated. | |
Init Variables: (type) Description | |
router: (PathGenerator) Allows for finding shortest paths for distance | |
eta_model: (h5) This model predicts eta given pickup and dropoff locations | |
cycle: (int) number of cycles **to be better understood | |
max_action_time (int) Action update cycle **to be better understood | |
List of Methods: See Individual Signatures for each of the below | |
reset(self, num_vehicles, dataset, dayofweek, minofday) | |
update_time(self) | |
step(self, actions=None) ** MAIN RUNNING METHOD | |
get_requests(self, num_steps, offset=0) | |
get_vehicles_dataframe(self) ** calls get_state from vehicle class | |
get_vehicles_location(self) ** calls get_location from vehicle class | |
get_vehicles_score(self) ** calls get_score from vehicle class | |
""" | |
def __init__(self, params G, eta_model, cycle, max_action_time=15): | |
self.init_params(params) | |
self.router = PathGenerator(G) | |
self.eta_model = eta_model | |
self.cycle = cycle | |
self.max_action_time = max_action_time | |
self.scenario = 'MHGD' | |
def init_params(self,params): | |
self.TIMESTEP = params.TIMESTEP | |
self.GEOHASH_PRECISION = params.GEOHASH_PRECISION | |
self.REJECT_DISTANCE = params.REJECT_DISTANCE | |
# SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time | |
self.RIDE_REWARD = params.RIDE_REWARD | |
self.TRIP_REWARD = params.TRIP_REWARD | |
self.WAIT_COST = params.WAIT_COST | |
self.HOP_REWARD = params.HOP_REWARD | |
self.MIN_TRIPTIME = params.MIN_TRIPTIME # in meters | |
self.ASSIGNMENT_SPEED = params.ASSIGNMENT_SPEED # km/h (grand circle distance) | |
def reset(self, num_vehicles, dataset, dayofweek, minofday): | |
""" | |
This initiates the simulator environment taking the following arguments | |
Args: | |
self: see init | |
num_vehicles: (int) number of "resources" to initiate. | |
Ultimately instantiates this number of vehicle objects. | |
dataset:(pd.DataFrame) chunks loaded from experiments.py | |
dayofweek: (int) exactly what it sounds like | |
minofday: (int) exactly what it sounds like | |
""" | |
self.requests = dataset | |
self.current_time = 0 | |
self.minofday = minofday | |
self.dayofweek = dayofweek | |
#randomly assign initial locations based on where the first N pickups are to be made. | |
init_locations = self.requests[['plat', 'plon']].values[np.arange(num_vehicles) % len(self.requests)] | |
self.vehicles = [Vehicle(i, init_locations[i]) for i in range(num_vehicles)] | |
def update_time(self): | |
self.current_time += self.TIMESTEP | |
self.minofday += int(self.TIMESTEP / 60.0) | |
if self.minofday >= 1440: | |
self.minofday -= 1440 | |
self.dayofweek = (self.dayofweek + 1) % 7 | |
def step(self, actions=None): | |
""" | |
step forward the environment by self.TIMESTEP. | |
This is the main function that updates the state space, takes actions from dqn, and returns rewards(score). | |
""" | |
num_steps = int(self.cycle * 60.0 / self.TIMESTEP) | |
if actions: | |
self.dispatch(actions) | |
#pdb.set_trace() | |
requests = self.get_requests(num_steps) | |
wait, reject, gas,request_hop_zero = 0, 0, 0, 0 | |
for _ in range(num_steps): | |
for vehicle in self.vehicles: | |
#Is the only purpose of vehicle.transition to get gas usage? | |
gas += vehicle.transition() | |
X = self.get_vehicles_location() | |
W = requests[(requests.second >= self.current_time) &(requests.second < self.current_time + self.TIMESTEP)] | |
if len(W) == 0: | |
print("WE GOT NO REQUESTS") | |
#pdb.set_trace() | |
assignments = self.rough_match(X, W) | |
#pdb.set_trace() | |
wait_,num_vehicles,num_passengers,request_hop_zero_ = self.assign(assignments) | |
#######EDITED HERE########### setting the vehicle instance variable based on assigned passengers | |
#vehicle.passengers = num_passengers | |
#######EDITED HERE########### | |
wait += wait_ | |
assignment_length = len(assignments) | |
reject += len(W) - num_passengers | |
request_hop_zero += request_hop_zero_ | |
self.update_time() | |
vehicles = self.get_vehicles_dataframe() | |
return vehicles, requests, wait, reject, gas, request_hop_zero | |
def get_requests(self, num_steps, offset=0): | |
requests = self.requests[(self.requests.second >= self.current_time + offset * self.TIMESTEP) | |
&(self.requests.second < self.current_time + self.TIMESTEP * (num_steps + offset))] | |
return requests | |
def get_vehicles_dataframe(self): | |
vehicles = [vehicle.get_state() for vehicle in self.vehicles] | |
#######EDITED HERE########### added curr_passengers | |
vehicles = pd.DataFrame(vehicles, columns=['id', 'available', 'geohash', 'dest_geohash', | |
'eta', 'status', 'reward', 'lat', 'lon', 'idle','eff_dist','act_dist','curr_passengers']) | |
#######EDITED HERE########### | |
return vehicles | |
def get_vehicles_location(self): | |
vehicles = [vehicle.get_location() for vehicle in self.vehicles] | |
#######EDITED HERE########### | |
vehicles = pd.DataFrame(vehicles, columns=['id', 'lat', 'lon', 'available', 'curr_passengers', 'capacity']) | |
#######EDITED HERE########### | |
return vehicles | |
def get_vehicles_score(self): | |
vehicles = [vehicle.get_score() for vehicle in self.vehicles] | |
vehicles = pd.DataFrame(vehicles, columns=['id', 'reward', 'service_time', 'idle_time']) | |
return vehicles | |
def post_process(self,prelim_assignments, R): | |
assignments = {} | |
rejects = [] | |
#r:request ; v:vehicle | |
for r,v in prelim_assignments: | |
capacity = R[R.id == v]['capacity'].values[0] | |
curr_passengers = R.loc[R.id == v]['curr_passengers'].values[0] | |
seats = int(capacity-curr_passengers) | |
if len(r)>seats: | |
#print('Shortened',len(r),'to ', len(r[:seats])) | |
r = r[:seats] | |
rejected = r[seats:] | |
for i in rejected: | |
rejects.append() | |
assignments[v] = r | |
return assignments, rejects | |
def rough_match(self,Resources, tasks, reject_distance = 7000): | |
""""" | |
Rough Greedy Matching Algorithm for cars and passengers within a zone. | |
""""" | |
#print("~~~~~~~~~~~~~~~~ Running ROUGH MATCH ~~~~~~~~~~~~~~~~") | |
assignments = {} | |
Resources['remaining_seats'] = Resources['capacity'] - Resources['curr_passengers'] | |
R = Resources[Resources.available == 1] | |
# Getting the locations of cars that are available | |
it = 0 | |
#pdb.set_trace() | |
while len(tasks)>0 and len(R)>0: | |
tasks_uniq = tasks.groupby(['plat','plon']).count() | |
tasks_uniq = tasks_uniq.reset_index(level=['plat','plon']) | |
tasks_uniq = tasks_uniq[['plat','plon']] | |
tasks_uniq['index']=tasks_uniq.apply(lambda rows:tasks[(tasks['plat']==rows['plat'].item()) & (tasks['plon']==rows['plon'].item())].index.values , axis=1) | |
N = min(len(tasks_uniq), len(R)) | |
# Calculate distances from each of the cars and unique pick up locations | |
d = gh.distance_in_meters(R.lat.values, | |
R.lon.values, | |
tasks_uniq.plat.values[:, None], | |
tasks_uniq.plon.values[:, None]) | |
# Create a np zero array of the number of assignments to make | |
vids = np.zeros(N, dtype=int) | |
# For each assignment, pick the closest car considering the the reject distance | |
for i in range(N): | |
vid = d[i].argmin() | |
if d[i, vid] > reject_distance: | |
vids[i] = -1 #assign -1 for a rejection | |
else: | |
vids[i] = vid | |
d[:, vid] = float('inf') #updating the d array for some reason. | |
# Getting indices of accepted tasks. Seems like its those which vid>0 | |
index_values = tasks_uniq.index[:N][vids >= 0] | |
if len(R['id'].iloc[vids[vids >= 0]])<1: | |
print("All cars out of range") | |
break | |
prelim_assignments = list(zip(tasks_uniq.loc[index_values]['index'], R['id'].iloc[vids[vids >= 0]])) | |
true_assignments, rejects = self.post_process(prelim_assignments, R) | |
assignments.update(true_assignments) | |
for car_id in assignments.keys(): | |
passengers_added = len(assignments[car_id]) | |
#print(R.loc[R['id'].isin([car_id])]) | |
R.loc[R['id'].isin([car_id]), 'curr_passengers'] += passengers_added | |
R.loc[R['id'].isin([car_id]), 'remaining_seats'] -= passengers_added | |
#print(R.loc[R['id'].isin([car_id])]) | |
R['available'] = R.apply(lambda row: 1 if row['remaining_seats'] >= 1 else 0, axis=1) | |
#print("len(R):", len(R)) | |
#print("len(tasks): ",len(tasks)) | |
R = R[R.available == 1] | |
tasks = tasks.iloc[rejects] | |
it += 1 | |
#print("Iteration #%d"%it) | |
#print("len(R):", len(R)) | |
#print("len(tasks): ",len(tasks)) | |
return assignments | |
def greedy_match(self, Resources, Tasks, slack=0.1, reject_dist=7000): | |
""""" | |
Strict Greedy Matching Algorithm for cars and passengers within a zone. | |
""""" | |
assignments = {} | |
no_rejected = 0 | |
Resources['remaining_seats'] = Resources['capacity'] - Resources['curr_passengers'] | |
Resources = Resources[Resources.available == 1] | |
Tasks['id'] = Tasks.index | |
total_remaining_seats = np.sum(Resources['remaining_seats'].values) | |
total_tasks = len(Tasks) | |
if total_tasks<total_remaining_seats: | |
print("!!!!!!!!!CASE 1!!!!!!!!!!!!!") | |
N = total_tasks | |
for idx, task in Tasks.iterrows(): | |
Resources['available'] = Resources.apply(lambda row: 1 if row['remaining_seats'] >= 1 else 0, axis=1) | |
Resources = Resources[Resources.available == 1] | |
if len(Resources) == 0: | |
print("RAN OUT OF RESOURCES ! !! !!!") | |
break | |
d = gh.distance_in_meters(Resources.lat.values,Resources.lon.values,task.plat,task.plon) | |
if d[d.argmin()]<reject_dist: | |
car_id = int(Resources.iloc[d.argmin()]['id']) | |
pickup_id = task.id | |
#assignments.append((car_id,pickup_id)) | |
assignments.setdefault(car_id,[]).append(pickup_id) | |
Resources.loc[Resources['id'].isin([car_id]), 'curr_passengers'] += 1 | |
Resources.loc[Resources['id'].isin([car_id]), 'remaining_seats'] -= 1 | |
else: | |
no_rejected += 1 | |
else: | |
print("!!!!!!!!!CASE 2!!!!!!!!!!!!!") | |
for idx, resource in Resources.iterrows(): | |
Resources['available'] = Resources.apply(lambda row: 1 if row['remaining_seats'] >= 1 else 0, axis=1) | |
seats = int(resource.capacity-resource.curr_passengers) | |
for i in range(seats): | |
d = gh.distance_in_meters(Tasks.plat.values,Tasks.plon.values,resource.lat,resource.lon) | |
pickup_id = Tasks.iloc[d.argmin()]['id'] | |
car_id = int(resource.id) | |
#assignments.append((car_id,pickup_id)) | |
assignments.setdefault(car_id,[]).append(pickup_id) | |
Resources.loc[Resources['id'].isin([car_id]), 'curr_passengers'] += 1 | |
Resources.loc[Resources['id'].isin([car_id]), 'remaining_seats'] -= 1 | |
task_indices = Tasks[Tasks.id == pickup_id].index | |
#pdb.set_trace() | |
Tasks.drop(task_indices,inplace=True) | |
return assignments | |
def assign(self, assignments): | |
""" | |
Request to vehicle assignments are already made using match(self,tasks,resources) | |
This method is more of a helper function that does assignments of requests in dataset to the vehicle class. | |
""" | |
num_vehicles = len(assignments) | |
request_hop_zero = 0 | |
#Total Number of passengers assigned in the entire step. | |
num_passengers = 0 | |
wait = 0 | |
effective_d = 0 | |
actual_d = 0 | |
for v in assignments: | |
r = assignments.get(v) | |
vehicle = self.vehicles[v] # pointer to a Vehicle object | |
request = self.requests.loc[r] | |
num_present_vehicle_pass = len(request) | |
request_hop_zero += sum(request['hop_flag']==0) | |
num_passengers += num_present_vehicle_pass | |
request_sort = request.sort_values(by=['trip_time']) | |
first_row = request_sort.iloc[0] | |
last_row = request_sort.iloc[-1] | |
#print (last_row) | |
ploc = (first_row.plat, first_row.plon) | |
dloc = (last_row.dlat, last_row.dlon) | |
#dloc = (last_row.dlat, last_row.dlon) | |
vloc = vehicle.location | |
#print (vloc[0], vloc[1], ploc[0], ploc[1]) | |
d = gh.distance_in_meters(vloc[0], vloc[1], ploc[0], ploc[1]) | |
# wait_time = 1 + d / (ASSIGNMENT_SPEED * 1000 / 60) | |
wait_time = (d * 2 / 1.414) / (self.ASSIGNMENT_SPEED * 1000 / 60) | |
#print (d) | |
##calculation for the trip time , effective distance, total distance | |
eff_distance = sum(request.trip_distance) | |
request_sort_copy = request_sort.copy() | |
request_sort.loc[:,'index_val'] = range(len(request_sort)) | |
request_sort_copy.loc[:,'index_val'] = request_sort['index_val'].values -1 | |
request_sort_copy = request_sort_copy[['dlat','dlon','index_val']] | |
request_sort_join = pd.merge(request_sort, request_sort_copy, how='left', on=None, left_on='index_val', right_on='index_val', | |
left_index=False, right_index=False, sort=False,suffixes=('_x', '_y'), | |
copy=True, indicator=False,validate=None) | |
request_join_no_na = request_sort_join.dropna() | |
#print (len(request_join_no_na)) | |
#print (request_) | |
#request_join_no_na['dist_bw_dest'] = gh.distance_in_meters(request_join_no_na.dlat_x,request_join_no_na.dlon_x, | |
# request_join_no_na.dlat_y,request_join_no_na.dlon_y) | |
if len(request_join_no_na) > 0: | |
request_join_no_na['dist_bw_dest'] = gh.distance_in_meters(request_join_no_na.dlat_x,request_join_no_na.dlon_x,request_join_no_na.dlat_y,request_join_no_na.dlon_y) | |
actual_distance = first_row.trip_distance + (sum(request_join_no_na.dist_bw_dest))/1000 | |
trip_time = (actual_distance/self.ASSIGNMENT_SPEED)*60 | |
#print ('shared....') | |
#print (type(actual_distance),type(trip_time)) | |
#print (trip_time) | |
else: | |
actual_distance = first_row.trip_distance | |
trip_time = first_row.trip_time | |
#print ('not shared....') | |
#print (type(actual_distance),type(trip_time)) | |
#print (trip_time) | |
#trip_time = request.trip_time | |
vehicle.start_service(dloc, wait_time, trip_time,actual_distance,eff_distance,num_present_vehicle_pass) | |
wait += wait_time | |
# effective_d +=eff_distance | |
# actual_d += actual_distance | |
return wait,num_vehicles,num_passengers,request_hop_zero | |
def dispatch(self, actions): | |
cache = [] | |
distances = [] | |
vids, targets = zip(*actions) | |
vlocs = [self.vehicles[vid].location for vid in vids] | |
for vloc, tloc in zip(vlocs, targets): | |
try: | |
p, d, s, t = self.router.map_matching_shortest_path(vloc, tloc) | |
cache.append((p, s, t)) | |
distances.append(d) | |
except: | |
start_lat = vloc[0] | |
start_lon = vloc[1] | |
dest_lat = tloc[0] | |
dest_lon = tloc[1] | |
d = gh.distance_in_meters(start_lat,start_lon, dest_lat,dest_lon) | |
distances.append(d) | |
N = len(vids) | |
X = np.zeros((N, 7)) | |
X[:, 0] = self.dayofweek | |
X[:, 1] = self.minofday / 60.0 | |
X[:, 2:4] = vlocs | |
X[:, 4:6] = targets | |
X[:, 6] = distances | |
trip_times = self.eta_model.predict(X) | |
for i, vid in enumerate(vids): | |
if trip_times[i] > self.MIN_TRIPTIME: | |
#p, s, t = cache[i] | |
#step = distances[i] / (trip_times[i] * 60.0 / self.TIMESTEP) | |
#trajectory = self.router.generate_path(vlocs[i], targets[i], step, p, s, t) | |
eta = min(trip_times[i], self.max_action_time) | |
self.vehicles[vid].route([], eta) | |
return | |
class Vehicle(object): | |
""" | |
Status available location eta storage_id | |
WT: waiting 1 real 0 0 | |
MV: moving 1 real >0 0 | |
SV: serving 0 future >0 0 | |
ST: stored 0 real 0 >0 | |
CO: carry-out 0 real >0 r>0 | |
""" | |
def __init__(self, vehicle_id, location): | |
self.id = vehicle_id | |
self.status = 'WT' | |
self.location = location | |
self.zone = Geohash.encode(location[0], location[1], precision=self.GEOHASH_PRECISION) | |
self.available = True | |
self.trajectory = [] | |
self.eta = 0 | |
self.idle = 0 | |
self.total_idle = 0 | |
self.total_service = 0 | |
self.reward = 0 | |
self.effective_d =0 | |
self.actual_d = 0 | |
####### EDITED HERE ######## | |
self.passengers = 0 | |
self.capacity = 3 | |
####### EDITED HERE ######## | |
def update_location(self, location): | |
lat, lon = location | |
self.location = (lat, lon) | |
self.zone = Geohash.encode(lat, lon, precision=self.GEOHASH_PRECISION) | |
def transition(self): | |
cost = 0 | |
if self.status != 'SV': | |
self.idle += self.TIMESTEP/60.0 | |
if self.eta > 0: | |
time = min(self.TIMESTEP/60.0, self.eta) | |
self.eta -= time | |
# moving | |
#if self.trajectory: | |
if self.status == 'MV': | |
#self.update_location(self.trajectory.pop(0)) | |
cost = time | |
self.reward -= cost | |
if self.eta <= 0: | |
####### EDITED HERE ######## | |
self.passengers = 0 | |
####### EDITED HERE ######## | |
# serving -> waiting | |
if self.status == 'SV': | |
self.available = True | |
self.status = 'WT' | |
# moving -> waiting | |
elif self.status == 'MV': | |
self.status = 'WT' | |
return cost | |
def start_service(self, destination, wait_time, trip_time,actual_distance,eff_distance,num_pass): | |
#print (type(wait_time)) | |
#print (type(trip_time)) | |
#print type(destination) | |
if not self.available: | |
print ("The vehicle #%d is not available for service." % self.id) | |
return False | |
self.available = False | |
self.update_location(destination) | |
self.total_idle += self.idle + wait_time | |
######EDITED HERE######## | |
if actual_distance == 0: | |
num_hops = actual_distance | |
else: | |
num_hops = eff_distance / actual_distance | |
######EDITED HERE######## | |
self.idle = 0 | |
self.eta = wait_time + trip_time | |
self.total_service += trip_time | |
######### EDITED HERE ######### | |
self.passengers = num_pass | |
######### EDITED HERE ######### | |
self.reward += (self.RIDE_REWARD*num_pass) + (self.TRIP_REWARD * trip_time) - (self.WAIT_COST * wait_time) - (self.HOP_REWARD*num_hops) | |
self.trajectory = [] | |
self.status = 'SV' | |
self.effective_d += eff_distance | |
self.actual_d +=actual_distance | |
return True | |
def route(self, path, trip_time): | |
if not self.available: | |
print ("The vehicle #%d is not available for service." % self.id) | |
return False | |
self.eta = trip_time | |
self.trajectory = path | |
self.status = 'MV' | |
return True | |
def get_state(self): | |
if self.trajectory: | |
lat, lon = self.trajectory[-1] | |
dest_zone = Geohash.encode(lat, lon, precision=self.GEOHASH_PRECISION) | |
else: | |
dest_zone = self.zone | |
lat, lon = self.location | |
#######EDITED HERE########### added self.passengers at the end | |
return (self.id, int(self.available), self.zone, dest_zone, | |
self.eta, self.status, self.reward, lat, lon, self.idle,self.effective_d,self.actual_d,self.passengers) | |
#######EDITED HERE########### | |
def get_location(self): | |
lat, lon = self.location | |
if self.passengers>=self.capacity: | |
self.available = False | |
#######EDITED HERE########### | |
return (self.id, lat, lon, int(self.available), int(self.passengers), int(self.capacity)) | |
#######EDITED HERE########### | |
def get_score(self): | |
return (self.id, self.reward, self.total_service, self.total_idle) |