Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
FlexPool/simulator_DeepPool.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
498 lines (431 sloc)
18.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# DEEPPOOL SIMULATOR | |
import numpy as np | |
import pandas as pd | |
from pathgenerator import PathGenerator | |
import geohelper as gh | |
import geohash2 as Geohash | |
import sys | |
sys.path.append("/home/wenqi/Kaushik") | |
from utils.utils import Params | |
import pdb | |
def index_pull(rows, trips_test_2): | |
val = trips_test_2[ | |
(trips_test_2["plat"] == rows["plat"].item()) & (trips_test_2["plon"] == rows["plon"].item()) | |
].index.values | |
return val | |
class FleetSimulator(object): | |
""" | |
FleetSimulator is an environment in which fleets mobility, dispatch | |
and passenger pickup / dropoff are simulated. | |
""" | |
def __init__(self, params, G, eta_model, cycle, max_action_time=15): | |
self.params = params | |
self.init_params(params) | |
self.router = PathGenerator(G) | |
self.eta_model = eta_model | |
self.cycle = cycle | |
self.max_action_time = max_action_time | |
self.scenario = "DeepPool" | |
def init_params(self, params): | |
self.TIMESTEP = params.TIMESTEP | |
self.GEOHASH_PRECISION = params.GEOHASH_PRECISION | |
self.REJECT_DISTANCE = params.REJECT_DISTANCE | |
# SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time | |
self.RIDE_REWARD = params.RIDE_REWARD | |
self.TRIP_REWARD = params.TRIP_REWARD | |
self.WAIT_COST = params.WAIT_COST | |
self.MIN_TRIPTIME = params.MIN_TRIPTIME # in meters | |
self.ASSIGNMENT_SPEED = params.ASSIGNMENT_SPEED # km/h (grand circle distance) | |
def reset(self, num_vehicles, dataset, dayofweek, minofday): | |
self.requests = dataset | |
self.current_time = 0 | |
self.minofday = minofday | |
self.dayofweek = dayofweek | |
init_locations = self.requests[["plat", "plon"]].values[np.arange(num_vehicles) % len(self.requests)] | |
self.vehicles = [Vehicle(i, init_locations[i], self.params) for i in range(num_vehicles)] | |
def update_time(self): | |
self.current_time += self.TIMESTEP | |
self.minofday += int(self.TIMESTEP / 60.0) | |
if self.minofday >= 1440: | |
self.minofday -= 1440 | |
self.dayofweek = (self.dayofweek + 1) % 7 | |
def step(self, actions=None): | |
""" | |
step forward the environment by TIMESTEP | |
""" | |
num_steps = int(self.cycle * 60.0 / self.TIMESTEP) | |
if actions: | |
self.dispatch(actions) | |
requests = self.get_requests(num_steps) | |
wait, reject, gas, idle = 0, 0, 0, 0 | |
for _ in range(num_steps): | |
for vehicle in self.vehicles: | |
gas += vehicle.transition() | |
idle += vehicle.idle | |
X = self.get_vehicles_location() | |
W = requests[(requests.second >= self.current_time) & (requests.second < self.current_time + self.TIMESTEP)] | |
# pdb.set_trace() | |
assignments = self.rough_match(X, W) | |
wait_, num_vehicles, num_passengers, original_requests = self.assign(assignments) | |
wait += wait_ | |
assignment_length = len(assignments) | |
reject += len(W) - num_passengers | |
self.update_time() | |
vehicles = self.get_vehicles_dataframe() | |
v_score = self.get_vehicles_score() | |
return vehicles, requests, wait, reject, gas, idle, original_requests, v_score | |
def get_requests(self, num_steps, offset=0): | |
requests = self.requests[ | |
(self.requests.second >= self.current_time + offset * self.TIMESTEP) | |
& (self.requests.second < self.current_time + self.TIMESTEP * (num_steps + offset)) | |
] | |
return requests | |
def get_vehicles_dataframe(self): | |
vehicles = [vehicle.get_state() for vehicle in self.vehicles] | |
#######EDITED HERE########### added curr_passengers | |
vehicles = pd.DataFrame( | |
vehicles, | |
columns=[ | |
"id", | |
"available", | |
"geohash", | |
"dest_geohash", | |
"eta", | |
"status", | |
"reward", | |
"lat", | |
"lon", | |
"idle", | |
"eff_dist", | |
"act_dist", | |
"curr_passengers", | |
], | |
) | |
#######EDITED HERE########### | |
return vehicles | |
def get_vehicles_location(self): | |
vehicles = [vehicle.get_location() for vehicle in self.vehicles] | |
#######EDITED HERE########### | |
vehicles = pd.DataFrame(vehicles, columns=["id", "lat", "lon", "available", "curr_passengers", "capacity"]) | |
#######EDITED HERE########### | |
return vehicles | |
def get_vehicles_score(self): | |
vehicles = [vehicle.get_score() for vehicle in self.vehicles] | |
vehicles = pd.DataFrame(vehicles, columns=["id", "reward_score", "service_time", "idle_time"]) | |
return vehicles | |
def post_process(self, prelim_assignments, R): | |
assignments = {} | |
rejects = [] | |
# r:request ; v:vehicle | |
for r, v in prelim_assignments: | |
capacity = R[R.id == v]["capacity"].values[0] | |
curr_passengers = R.loc[R.id == v]["curr_passengers"].values[0] | |
seats = int(capacity - curr_passengers) | |
if len(r) > seats: | |
# print('Shortened',len(r),'to ', len(r[:seats])) | |
r = r[:seats] | |
rejected = r[seats:] | |
for i in rejected: | |
rejects.append() | |
assignments[v] = r | |
return assignments, rejects | |
def rough_match(self, Resources, tasks, reject_distance=7000): | |
"""""s | |
Rough Greedy Matching Algorithm for cars and passengers within a zone. | |
""" "" | |
# print("~~~~~~~~~~~~~~~~ Running ROUGH MATCH ~~~~~~~~~~~~~~~~") | |
assignments = {} | |
Resources["remaining_seats"] = Resources["capacity"] - Resources["curr_passengers"] | |
R = Resources[Resources.available == 1] | |
# Getting the locations of cars that are available | |
it = 0 | |
while len(tasks) > 0 and len(R) > 0: | |
tasks_uniq = tasks.groupby(["plat", "plon"]).count() | |
tasks_uniq = tasks_uniq.reset_index(level=["plat", "plon"]) | |
tasks_uniq = tasks_uniq[["plat", "plon"]] | |
tasks_uniq["index"] = tasks_uniq.apply( | |
lambda rows: tasks[ | |
(tasks["plat"] == rows["plat"].item()) & (tasks["plon"] == rows["plon"].item()) | |
].index.values, | |
axis=1, | |
) | |
N = min(len(tasks_uniq), R.remaining_spots.sum()) | |
# Calculate distances from each of the cars and unique pick up locations | |
d = gh.distance_in_meters( | |
R.lat.values, R.lon.values, tasks_uniq.plat.values[:, None], tasks_uniq.plon.values[:, None] | |
) | |
# Create a np zero array of the number of assignments to make | |
vids = np.zeros(N, dtype=int) | |
# For each assignment, pick the closest car considering the the reject distance | |
for i in range(N): | |
vid = d[i].argmin() | |
if d[i, vid] > reject_distance: | |
vids[i] = -1 # assign -1 for a rejection | |
else: | |
vids[i] = vid | |
d[:, vid] = float("inf") # updating the d array for some reason. | |
# Getting indices of accepted tasks. Seems like its those which vid>0 | |
index_values = tasks_uniq.index[:N][vids >= 0] | |
if len(R["id"].iloc[vids[vids >= 0]]) < 1: | |
print("All cars out of range") | |
break | |
prelim_assignments = list(zip(tasks_uniq.loc[index_values]["index"], R["id"].iloc[vids[vids >= 0]])) | |
true_assignments, rejects = self.post_process(prelim_assignments, R) | |
assignments.update(true_assignments) | |
for car_id in assignments.keys(): | |
passengers_added = len(assignments[car_id]) | |
# print(R.loc[R['id'].isin([car_id])]) | |
R.loc[R["id"].isin([car_id]), "curr_passengers"] += passengers_added | |
R.loc[R["id"].isin([car_id]), "remaining_seats"] -= passengers_added | |
# print(R.loc[R['id'].isin([car_id])]) | |
R["available"] = R.apply(lambda row: 1 if row["remaining_seats"] >= 1 else 0, axis=1) | |
# print("len(R):", len(R)) | |
# print("len(tasks): ",len(tasks)) | |
R = R[R.available == 1] | |
tasks = tasks.iloc[rejects] | |
it += 1 | |
# print("Iteration #%d"%it) | |
# print("len(R):", len(R)) | |
# print("len(tasks): ",len(tasks)) | |
return assignments | |
def assign(self, assignments): | |
""" | |
assign ride requests to selected vehicles | |
""" | |
num_vehicles = len(assignments) | |
num_passengers = 0 | |
wait = 0 | |
effective_d = 0 | |
actual_d = 0 | |
for v in assignments: | |
r = assignments.get(v) | |
vehicle = self.vehicles[v] # pointer to a Vehicle object | |
request = self.requests.loc[r] | |
num_present_vehicle_pass = len(request) | |
num_passengers += num_present_vehicle_pass | |
request_sort = request.sort_values(by=["trip_time"]) | |
first_row = request_sort.iloc[0] | |
last_row = request_sort.iloc[-1] | |
# print (last_row) | |
ploc = (first_row.plat, first_row.plon) | |
dloc = (last_row.dlat, last_row.dlon) | |
# dloc = (last_row.dlat, last_row.dlon) | |
vloc = vehicle.location | |
# print (vloc[0], vloc[1], ploc[0], ploc[1]) | |
d = gh.distance_in_meters(vloc[0], vloc[1], ploc[0], ploc[1]) | |
# wait_time = 1 + d / (ASSIGNMENT_SPEED * 1000 / 60) | |
wait_time = (d * 2 / 1.414) / (self.ASSIGNMENT_SPEED * 1000 / 60) | |
# print (d) | |
##calculation for the trip time , effective distance, total distance | |
eff_distance = sum(request.trip_distance) | |
request_sort_copy = request_sort.copy() | |
request_sort.loc[:, "index_val"] = range(len(request_sort)) | |
request_sort_copy.loc[:, "index_val"] = request_sort["index_val"].values - 1 | |
request_sort_copy = request_sort_copy[["dlat", "dlon", "index_val"]] | |
request_sort_join = pd.merge( | |
request_sort, | |
request_sort_copy, | |
how="left", | |
on=None, | |
left_on="index_val", | |
right_on="index_val", | |
left_index=False, | |
right_index=False, | |
sort=False, | |
suffixes=("_x", "_y"), | |
copy=True, | |
indicator=False, | |
validate=None, | |
) | |
request_join_no_na = request_sort_join.dropna() | |
# print (len(request_join_no_na)) | |
# print (request_) | |
# request_join_no_na['dist_bw_dest'] = gh.distance_in_meters(request_join_no_na.dlat_x,request_join_no_na.dlon_x, | |
# request_join_no_na.dlat_y,request_join_no_na.dlon_y) | |
if len(request_join_no_na) > 0: | |
request_join_no_na["dist_bw_dest"] = gh.distance_in_meters( | |
request_join_no_na.dlat_x, | |
request_join_no_na.dlon_x, | |
request_join_no_na.dlat_y, | |
request_join_no_na.dlon_y, | |
) | |
actual_distance = first_row.trip_distance + (sum(request_join_no_na.dist_bw_dest)) / 1000 | |
trip_time = (actual_distance / self.ASSIGNMENT_SPEED) * 60 | |
# print ('shared....') | |
# print (type(actual_distance),type(trip_time)) | |
# print (trip_time) | |
else: | |
actual_distance = first_row.trip_distance | |
trip_time = first_row.trip_time | |
# print ('not shared....') | |
# print (type(actual_distance),type(trip_time)) | |
# print (trip_time) | |
# trip_time = request.trip_time | |
vehicle.start_service(dloc, wait_time, trip_time, actual_distance, eff_distance, num_present_vehicle_pass) | |
wait += wait_time | |
# effective_d +=eff_distance | |
# actual_d += actual_distance | |
original_trips = len(r) | |
return wait, num_vehicles, num_passengers, original_trips | |
def dispatch(self, actions): | |
cache = [] | |
distances = [] | |
vids, targets = zip(*actions) | |
vlocs = [self.vehicles[vid].location for vid in vids] | |
for vloc, tloc in zip(vlocs, targets): | |
try: | |
p, d, s, t = self.router.map_matching_shortest_path(vloc, tloc) | |
cache.append((p, s, t)) | |
distances.append(d) | |
except: | |
start_lat = vloc[0] | |
start_lon = vloc[1] | |
dest_lat = tloc[0] | |
dest_lon = tloc[1] | |
d = gh.distance_in_meters(start_lat, start_lon, dest_lat, dest_lon) | |
distances.append(d) | |
N = len(vids) | |
X = np.zeros((N, 7)) | |
X[:, 0] = self.dayofweek | |
X[:, 1] = self.minofday / 60.0 | |
X[:, 2:4] = vlocs | |
X[:, 4:6] = targets | |
X[:, 6] = distances | |
trip_times = self.eta_model.predict(X) | |
for i, vid in enumerate(vids): | |
if trip_times[i] > self.MIN_TRIPTIME: | |
# p, s, t = cache[i] | |
# step = distances[i] / (trip_times[i] * 60.0 / TIMESTEP) | |
# trajectory = self.router.generate_path(vlocs[i], targets[i], step, p, s, t) | |
eta = min(trip_times[i], self.max_action_time) | |
self.vehicles[vid].route([], eta) | |
return | |
class Vehicle(object): | |
""" | |
Status available location eta storage_id | |
WT: waiting 1 real 0 0 | |
MV: moving 1 real >0 0 | |
SV: serving 0 future >0 0 | |
ST: stored 0 real 0 >0 | |
CO: carry-out 0 real >0 r>0 | |
""" | |
def __init__(self, vehicle_id, location, params): | |
self.init_params(params) | |
self.id = vehicle_id | |
self.status = "WT" | |
self.location = location | |
self.zone = Geohash.encode(location[0], location[1], precision=self.GEOHASH_PRECISION) | |
self.available = True | |
self.trajectory = [] | |
self.eta = 0 | |
self.idle = 0 | |
self.total_idle = 0 | |
self.total_service = 0 | |
self.reward = 0 | |
self.effective_d = 0 | |
self.actual_d = 0 | |
####### EDITED HERE ######## | |
self.passengers = 0 | |
self.capacity = 4 | |
####### EDITED HERE ######## | |
def init_params(self, params): | |
self.TIMESTEP = params.TIMESTEP | |
self.GEOHASH_PRECISION = params.GEOHASH_PRECISION | |
self.REJECT_DISTANCE = params.REJECT_DISTANCE | |
# SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time | |
self.RIDE_REWARD = params.RIDE_REWARD | |
self.TRIP_REWARD = params.TRIP_REWARD | |
self.WAIT_COST = params.WAIT_COST | |
# self.HOP_REWARD = params.HOP_REWARD | |
# self.GOOD_REWARD = params.GOOD_REWARD | |
self.MIN_TRIPTIME = params.MIN_TRIPTIME # in meters | |
self.ASSIGNMENT_SPEED = params.ASSIGNMENT_SPEED # km/h (grand circle distance) | |
def update_location(self, location): | |
lat, lon = location | |
self.location = (lat, lon) | |
self.zone = Geohash.encode(lat, lon, precision=self.GEOHASH_PRECISION) | |
def transition(self): | |
cost = 0 | |
if self.status != "SV": | |
self.idle += self.TIMESTEP / 60.0 | |
if self.eta > 0: | |
time = min(self.TIMESTEP / 60.0, self.eta) | |
self.eta -= time | |
# moving | |
# if self.trajectory: | |
if self.status == "MV": | |
# self.update_location(self.trajectory.pop(0)) | |
cost = time | |
self.reward -= cost | |
if self.eta <= 0: | |
####### EDITED HERE ######## | |
self.passengers = 0 | |
####### EDITED HERE ######## | |
# serving -> waiting | |
if self.status == "SV": | |
self.available = True | |
self.status = "WT" | |
# moving -> waiting | |
elif self.status == "MV": | |
self.status = "WT" | |
return cost | |
def start_service(self, destination, wait_time, trip_time, actual_distance, eff_distance, num_pass): | |
# print (type(wait_time)) | |
# print (type(trip_time)) | |
# print type(destination) | |
if not self.available: | |
print("The vehicle #%d is not available for service." % self.id) | |
return False | |
self.available = False | |
self.update_location(destination) | |
self.total_idle += self.idle + wait_time | |
self.idle = 0 | |
self.eta = wait_time + trip_time | |
self.total_service += trip_time | |
self.reward += self.RIDE_REWARD * num_pass - self.TRIP_REWARD * trip_time - self.WAIT_COST * wait_time | |
self.trajectory = [] | |
self.status = "SV" | |
self.effective_d += eff_distance | |
self.actual_d += actual_distance | |
return True | |
def route(self, path, trip_time): | |
if not self.available: | |
print("The vehicle #%d is not available for service." % self.id) | |
return False | |
self.eta = trip_time | |
self.trajectory = path | |
self.status = "MV" | |
return True | |
def get_state(self): | |
if self.trajectory: | |
lat, lon = self.trajectory[-1] | |
dest_zone = Geohash.encode(lat, lon, precision=self.GEOHASH_PRECISION) | |
else: | |
dest_zone = self.zone | |
lat, lon = self.location | |
#######EDITED HERE########### added self.passengers at the end | |
return ( | |
self.id, | |
int(self.available), | |
self.zone, | |
dest_zone, | |
self.eta, | |
self.status, | |
self.reward, | |
lat, | |
lon, | |
self.idle, | |
self.effective_d, | |
self.actual_d, | |
self.passengers, | |
) | |
#######EDITED HERE########### | |
def get_location(self): | |
lat, lon = self.location | |
if self.passengers >= self.capacity: | |
self.available = False | |
#######EDITED HERE########### | |
return (self.id, lat, lon, int(self.available), int(self.passengers), int(self.capacity)) | |
#######EDITED HERE########### | |
def get_score(self): | |
return (self.id, self.reward, self.total_service, self.total_idle) |