Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
FlexPool/simulator_Hybrid.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
672 lines (578 sloc)
24.9 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import pdb | |
import numpy as np | |
import pandas as pd | |
from pathgenerator import PathGenerator | |
import geohelper as gh | |
import geohash2 as Geohash | |
from utils.utils import Params | |
import sys | |
sys.path.append("/home/wenqi/Kaushik") | |
def index_pull(rows, trips_test_2):
    """Return the index labels of the trips that share ``rows``' pickup point.

    Args:
        rows: a single record (e.g. one DataFrame row) whose ``"plat"`` and
            ``"plon"`` entries expose ``.item()`` (numpy scalars).
        trips_test_2: DataFrame with ``"plat"`` and ``"plon"`` columns.

    Returns:
        numpy array with the index labels of every row of ``trips_test_2``
        whose pickup latitude and longitude both equal those of ``rows``.
    """
    pickup_lat = rows["plat"].item()
    pickup_lon = rows["plon"].item()
    same_pickup = (trips_test_2["plat"] == pickup_lat) & (trips_test_2["plon"] == pickup_lon)
    matching_trips = trips_test_2[same_pickup]
    return matching_trips.index.values
def merge_list_dicts(passenger_assignments, package_assignments):
    """Merge two ``{vehicle_id: requests}`` mappings into one dict of lists.

    For every key present in either mapping the result holds a new list with
    the passenger entries first, followed by the package entries.

    Args:
        passenger_assignments: dict mapping a key to an iterable of request ids.
        package_assignments: dict mapping a key to an iterable of request ids.

    Returns:
        dict mapping every key of either input to the concatenated list.
        A key missing from one side (or mapped to ``None``) contributes an
        empty list for that side.
    """

    def _as_list(mapping, key):
        # Explicit lookup instead of a bare ``except:`` so that genuine errors
        # (and KeyboardInterrupt) are no longer silently swallowed.
        # ``is None`` is used rather than truthiness because the values may be
        # numpy arrays, whose truth value is ambiguous.
        values = mapping.get(key)
        return [] if values is None else list(values)

    assignments = {}
    for key in set(passenger_assignments) | set(package_assignments):
        assignments[key] = _as_list(passenger_assignments, key) + _as_list(package_assignments, key)
    return assignments
class FleetSimulator(object):
    """
    FleetSimulator is an environment in which fleet mobility, dispatch
    and passenger / package pickup and dropoff are simulated.

    Init Variables: (type) Description
        params: object exposing the simulation constants read by init_params()
            (and an ``id`` attribute that is printed on construction)
        G: graph handed to PathGenerator to build the router
        eta_model: model exposing ``predict(X)``; dispatch() uses it to
            estimate trip times from (dayofweek, hour, origin, target, distance)
        cycle: (int) length of one step() call; step() runs
            ``int(cycle * 60 / TIMESTEP)`` sub-steps (presumably minutes, with
            TIMESTEP in seconds -- TODO confirm)
        max_action_time: (int) upper bound applied to the eta of a dispatch move

    List of Methods: See Individual Signatures for each of the below
        reset(self, num_vehicles, dataset, dayofweek, minofday)
        update_time(self)
        step(self, actions=None) ** MAIN RUNNING METHOD
        get_requests(self, num_steps, offset=0)
        get_vehicles_dataframe(self) ** calls get_state from vehicle class
        get_vehicles_location(self) ** calls get_location from vehicle class
        get_vehicles_score(self) ** calls get_score from vehicle class
        match_both / separate_match / post_process ** request-to-vehicle matching
        assign(self, passenger_assignments, package_assignments)
        dispatch(self, actions)
    """

    def __init__(self, params, G, eta_model, cycle, max_action_time=15):
        self.params = params
        print("LOADED SIMULATOR. ID: ", self.params.id)
        self.init_params(params)
        self.router = PathGenerator(G)
        self.eta_model = eta_model
        self.cycle = cycle
        self.max_action_time = max_action_time
        self.scenario = "Hybrid"
        # Counts matching rounds in which no car was within REJECT_DISTANCE.
        self.out_of_range = 0

    def init_params(self, params):
        """Copy the simulation constants from ``params`` onto the simulator."""
        # DOWN_SAMPLE is optional: default to 1 (keep every request).
        self.DOWN_SAMPLE = getattr(params, "DOWN_SAMPLE", 1)
        self.TIMESTEP = params.TIMESTEP
        self.GEOHASH_PRECISION = params.GEOHASH_PRECISION
        self.REJECT_DISTANCE = params.REJECT_DISTANCE
        # SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time
        self.RIDE_REWARD = params.RIDE_REWARD
        self.TRIP_REWARD = params.TRIP_REWARD
        self.WAIT_COST = params.WAIT_COST
        self.HOP_REWARD = params.HOP_REWARD
        self.GOOD_REWARD = params.GOOD_REWARD
        # NOTE(review): the original comment said "in meters", but dispatch()
        # compares this against predicted trip times -- confirm the unit.
        self.MIN_TRIPTIME = params.MIN_TRIPTIME
        self.ASSIGNMENT_SPEED = params.ASSIGNMENT_SPEED  # km/h (grand circle distance)

    def reset(self, num_vehicles, dataset, dayofweek, minofday):
        """
        Initialise the simulator environment.

        Args:
            num_vehicles: (int) number of Vehicle objects to instantiate.
            dataset: (pd.DataFrame) requests; must provide "plat" and "plon".
            dayofweek: (int) starting day of week.
            minofday: (int) starting minute of day.
        """
        self.requests = dataset
        self.current_time = 0
        self.minofday = minofday
        self.dayofweek = dayofweek
        # Initial locations are the first N pickup points of the dataset,
        # wrapping around when there are fewer requests than vehicles.
        init_locations = self.requests[["plat", "plon"]].values[np.arange(num_vehicles) % len(self.requests)]
        self.vehicles = [Vehicle(i, init_locations[i], self.params) for i in range(num_vehicles)]

    def update_time(self):
        """Advance the clock by one TIMESTEP, rolling minute-of-day and day-of-week."""
        self.current_time += self.TIMESTEP
        # NOTE(review): whole minutes only -- a TIMESTEP below 60 never
        # advances minofday.
        self.minofday += int(self.TIMESTEP / 60.0)
        if self.minofday >= 1440:
            self.minofday -= 1440
            self.dayofweek = (self.dayofweek + 1) % 7

    def step(self, actions=None):
        """
        Step the environment forward by ``cycle`` (``num_steps`` TIMESTEPs).

        Applies the optional dispatch ``actions``, then for every sub-step
        transitions all vehicles, matches the requests falling inside the
        sub-step to vehicles and assigns them.

        Returns:
            (vehicles, requests, wait, r_reject, g_reject, gas, idle,
            request_hop_zero, v_score) where ``vehicles`` / ``v_score`` are the
            DataFrames of get_vehicles_dataframe() / get_vehicles_score(),
            ``requests`` are the requests of the whole cycle and the remaining
            entries are totals accumulated over the sub-steps.
        """
        num_steps = int(self.cycle * 60.0 / self.TIMESTEP)
        if actions:
            self.dispatch(actions)
        requests = self.get_requests(num_steps)
        wait, r_reject, g_reject, gas, idle, request_hop_zero = 0, 0, 0, 0, 0, 0
        for _ in range(num_steps):
            for vehicle in self.vehicles:
                # transition() returns the travel cost of this sub-step.
                gas += vehicle.transition()
                idle += vehicle.idle
            X = self.get_vehicles_location()
            W = requests[(requests.second >= self.current_time) & (requests.second < self.current_time + self.TIMESTEP)]
            # Fixed seed keeps the down-sampling reproducible.
            W = W.sample(frac=self.DOWN_SAMPLE, random_state=69)
            if len(W) == 0:
                print("WE GOT NO REQUESTS")
            passenger_assignments, package_assignments = self.match_both(X, W)
            wait_, _, num_passengers, num_packages, request_hop_zero_, _ = self.assign(
                passenger_assignments, package_assignments
            )
            wait += wait_
            r_reject += len(W[W.g_type == 'r']) - num_passengers
            # Unserved goods requests are weighted by 0.8 (factor kept from the original).
            g_reject += int((len(W[W.g_type != 'r']) - num_packages) * 0.8)
            request_hop_zero += request_hop_zero_
            self.update_time()
        vehicles = self.get_vehicles_dataframe()
        v_score = self.get_vehicles_score()
        return vehicles, requests, wait, r_reject, g_reject, gas, idle, request_hop_zero, v_score

    def get_requests(self, num_steps, offset=0):
        """Return the requests whose ``second`` lies in the next ``num_steps`` TIMESTEPs
        (shifted by ``offset`` TIMESTEPs) counted from the current time."""
        window_start = self.current_time + offset * self.TIMESTEP
        window_end = self.current_time + self.TIMESTEP * (num_steps + offset)
        requests = self.requests[(self.requests.second >= window_start) & (self.requests.second < window_end)]
        return requests

    def get_vehicles_dataframe(self):
        """Return a DataFrame with one ``Vehicle.get_state()`` row per vehicle."""
        vehicles = [vehicle.get_state() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(
            vehicles,
            columns=[
                "id",
                "available",
                "geohash",
                "dest_geohash",
                "eta",
                "status",
                "reward",
                "lat",
                "lon",
                "idle",
                "eff_dist",
                "act_dist",
                "curr_passengers",
                "curr_packages",
            ],
        )
        return vehicles

    def get_vehicles_location(self):
        """Return a DataFrame with one ``Vehicle.get_location()`` row per vehicle."""
        vehicles = [vehicle.get_location() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(
            vehicles,
            columns=[
                "id",
                "lat",
                "lon",
                "available",
                "curr_passengers",
                "capacity_passengers",
                "curr_packages",
                "capacity_packages",
            ],
        )
        return vehicles

    def get_vehicles_score(self):
        """Return a DataFrame with one ``Vehicle.get_score()`` row per vehicle."""
        vehicles = [vehicle.get_score() for vehicle in self.vehicles]
        vehicles = pd.DataFrame(vehicles, columns=["id", "reward_score", "service_time", "idle_time"])
        return vehicles

    @staticmethod
    def _capacity_columns(kind):
        """Return the (capacity, current-load) column names used for ``kind``."""
        if kind == "RIDE_SHARE":
            return "capacity_passengers", "curr_passengers"
        return "capacity_packages", "curr_packages"

    @staticmethod
    def _count_assigned(assignments, vehicle_id):
        """Return how many requests ``assignments`` holds for ``vehicle_id`` (0 if none)."""
        assigned = assignments.get(vehicle_id)
        return 0 if assigned is None else len(assigned)

    def post_process(self, prelim_assignments, R, kind):
        """
        Helper for the matching algorithm: trims assignments that exceed the
        vehicle capacity constraints.

        Args:
            prelim_assignments: iterable of ``(request_labels, vehicle_id)``.
            R: DataFrame of vehicles with "id" and the capacity / current-load
                columns selected by ``kind``.
            kind: "RIDE_SHARE" for passengers, anything else for packages.

        Returns:
            (assignments, rejects): ``assignments`` maps vehicle id to the
            request labels that fit; ``rejects`` lists the labels that did not.
        """
        cap, curr = self._capacity_columns(kind)
        assignments = {}
        rejects = []
        # r: request labels ; v: vehicle id
        for r, v in prelim_assignments:
            vehicle_row = R.loc[R.id == v]
            spots = max(int(vehicle_row[cap].values[0] - vehicle_row[curr].values[0]), 0)
            if len(r) > spots:
                # Record the overflow BEFORE truncating: the original truncated
                # first, so the overflow was always empty and silently dropped.
                rejects.extend(r[spots:])
                r = r[:spots]
            assignments[v] = r
        return assignments, rejects

    def separate_match(self, Resources, tasks, kind):
        """
        Rough greedy matching of cars and requests of one ``kind``, subject to
        the capacity constraints and REJECT_DISTANCE.

        Each round groups the open requests by pickup point, gives every group
        (up to the number of free spots) its closest unused car, trims the
        groups to the car capacity and retries the overflow in the next round
        with the cars that still have room.

        Args:
            Resources: DataFrame as returned by get_vehicles_location().
            tasks: DataFrame of requests with "plat" / "plon"; its index labels
                identify the requests (assumed unique -- TODO confirm).
            kind: "RIDE_SHARE" for passengers, anything else for packages.

        Returns:
            dict mapping vehicle id to the list of assigned request labels.
        """
        cap, curr = self._capacity_columns(kind)
        Resources_in = Resources.copy()
        Resources_in["remaining_spots"] = Resources_in[cap] - Resources_in[curr]
        # Explicit copy: R is updated in place below.
        R = Resources_in[Resources_in.available == 1].copy()
        assignments = {}
        while len(tasks) > 0 and len(R) > 0:
            # One row per distinct pickup point, carrying the labels of its requests.
            tasks_uniq = tasks.groupby(["plat", "plon"]).count()
            tasks_uniq = tasks_uniq.reset_index(level=["plat", "plon"])
            tasks_uniq = tasks_uniq[["plat", "plon"]].copy()
            tasks_uniq["index"] = tasks_uniq.apply(
                lambda rows: tasks[
                    (tasks["plat"] == rows["plat"].item()) & (tasks["plon"] == rows["plon"].item())
                ].index.values,
                axis=1,
            )
            N = max(int(min(len(tasks_uniq), R.remaining_spots.sum())), 0)
            # Distances between every pickup point (rows) and every car (columns).
            d = gh.distance_in_meters(
                R.lat.values, R.lon.values, tasks_uniq.plat.values[:, None], tasks_uniq.plon.values[:, None]
            )
            # vids[i] is the position in R of the car chosen for pickup point i.
            vids = np.zeros(N, dtype=int)
            for i in range(N):
                vid = d[i].argmin()
                if d[i, vid] > self.REJECT_DISTANCE:
                    vids[i] = -1  # assign -1 for a rejection
                else:
                    vids[i] = vid
                    # A car serves at most one pickup point per round.
                    d[:, vid] = float("inf")
            accepted = vids >= 0
            if not accepted.any():
                self.out_of_range += 1
                print("out of range tracker: ",self.out_of_range, "Available cars:", len(R))
                break
            index_values = tasks_uniq.index[:N][accepted]
            matched_cars = R["id"].iloc[vids[accepted]]
            prelim_assignments = list(zip(tasks_uniq.loc[index_values]["index"], matched_cars))
            true_assignments, rejects = self.post_process(prelim_assignments, R, kind)
            # Merge instead of dict.update: a car may receive requests in several rounds.
            assignments = merge_list_dicts(assignments, true_assignments)
            # Only this round's pickups are booked; iterating the cumulative
            # ``assignments`` would subtract earlier rounds again.
            for car_id, picked in true_assignments.items():
                pickups_added = len(picked)
                is_car = R["id"] == car_id
                R.loc[is_car, curr] += pickups_added
                R.loc[is_car, "remaining_spots"] -= pickups_added
            R["available"] = (R["remaining_spots"] >= 1).astype(int)
            R = R[R.available == 1].copy()
            # ``rejects`` holds index labels, so select by label (.loc, not .iloc).
            tasks = tasks.loc[rejects]
        return assignments

    def match_both(self, Resources, tasks):
        """
        Run the matching separately for ride requests (``g_type == "r"``) and
        for all other (goods) requests.

        Returns:
            (passenger_assignments, package_assignments) dictionaries.
        """
        passenger_assignments = self.separate_match(Resources, tasks[tasks.g_type == "r"], kind="RIDE_SHARE")
        package_assignments = self.separate_match(Resources, tasks[tasks.g_type != "r"], kind="GOODS")
        return passenger_assignments, package_assignments

    def assign(self, passenger_assignments, package_assignments):
        """
        Hand the already matched requests over to the Vehicle objects.

        For every vehicle the assigned requests are looked up in
        ``self.requests``, wait time / trip time / distances are derived and
        ``Vehicle.start_service`` is called.

        Returns:
            (wait, num_vehicles, num_passengers, num_packages,
            request_hop_zero, assignments): the summed wait time, the number of
            vehicles with an assignment entry, the passenger / package counts,
            the number of requests with ``hop_flag == 0`` and the merged
            ``{vehicle_id: request_labels}`` dict.
        """
        assignments = merge_list_dicts(passenger_assignments, package_assignments)
        num_vehicles = len(assignments)
        request_hop_zero = 0
        num_passengers = 0
        num_packages = 0
        wait = 0
        for v, r in assignments.items():
            if len(r) == 0:
                # Nothing fitted into this vehicle; there is no trip to start.
                continue
            vehicle = self.vehicles[v]  # vehicle ids double as list positions
            request = self.requests.loc[r]
            num_present_vehicle_pass = self._count_assigned(passenger_assignments, v)
            num_present_vehicle_pack = self._count_assigned(package_assignments, v)
            request_hop_zero += sum(request["hop_flag"] == 0)
            num_passengers += num_present_vehicle_pass
            num_packages += num_present_vehicle_pack

            # Requests are served in order of increasing trip_time.
            request_sort = request.sort_values(by=["trip_time"]).copy()
            first_row = request_sort.iloc[0]
            last_row = request_sort.iloc[-1]
            ploc = (first_row.plat, first_row.plon)
            dloc = (last_row.dlat, last_row.dlon)
            vloc = vehicle.location
            d = gh.distance_in_meters(vloc[0], vloc[1], ploc[0], ploc[1])
            # 2 / 1.414 ~ sqrt(2): scales the straight-line distance before
            # converting to minutes at ASSIGNMENT_SPEED km/h.
            wait_time = (d * 2 / 1.414) / (self.ASSIGNMENT_SPEED * 1000 / 60)

            # Effective distance: sum of the individual trip distances.
            eff_distance = sum(request.trip_distance)

            # Pair every drop-off (_x) with the next drop-off in service order (_y).
            request_sort["index_val"] = range(len(request_sort))
            next_dropoff = request_sort[["dlat", "dlon", "index_val"]].copy()
            next_dropoff["index_val"] = request_sort["index_val"].values - 1
            request_sort_join = pd.merge(
                request_sort,
                next_dropoff,
                how="left",
                on="index_val",
                sort=False,
                suffixes=("_x", "_y"),
            )
            # The last request has no successor (NaN) and is dropped here.
            # NOTE(review): dropna() also drops rows with NaN in any other column.
            request_join_no_na = request_sort_join.dropna().copy()
            if len(request_join_no_na) > 0:
                request_join_no_na["dist_bw_dest"] = gh.distance_in_meters(
                    request_join_no_na.dlat_x,
                    request_join_no_na.dlon_x,
                    request_join_no_na.dlat_y,
                    request_join_no_na.dlon_y,
                )
                # Shared trip: first trip plus the hops between consecutive drop-offs (m -> km).
                actual_distance = first_row.trip_distance + (sum(request_join_no_na.dist_bw_dest)) / 1000
                trip_time = (actual_distance / self.ASSIGNMENT_SPEED) * 60
            else:
                actual_distance = first_row.trip_distance
                trip_time = first_row.trip_time

            vehicle.start_service(
                dloc,
                wait_time,
                trip_time,
                actual_distance,
                eff_distance,
                num_present_vehicle_pass,
                num_present_vehicle_pack,
            )
            wait += wait_time
        return wait, num_vehicles, num_passengers, num_packages, request_hop_zero, assignments

    def dispatch(self, actions):
        """
        Send vehicles towards target locations.

        Args:
            actions: non-empty iterable of ``(vehicle_id, target_location)``.

        The eta model predicts a trip time for every action; moves longer than
        MIN_TRIPTIME are routed with an eta capped at ``max_action_time``.
        """
        distances = []
        vids, targets = zip(*actions)
        vlocs = [self.vehicles[vid].location for vid in vids]
        for vloc, tloc in zip(vlocs, targets):
            try:
                _, d, _, _ = self.router.map_matching_shortest_path(vloc, tloc)
            except Exception:
                # Routing failed: fall back to the direct distance.
                d = gh.distance_in_meters(vloc[0], vloc[1], tloc[0], tloc[1])
            distances.append(d)
        N = len(vids)
        # Feature matrix: dayofweek, hour of day, origin lat/lon, target lat/lon, distance.
        X = np.zeros((N, 7))
        X[:, 0] = self.dayofweek
        X[:, 1] = self.minofday / 60.0
        X[:, 2:4] = vlocs
        X[:, 4:6] = targets
        X[:, 6] = distances
        trip_times = self.eta_model.predict(X)
        for i, vid in enumerate(vids):
            if trip_times[i] > self.MIN_TRIPTIME:
                eta = min(trip_times[i], self.max_action_time)
                # No trajectory is generated; the vehicle only carries an eta.
                self.vehicles[vid].route([], eta)
        return
class Vehicle(object):
    """
    One simulated vehicle together with its passenger / package load.

            Status      available   location    eta     storage_id
    WT:     waiting     1           real        0       0
    MV:     moving      1           real        >0      0
    SV:     serving     0           future      >0      0
    ST:     stored      0           real        0       >0
    CO:     carry-out   0           real        >0      r>0
    """

    def __init__(self, vehicle_id, location, params):
        self.init_params(params)
        self.id = vehicle_id
        self.status = "WT"
        self.location = location
        start_lat, start_lon = location[0], location[1]
        self.zone = Geohash.encode(start_lat, start_lon, precision=self.GEOHASH_PRECISION)
        self.available = True
        self.trajectory = []
        # Clocks and accumulated statistics all start at zero.
        self.eta = 0
        self.idle = 0
        self.total_idle = 0
        self.total_service = 0
        self.reward = 0
        self.effective_d = 0
        self.actual_d = 0
        # Current load and fixed capacities.
        self.passengers = 0
        self.capacity_passengers = 4
        self.packages = 0
        self.capacity_packages = 10

    def init_params(self, params):
        """Copy the simulation constants from ``params`` onto the vehicle."""
        # SERVICE_REWARD = RIDE_REWARD + TRIP_REWARD * trip_time - WAIT_COST * wait_time
        # ASSIGNMENT_SPEED is in km/h (grand circle distance).
        # NOTE(review): the unit of MIN_TRIPTIME is not established in this class.
        for constant in (
            "TIMESTEP",
            "GEOHASH_PRECISION",
            "REJECT_DISTANCE",
            "RIDE_REWARD",
            "TRIP_REWARD",
            "WAIT_COST",
            "HOP_REWARD",
            "GOOD_REWARD",
            "MIN_TRIPTIME",
            "ASSIGNMENT_SPEED",
        ):
            setattr(self, constant, getattr(params, constant))

    def update_location(self, location):
        """Move the vehicle to ``location`` (lat, lon) and refresh its geohash zone."""
        lat, lon = location
        self.location = (lat, lon)
        self.zone = Geohash.encode(lat, lon, precision=self.GEOHASH_PRECISION)

    def transition(self):
        """
        Advance the vehicle by one TIMESTEP.

        Returns:
            the time spent moving during this step (0 unless status is "MV").
        """
        step_minutes = self.TIMESTEP / 60.0
        spent = 0
        # Every status except serving accrues idle time.
        if self.status != "SV":
            self.idle += step_minutes
        if self.eta > 0:
            elapsed = min(step_minutes, self.eta)
            self.eta -= elapsed
            if self.status == "MV":
                # Moving costs its duration.
                spent = elapsed
                self.reward -= spent
        if self.eta <= 0:
            # Arrived (or nothing scheduled): the vehicle is empty again.
            self.passengers = 0
            self.packages = 0
            # serving -> waiting also frees the vehicle; moving -> waiting does not touch it.
            if self.status == "SV":
                self.available = True
            if self.status in ("SV", "MV"):
                self.status = "WT"
        return spent

    def start_service(self, destination, wait_time, trip_time, actual_distance, eff_distance, num_pass, num_pack):
        """
        Start serving a (possibly pooled) trip ending at ``destination``.

        The vehicle location is set to the destination right away, the eta
        becomes ``wait_time + trip_time`` and the reward is updated.

        Returns:
            False (after printing a notice) when the vehicle is not available,
            True otherwise.
        """
        if not self.available:
            print("The vehicle #%d is not available for service." % self.id)
            return False
        self.available = False
        self.update_location(destination)
        self.total_idle += self.idle + wait_time
        # Ratio of effective to actual distance; a zero-length trip keeps the zero.
        num_hops = actual_distance if actual_distance == 0 else eff_distance / actual_distance
        self.idle = 0
        self.eta = wait_time + trip_time
        self.total_service += trip_time
        self.passengers = num_pass
        self.packages = num_pack
        earned = (
            (self.RIDE_REWARD * num_pass)
            + (self.GOOD_REWARD * num_pack)
            + (self.TRIP_REWARD * trip_time)
            - (self.WAIT_COST * wait_time)
            - (self.HOP_REWARD * num_hops)
        )
        self.reward += earned
        self.trajectory = []
        self.status = "SV"
        self.effective_d += eff_distance
        self.actual_d += actual_distance
        return True

    def route(self, path, trip_time):
        """
        Put the vehicle into moving state along ``path`` with eta ``trip_time``.

        Returns:
            False (after printing a notice) when the vehicle is not available,
            True otherwise.
        """
        if not self.available:
            print("The vehicle #%d is not available for service." % self.id)
            return False
        self.eta = trip_time
        self.trajectory = path
        self.status = "MV"
        return True

    def get_state(self):
        """
        Return the state tuple consumed by FleetSimulator.get_vehicles_dataframe():
        (id, available, zone, destination zone, eta, status, reward, lat, lon,
        idle, effective distance, actual distance, passengers, packages).
        """
        if self.trajectory:
            # Destination zone is the geohash of the last trajectory point.
            end_lat, end_lon = self.trajectory[-1]
            dest_zone = Geohash.encode(end_lat, end_lon, precision=self.GEOHASH_PRECISION)
        else:
            dest_zone = self.zone
        # NOTE(review): indentation was lost in the source; lat / lon are taken
        # to be the current location in both branches.
        lat, lon = self.location
        return (
            self.id,
            int(self.available),
            self.zone,
            dest_zone,
            self.eta,
            self.status,
            self.reward,
            lat,
            lon,
            self.idle,
            self.effective_d,
            self.actual_d,
            self.passengers,
            self.packages,
        )

    def get_location(self):
        """
        Return (id, lat, lon, available, passengers, passenger capacity,
        packages, package capacity).

        Side effect: marks the vehicle unavailable once either load has
        reached its capacity.
        """
        lat, lon = self.location
        passengers_full = self.passengers >= self.capacity_passengers
        if passengers_full or (self.packages >= self.capacity_packages):
            self.available = False
        return (
            self.id,
            lat,
            lon,
            int(self.available),
            int(self.passengers),
            int(self.capacity_passengers),
            int(self.packages),
            int(self.capacity_packages),
        )

    def get_score(self):
        """Return (id, reward, total service time, total idle time)."""
        return (self.id, self.reward, self.total_service, self.total_idle)