I am writing code that, at some later point, will be part of an academic multi-agent simulation of the learning process of car-sharing users and grid stability. This code deals with finding the price for incoming bookings.
The algorithm tries to find optimal charging strategies for electric vehicles in a car-sharing setting with the (conflicting) goals
- minimal energy price
- uploading energy to the grid during times of grid instability due to peak demand
- as little fossil fuels in the primary energy mix as possible maximal availability of the cars to users
The idle car charges when energy is cheap and discharges when the grid needs stabilizing - and at the same time, prices are high. Under my simplified circumstances, it earns money. I have historical data for the energy cost, grid stability, and percent CO2 in the energy mix.
The guiding principle for calculating the price for a booking is “The user pays for changes he causes.” If the cheapest time for charging becomes unavailable due to a booking, those additional costs are charged to the user causing the change, even if the electricity is not for her trip. A booking is given by start and end time (or duration) and the needed battery charge (or distance of the journey.)
This is historical data for the energy cost over time. To establish a baseline for the booking price, I first need to calculate the cost (or rather earnings) of the “idle” state of the car. So I look for the best charging and discharging times around the time of the booking.
The added boxes mark the peak demand times with reduced power grid stability, during which the car should discharge its battery.
Here I drew the optimal discharging (where the car earns money) and charging time (where the car spends money.) It takes three hours to (dis)charge the battery fully.
Here I added a booking worth one hour of battery charge, which also blocks some optimal (dis)charging periods.
The resulting discharging time is shorter, and the charging takes place at a later, more expensive time. It is also possible that the cheapest charging time is later, after the peak. The user would pay the cost for the missed opportunity cost of discharging during that period of reduced grid stability, too. My algorithm considers the price/CO2/stability forecast of the next 24h after the booking because that is the timeframe for such projections. The situation gets more complicated if many bookings are connected. Each time the last booking pays, the difference in costs caused by it.
Bookings are added one at a time. Each time, the best time of charging and discharging is found, the respective costs calculated, and the difference charged to the individual booking.
One general problem is that electric cars might need to recharge between bookings, which takes time. This code assumes linear charging and determines if booking requests can be accepted.
Initially, I generate some synthetic booking data - that will be replaced by real booking data later. A booking request consists of
- the start and end times and
- the distance, so we can estimate the energy (State of Charge, or SoC) required for the booking.
UPDATE: By now, I have feature-complete running code with optimization. The problem is that the optimization algorithm is not a good fit for the problem and also does not honor the constraints. Later, I will replace that with a linear programming algorithm. The code needs some cleanup, and I want to introduce classes. Could you please suggest the starting points for the classes? I have a hard time getting started on that.
from bisect import insort, bisect_left
from random import randint, shuffle, random
from typing import TypedDict
# import pyomo.environ as pyo
# from pyomo.opt import SolverFactory
import dateutil.parser as dp
import matplotlib.pyplot as plt
import numpy as np
from bottleneck import move_sum
from scipy.optimize import minimize, OptimizeResult
from my_csv import get_metrics_vector
NUM_BOOKINGS = 3
PAUSE_BETWEEN_BOOKINGS_SEC = 7 * 60 # in sec
PAUSE_BETWEEN_BOOKINGS_MAX = 3 * 60 * 60 # 3h, in sec
BOOKING_DURATION_MIN = 30 * 60 # in sec
BOOKING_DURATION_MAX = 2 * 60 * 60 # 2h in minutes
SPEED_MAX = 120 / 3600 # 120km/h in km/sec
SOC_MAX = 1.0 # normalized
SOC_PER_KM = SOC_MAX / 400 # so we can go 400km
CHARGE_TIME_MAX_SECONDS = 3 * 60 * 60 # in seconds
CHARGING_SOC_PER_SEC = SOC_MAX / (3 * 60 * 60) # fully charge in 3h
ONE_MINUTE = 60 # in seconds
DATA_INTERVAL = 15 * ONE_MINUTE
DURATION_FEE = 2.5 / 3600 # Euro per second
DISTANCE_FEE = 0.5 # Euro per km
ONE_DAY = 24 * 60 * 60 # in seconds
# https://docs.python.org/3/tutorial/datastructures.html#more-on-lists
# https://pythonexamples.org/python-list-of-dictionaries/
# https://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
booking_dt = np.dtype([("start", int),
("distance", float),
("duration", int),
("soc", float), # state of charge of the battery
("delta_soc", float), # change in battery charge
("score", float), # this is purely the environmental impact. time of booking does not count
("price", float),
("charge", bool),
("booking", bool),
("discharge", bool),
("cnt", int)]) # this is what the user is billed.
class OptimisationItem(TypedDict, total=False):
key: int
start: int # this might be not needed, as its the key in the dict
distance: float
duration: int
soc: float # state of charge of the battery
delta_soc: float # change in battery charge
score: float # this is purely the environmental impact. time of booking does not count
price: float # this is what the user is billed.
optional: bool
bound_high: int
bound_low: int
index: int
charge: bool
booking: bool
discharge: bool
modified: bool
def create_sequential_list_of_bookings(bookings_num: int) -> list[booking_dt]:
while True:
sequential_list_of_bookings: list[booking_dt] = []
initial_start: int = int(dp.parse('2021-11-04T00:05:23').timestamp())
initial_duration: int = 0
# add an inital starting point, so the graph and the algorithm has initial data.
initial_booking: booking_dt = np.zeros((4,), dtype=booking_dt)
initial_booking["start"][0:4] = initial_start
# initial_booking["start"] = initial_start
# initial_booking["duration"] = initial_duration
initial_booking["soc"][0:4] = SOC_MAX
sequential_list_of_bookings.append(initial_booking)
for booking_cnt in range(bookings_num):
pause: int = randint(PAUSE_BETWEEN_BOOKINGS_SEC, PAUSE_BETWEEN_BOOKINGS_MAX)
duration: int = randint(BOOKING_DURATION_MIN, BOOKING_DURATION_MAX)
distance_max: float = float(SPEED_MAX * duration)
distance: float = random() * distance_max
soc_needed: float = - distance * SOC_PER_KM
booking_start: int = sequential_list_of_bookings[-1]["start"][-1]
booking: booking_dt = np.zeros((4,), booking_dt)
booking['start'][0] = booking_start
booking["duration"][0] = duration
booking['distance'][0] = distance
booking["booking"][0] = True
booking['delta_soc'][0] = soc_needed
# pausing
booking["start"][1] = booking_start + duration
booking["duration"][1] = pause
# chargeing
booking["start"][2] = booking_start + duration + pause
booking["charge"][2] = True
# filler, so i can shift the charging time in the pause time window for optimisation.
booking["start"][3] = booking_start + duration + pause
sequential_list_of_bookings.append(booking)
if not check_if_schedule_is_impossible(sequential_list_of_bookings):
break
sequential_list_of_bookings.pop(0) # remove the initial starting point
return sequential_list_of_bookings
def check_if_schedule_is_impossible(list_of_bookings: list[booking_dt]) -> bool:
plan: np.array = np.array(list_of_bookings, dtype=booking_dt).reshape(-1)
# quick check if it is possible to charge enough
duration_trip = plan["duration"][::4].sum()
duration_pause = plan["duration"].sum() - duration_trip
soc_needed = plan["delta_soc"][::4].sum()
soc_charge_max = duration_pause * CHARGING_SOC_PER_SEC + 1
if soc_needed + soc_charge_max < 0:
print("problem:", soc_needed, ">", soc_charge_max)
impossible = True
else:
print("ok:", soc_needed, "<", soc_charge_max)
impossible = False
return impossible
def randomize_list_of_bookings(list_of_bookings: list[booking_dt]) -> list[booking_dt]:
randomized_list_of_bookings: list[booking_dt] = list_of_bookings.copy()
shuffle(randomized_list_of_bookings)
return randomized_list_of_bookings
def plan_charge_max(list_of_bookings: list[booking_dt]) -> np.array:
plan: np.array = np.array(list_of_bookings, dtype=booking_dt).reshape(-1)
plan["soc"][-1] = SOC_MAX
# quick check if it is possible to charge enough
for step in range(len(plan)):
plan["soc"][step] = max(0, plan["soc"][step - 1] + plan["delta_soc"][step - 1])
plan["start"][step] = plan["duration"][step - 1] + plan["start"][step - 1]
if plan["delta_soc"][step - 1] < 0:
pause_duration = plan["duration"][step:step + 2].sum()
possible_charge = pause_duration * CHARGING_SOC_PER_SEC
needed_charge = SOC_MAX - plan["soc"][step]
delta_soc = min(needed_charge, possible_charge)
plan["delta_soc"][step + 1] = delta_soc
charging_duration = delta_soc / CHARGING_SOC_PER_SEC
plan["duration"][step + 1] = charging_duration
plan["duration"][step] -= charging_duration
return plan
def valid_bookings(sequential_list_of_bookings: list[booking_dt]) -> tuple[bool, np.array]:
plan: np.array = plan_charge_max(sequential_list_of_bookings)
valid: bool = 0 not in plan["soc"]
return valid, plan
# finds start and end of stability incidents
# https://stackoverflow.com/questions/69457696/get-ranges-of-true-values-start-and-end-in-a-boolean-list-without-using-a-for
def np_vec2ran(x: np.array) -> np.array:
z = np.concatenate(([False], x, [False]))
start = np.flatnonzero(~z[:-1] & z[1:])
end = np.flatnonzero(z[:-1] & ~z[1:])
return np.column_stack((start, end - 1))
def find_max_price_for_discharge_interval(price: np.array, window_size: int) -> (float, int):
if len(price) <= window_size:
return price.sum(), 0
sum_array = move_sum(price, window=window_size)
max_index = np.nanargmax(sum_array) - window_size + 1
max_price = sum_array[max_index]
return max_price, max_index
def find_min_price_for_charge_interval(price: np.array, window_size) -> (float, int):
if len(price) <= window_size:
return price.sum(), 0
sum_array = move_sum(price, window=window_size)
min_index = np.nanargmin(sum_array) - window_size + 1
min_price = sum_array[min_index]
return min_price, min_index
def integrate(start, duration, time: np.array, price: np.array) -> float:
x_start = start[0]
x_end = start[0] + duration[0]
x_data: np.array = np.linspace(x_start, x_end, 10)
y_interp: np.array = np.interp(x_data, time, price)
area: float = np.trapz(y_interp, x_data)
return area
def soc_constraint(x: np.array, plan: np.array):
# x = np.append(x, CHARGE_TIME_MAX_SECONDS)
soc = SOC_MAX
durations = x[1::2]
i = 0
for plan_line in plan:
if plan_line["charge"]:
soc += durations[i] * CHARGING_SOC_PER_SEC
i += 1
elif plan_line["booking"]:
soc -= plan_line["delta_soc"]
else:
soc -= durations[i] * CHARGING_SOC_PER_SEC
i += 1
if soc < 0:
return soc
elif soc > SOC_MAX:
soc = SOC_MAX - soc
break
return soc
def bounds_constraint(x, bounds):
# x = np.append(x, CHARGE_TIME_MAX_SECONDS)
end_points = bounds - (x[::2] + x[1::2])
smallest_end_point = np.min(end_points)
return smallest_end_point
def objective_func(x: np.array, args_list: list[np.array]) -> float:
# x = np.append(x, CHARGE_TIME_MAX_SECONDS)
metrics_v = args_list[0]
plan = args_list[1]
x_index = 0
for plan_line in plan:
if plan_line["booking"] == True:
continue
elif plan_line["charge"] == True or plan_line["discharge"] == True:
plan_line["start"] = x[x_index]
x_index += 1
plan_line["duration"] = x[x_index]
x_index += 1
# start values for soc=SOC_MAX and delta_soc: 0
plan["soc"][-1] = SOC_MAX
plan["delta_soc"][-1] = 0
plan["duration"][-1] = 0
# straighten out the plan, calculate price and soc
for step in range(len(plan)):
plan["soc"][step] = plan["soc"][step - 1] + plan["delta_soc"][step - 1]
if plan["distance"][step] != 0: # this is a booking
continue
elif plan["charge"][step] == False: # discharge
plan["delta_soc"][step] = - plan["duration"][step] * CHARGING_SOC_PER_SEC
plan["price"][step] = - integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
metrics_v["price"] - metrics_v["price_avg"])
else: # charge
plan["delta_soc"][step] = plan["duration"][step] * CHARGING_SOC_PER_SEC
plan["price"][step] = + integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
metrics_v["price_co2"] - metrics_v["price_co2_avg"])
# last step, which the search algorithm never gets right
plan["soc"][-1] = plan["soc"][-2] + plan["delta_soc"][-2]
plan["duration"][-1] = (SOC_MAX - plan["soc"][- 1]) / CHARGING_SOC_PER_SEC
plan["price"][-1] = + integrate(plan["start"][-1], plan["duration"][-1], metrics_v["time"],
metrics_v["price_co2"] - metrics_v["price_co2_avg"])
price: float = plan["price"].sum()
return price
def optimise_discharge_plan(plan: np.array, optimisation_items, metrics_v: np.array) -> np.array:
x_initial = []
bounds = []
constraints = []
bounds_high_l = []
for item in optimisation_items:
if "booking" in item:
continue
x_initial.append(item["start"])
x_initial.append(item["duration"])
bound_low = item["bound_low"]
bound_high = item["bound_high"]
bounds_high_l.append(bound_high)
bounds.append((bound_low, bound_high))
bounds.append((0, min(CHARGE_TIME_MAX_SECONDS, bound_high - bound_low)))
bounds_high = np.array(bounds_high_l)
constraints.append({"type": "ineq", "fun": bounds_constraint, "args": (bounds_high,)})
constraints.append({"type": "ineq", "fun": soc_constraint, "args": (plan,)})
result: OptimizeResult = minimize(objective_func, np.array(x_initial),
args=[metrics_v, plan],
method="SLSQP",
bounds=bounds,
constraints=constraints,
# options={"eps": 1 , "maxiter": 1000, "ftol": 1.0, "disp": True},
tol=1)
check_plan_consistency(plan, bounds)
print("\n", result, "\n", plan)
return result.x
def pad_plan_with_pauses(plan: np.array) -> np.array:
padded_plan = np.zeros(len(plan) * 2 - 1, dtype=booking_dt)
for cnt, row in enumerate(plan):
padded_plan[cnt * 2] = row
if cnt < len(plan) - 1:
padded_plan[cnt * 2 + 1]["start"] = row["start"] + row["duration"]
padded_plan[cnt * 2 + 1]["duration"] = plan[cnt + 1]["start"] - row["start"] - row["duration"]
padded_plan[cnt * 2 + 1]["soc"] = row["soc"] + row["delta_soc"]
return padded_plan
def plot_plan(plan_for_plotting, metrics_v, instabilities):
plan = pad_plan_with_pauses(plan_for_plotting)
fig, ax_left = plt.subplots()
ax_right = ax_left.twinx()
ax_right.plot(plan["start"], plan["soc"], label="State of Charge", color="black")
ax_left.plot(metrics_v["time"], metrics_v["price"], label="price", color="blue")
#ax_left.plot(metrics_v["time"], metrics_v["price_avg"], label="avg price")
#ax_left.plot(metrics_v["time"], metrics_v["price_co2"], label="price with co2 costs")
#ax_left.plot(metrics_v["time"], metrics_v["price_co2_avg"], label="avg price with co2 costs")
instability_flag = True
for i in instabilities:
if instability_flag:
ax_left.axvspan(metrics_v["time"][i[0]], metrics_v["time"][i[1]], alpha=0.5, color='red',
label="instability")
instability_flag = False
else:
ax_left.axvspan(metrics_v["time"][i[0]], metrics_v["time"][i[1]], alpha=0.5, color='red')
discharge_flag = True
charge_flag = True
booking_flag = True
cnt = 0
for i in range(len(plan)):
if plan["distance"][i] > 0: # this is a booking
if booking_flag:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="yellow",
label="booking")
booking_flag = False
else:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="yellow")
elif not plan["charge"][i] and plan["delta_soc"][i] != 0:
if discharge_flag:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="blue",
label="discharge")
discharge_flag = False
else:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="blue")
elif plan["charge"][i] and plan["delta_soc"][i] != 0:
if charge_flag:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="green",
label="charge")
charge_flag = False
else:
ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="green")
else:
# this is a pause
pass
ax_left.set_xlabel("time in seconds")
ax_left.set_ylabel("price in €/kWs", color="blue")
ax_right.set_ylabel("soc (normalized)", color="black")
legend_1 = ax_left.legend(loc=2, borderaxespad=1.)
legend_1.remove()
ax_right.legend(loc=1, borderaxespad=1.)
ax_right.add_artist(legend_1)
plt.show()
def check_plan_consistency(plan: np.array, bounds: list[tuple[float, float]]):
# start time must be increasing
start = 0
for cnt, i in enumerate(plan):
if i["start"] > start:
start = i["start"]
else:
print(f"{cnt}\n{i}\n{len(plan)}\n{plan},{len(bounds)}")
raise Exception("Plan is not sorted")
def fill_in_plan(plan: np.array, x: np.array, metrics_v: np.array) -> np.array:
# fill in the plan with the optimised times
events = x.reshape(-1, 2)
skip = 0
for cnt, (start, duration) in enumerate(events):
index = cnt + skip
while plan[index]["distance"] > 0: # this is a booking
plan[index]["delta_soc"] = - plan[index]["distance"] * SOC_PER_KM
skip += 1
index = cnt + skip
plan[index]["start"] = start
plan[index]["duration"] = duration
plan["start"][-1] = plan["start"][-2] + plan["duration"][-2]
plan["soc"][-1] = SOC_MAX
plan["delta_soc"][-1] = 0
plan["duration"][-1] = 0
# straighten out the plan, calculate price and soc
for step in range(len(plan)):
plan["soc"][step] = plan["soc"][step - 1] + plan["delta_soc"][step - 1]
if step % 2 == 0:
plan["delta_soc"][step] = - plan["duration"][step] * CHARGING_SOC_PER_SEC
plan["price"][step] = - integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
metrics_v["price"])
else:
plan["delta_soc"][step] = plan["duration"][step] * CHARGING_SOC_PER_SEC
plan["price"][step] = + integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
metrics_v["price_co2"])
# last step, which the search algorithm never gets right
plan["soc"][-1] = plan["soc"][-2] + plan["delta_soc"][-2]
plan["duration"][-1] = (SOC_MAX - plan["soc"][- 1]) / CHARGING_SOC_PER_SEC
plan["delta_soc"][-1] = - plan["duration"][-1] * CHARGING_SOC_PER_SEC
plan["price"][-1] = + integrate(plan["start"][-1], plan["duration"][-1], metrics_v["time"], metrics_v["price_co2"])
return plan
def get_data_arrays(end, start):
metrics_v: np.array = get_metrics_vector()
boundary_index = metrics_v["time"].searchsorted([start, end])
sub_metric_v = metrics_v[boundary_index[0]:boundary_index[1]].copy()
stability_ranges = np_vec2ran((sub_metric_v["stability"]))
instability_ranges = np_vec2ran(sub_metric_v["instability"])
while stability_ranges[0][0] < instability_ranges[0][0]:
stability_ranges = stability_ranges[1:]
return instability_ranges, stability_ranges, sub_metric_v
def create_stability_optimisation_items(stability_ranges: np.array, sub_metric_v: np.array) -> list[OptimisationItem]:
stability_item_list = []
for stability_range in stability_ranges:
if stability_range[0] == stability_range[1]:
time_min_price = stability_range[0]
window_size = 1
else:
window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), stability_range[1] - stability_range[0])
_, min_index = find_min_price_for_charge_interval(
sub_metric_v["price"][stability_range[0]:stability_range[1]], window_size)
time_min_price = sub_metric_v["time"][stability_range[0] + min_index]
stability_item: OptimisationItem = {"key": sub_metric_v["time"][stability_range[0]],
"bound_low": sub_metric_v["time"][stability_range[0]],
"bound_high": sub_metric_v["time"][stability_range[1]] + 15 * 60 - 1,
"optional": True,
"charge": True,
"duration": window_size * DATA_INTERVAL,
"start": time_min_price,
"modified": False}
stability_item_list.append(stability_item)
return stability_item_list
def create_instability_optimisation_items(instability_ranges: np.array, sub_metric_v: np.array) -> list[
OptimisationItem]:
instability_item_list = []
for instability_range in instability_ranges:
if instability_range[0] == instability_range[1]:
time_max_price = instability_range[0]
window_size = 1
else:
window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), instability_range[1] - instability_range[0])
_, max_index = find_max_price_for_discharge_interval(
sub_metric_v["price"][instability_range[0]:instability_range[1]], window_size)
time_max_price = sub_metric_v["time"][instability_range[0] + max_index]
instability_item: OptimisationItem = {"key": sub_metric_v["time"][instability_range[0]],
"bound_low": sub_metric_v["time"][instability_range[0]],
"bound_high": sub_metric_v["time"][instability_range[1]] + 15 * 60 - 1,
"optional": True,
"discharge": True,
"duration": window_size * DATA_INTERVAL,
"start": time_max_price,
"modified": False}
instability_item_list.append(instability_item)
return instability_item_list
def create_booking_optimisation_items(booking_list: list[booking_dt]) -> list[OptimisationItem]:
booking_item_list = []
for booking in booking_list:
booking_item: OptimisationItem = {"key": booking["start"],
"bound_low": booking["start"],
"bound_high": booking["start"] + booking["duration"],
"optional": False,
"booking": True,
"charge": False,
"duration": booking["duration"],
"start": booking["start"],
"distance": booking["distance"],
}
booking_item_list.append(booking_item)
return booking_item_list
def turn_booking_into_charging_plan(start: int, end: int, bookings_list: list[booking_dt]) -> tuple[np.array, float]:
# schedule discharge interval per stability incident
instability_ranges, stability_ranges, sub_metric_v = get_data_arrays(end, start)
instability_item_list = create_instability_optimisation_items(instability_ranges, sub_metric_v)
stability_item_list = create_stability_optimisation_items(stability_ranges, sub_metric_v)
booking_item_list = create_booking_optimisation_items(bookings_list)
item_list = sorted(booking_item_list + instability_item_list + stability_item_list, key=lambda x: x["key"])
# pass over the item list several times.
# only optional items are modified
# optional items don't intersect or overlap with each other
# first pass: see if the item's bounds collide with the previous item
# are the start times identical? -> change the optional item
# does a non-optional item intersect an optional item? -> spit the optional item into two and have it continue after the higher prio item, adjust the bounds and durations of the two optional items
# does an optional item intersect a non-optional item? -> delete the optional item
# does an optional item overlap with a non-optional item? -> adjust the optional item
# if we change or modify an item, we set the modified flag to true
sort_flag = True
sort_cnt = 0
while sort_flag:
sort_flag = False
sort_flag = adjust_item_boundaries(item_list, sort_flag)
if sort_flag:
sort_cnt += 1
if sort_cnt > 5: # how many (dis)charging items can intersect a booking?
raise Exception("Too many sorting iterations")
item_list.sort(key=lambda x: x["key"])
print("sorting")
# second pass: adjust the start times and duration of modified optional items
adjust_initial_optimization_values(item_list, sub_metric_v)
key=0
start=0
for cnt, i in enumerate(item_list):
if key > i["key"]:
raise Exception("key error at ", cnt)
key = i["key"]
if start > i["start"]:
raise Exception("start error at ", cnt)
start = i["start"]
# construct the charging plan skeletal from the item list
events_l: list[booking_dt] = []
for cnt, item in enumerate(item_list):
event = np.zeros(1, dtype=booking_dt)
event["cnt"] = cnt
if item["optional"]:
if "charge" in item:
event["start"] = item["start"]
event["duration"] = item["duration"]
event["charge"] = True
else:
event["start"] = item["start"]
event["duration"] = item["duration"]
event["discharge"] = True
else:
event["start"] = item["start"]
event["duration"] = item["duration"]
event["distance"] = item["distance"]
event["booking"] = True
event["delta_soc"] = - item["distance"] * SOC_PER_KM
event["charge"] = False
events_l.append(event)
plan = np.array(events_l, dtype=booking_dt)
times = optimise_discharge_plan(plan, item_list, sub_metric_v)
plan = fill_in_plan(plan, times, sub_metric_v)
price = plan["price"].sum()
plot_plan(plan, sub_metric_v, instability_ranges)
return plan, price
def adjust_initial_optimization_values(item_list, sub_metric_v):
for item in item_list:
if "booking" in item:
continue # don't touch bookings
if "charge" or "dicharge" in item:
boundary_index = sub_metric_v["time"].searchsorted([item["bound_low"], item["bound_high"]])
if item["bound_high"] - item["bound_low"] < CHARGE_TIME_MAX_SECONDS: # short window
item["start"] = item["bound_low"]
item["duration"] = item["bound_high"] - item["bound_low"]
item["modified"] = False
elif "charge" in item: # long charge window
# find the minimum price in the interval
window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), boundary_index[1] - boundary_index[0])
_, min_index = find_min_price_for_charge_interval(
sub_metric_v["price"][boundary_index[0]:boundary_index[1]], window_size)
start = sub_metric_v["time"][boundary_index[0] + min_index]
item["start"] = start
item["duration"] = window_size * DATA_INTERVAL
# the above calculation was approximate, because we looked at 900s intervals
if item["start"] < item["bound_low"]:
item["start"] = item["bound_low"]
if item["start"] + item["duration"] > item["bound_high"]:
item["start"] = item["bound_high"] - item["duration"]
else: # long discharge window
# find the maximum price in the interval
window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), boundary_index[1] - boundary_index[0])
_, max_index = find_max_price_for_discharge_interval(
sub_metric_v["price"][boundary_index[0]:boundary_index[1]], window_size)
item["start"] = sub_metric_v["time"][boundary_index[0] + max_index]
item["duration"] = window_size * DATA_INTERVAL
# the above calculation was approximate, because we looked at 900s intervals
if item["start"] < item["bound_low"]:
item["start"] = item["bound_low"]
if item["start"] + item["duration"] > item["bound_high"]:
item["start"] = item["bound_high"] - item["duration"]
item["modified"] = False
# check if start time is within the bounds
if item["modified"]:
raise Exception(f"what item is this?\n{item}")
if item["start"] < item["bound_low"]:
print("booking: start time is lower than lower bound by", item["bound_low"] - item["start"],
"seconds")
item["start"] = item["bound_low"]
if item["start"] + item["duration"] > item["bound_high"]:
print("booking: end time is higher than upper bound by",
item["start"] + item["duration"] - item["bound_high"],
"seconds")
item["start"] = item["bound_high"] - item["duration"]
item["modified"] = False
def adjust_item_boundaries(item_list, sort_flag):
for i, item in enumerate(item_list):
if not "booking" in item:
# we only look at charge or discharge items
if i > 0:
previous_item = item_list[i - 1]
# if the two items intersect, the other one is a booking and this item needs adjustment
if item["bound_high"] <= previous_item["bound_high"]:
# this optional item fully intersects a booking item
# delete this optional item
item_list.pop(i)
# we don't need to increment i, because the next item will be at the same index
continue
elif item["bound_low"] <= previous_item["bound_high"]:
item["bound_low"] = previous_item["bound_high"] + 1
item["modified"] = True
if i < len(item_list) - 1:
next_item = item_list[i + 1]
if item["bound_low"] == next_item["bound_low"] and item["bound_high"] > next_item["bound_high"]:
# an optional item has the same start time as a non-optional item and is longer
# change the optional item
item["bound_low"] = next_item["bound_high"] + 1
item["key"] = next_item["bound_high"] + 1
item["modified"] = True
# items need to be sorted again
sort_flag = True
if item["bound_high"] == next_item["bound_high"] and item["bound_low"] == next_item["bound_low"]:
# an optional item has the same start time and duration as a booking item
# remove the optional item
item_list.pop(i)
# we don't need to increment i, because the next item will be at the same index
continue
if next_item["bound_high"] < item["bound_high"]:
# an optional item fully intersects a non-optional item
# split the optional item into two, around the non-optional item
# the first optional item continues after the non-optional item
# the second optional item is the remainder of the original optional item
new_item = {"key": next_item["bound_high"] + 1,
"bound_low": next_item["bound_high"] + 1,
"bound_high": item["bound_high"],
"optional": True,
"modified": True}
if "charge" in item:
new_item["charge"] = True
else:
new_item["discharge"] = True
item_list.insert(i + 2, new_item)
item["bound_high"] = next_item["bound_low"] - 1
item["modified"] = True
sort_flag = True
if item["bound_high"] >= next_item["bound_low"]:
# an optional item overlaps with a non-optional item
# adjust the optional item
item["bound_high"] = next_item["bound_low"] - 1
item["modified"] = True
return sort_flag
def calculate_price_for_booking(booking: booking_dt, bookings_list: list[booking_dt],
old_bookings_list: list[booking_dt], old_price) -> np.array:
# we really want to go to integer time
start_time = bookings_list[0]["start"] - ONE_DAY
end_time = bookings_list[-1]["start"] + bookings_list[-1]["duration"] + ONE_DAY
if len(old_bookings_list) == 0 or \
bookings_list[0]["start"] != old_bookings_list[0]["start"] or \
bookings_list[-1]["start"] != old_bookings_list[-1]["start"]:
# the timeframe changed, so we need to recalculate the old plan on the new timeframe
old_charging_plan, old_price = turn_booking_into_charging_plan(start_time, end_time, old_bookings_list)
charging_plan, price = turn_booking_into_charging_plan(start_time, end_time, bookings_list)
caused_cost: float = old_price - price
booking["price"] = booking["duration"] * DURATION_FEE + booking["distance"] * DISTANCE_FEE + caused_cost
print(f"\033[0;32m{price}\033[00m ")
return charging_plan, price
def process_booking_requests(requests: list[booking_dt]):
booking_plan: np.array = None
charging_plan_list = []
sequential_list_of_bookings: list[booking_dt] = []
for request in requests:
old_booking_list = sequential_list_of_bookings.copy()
insort(sequential_list_of_bookings, request,
key=lambda x: x["start"][0])
valid, booking_plan = valid_bookings(sequential_list_of_bookings)
price: float = 0
if not valid:
start: int = request['start'][0]
index: int = bisect_left(sequential_list_of_bookings, start,
key=lambda x: x["start"][0])
del sequential_list_of_bookings[index]
else:
sequential_trip_requests = extract_trip_events(sequential_list_of_bookings)
trip_request = request[0]
old_trip_requests = extract_trip_events(old_booking_list)
charging_plan, price = calculate_price_for_booking(trip_request, sequential_trip_requests,
old_trip_requests, price)
charging_plan_list.append(charging_plan)
return sequential_list_of_bookings, booking_plan
def extract_trip_events(requests):
extracted_trips = []
for request in requests:
extracted_trips.append(request[0])
return extracted_trips
def main():
sequential_list_of_bookings = create_sequential_list_of_bookings(NUM_BOOKINGS)
charging_plan_max: np.array = plan_charge_max(sequential_list_of_bookings)
randomized_list_of_bookings = randomize_list_of_bookings(sequential_list_of_bookings)
accepted_list_of_bookings, accepted_bookings_a = process_booking_requests(randomized_list_of_bookings)
if __name__ == "__main__":
main()