3
\$\begingroup\$

I am writing code that, at some later point, will be part of an academic multi-agent simulation of the learning process of car-sharing users and grid stability. This code deals with finding the price for incoming bookings.

The algorithm tries to find optimal charging strategies for electric vehicles in a car-sharing setting with the (conflicting) goals

  • minimal energy price
  • uploading energy to the grid during times of grid instability due to peak demand
  • as little fossil fuels in the primary energy mix as possible maximal availability of the cars to users

The idle car charges when energy is cheap and discharges when the grid needs stabilizing - and at the same time, prices are high. Under my simplified circumstances, it earns money. I have historical data for the energy cost, grid stability, and percent CO2 in the energy mix.

The guiding principle for calculating the price for a booking is “The user pays for changes he causes.” If the cheapest time for charging becomes unavailable due to a booking, those additional costs are charged to the user causing the change, even if the electricity is not for her trip. A booking is given by start and end time (or duration) and the needed battery charge (or distance of the journey.)

Energy cost over time (from historic data)

This is historical data for the energy cost over time. To establish a baseline for the booking price, I first need to calculate the cost (or rather earnings) of the “idle” state of the car. So I look for the best charging and discharging times around the time of the booking.

Price over time, with periods of reduced grid stability

The added boxes mark the peak demand times with reduced power grid stability, during which the car should discharge its battery.

enter image description here

Here I drew the optimal discharging (where the car earns money) and charging time (where the car spends money.) It takes three hours to (dis)charge the battery fully. adding a booking worth 1h of battery charge, and blocking some earlier charging and discharging times

Here I added a booking worth one hour of battery charge, which also blocks some optimal (dis)charging periods.

The booking causes some lost-opportunity-cost which is added to the booking's cost

The resulting discharging time is shorter, and the charging takes place at a later, more expensive time. It is also possible that the cheapest charging time is later, after the peak. The user would pay the cost for the missed opportunity cost of discharging during that period of reduced grid stability, too. My algorithm considers the price/CO2/stability forecast of the next 24h after the booking because that is the timeframe for such projections. The situation gets more complicated if many bookings are connected. Each time the last booking pays, the difference in costs caused by it.

Bookings are added one at a time. Each time, the best time of charging and discharging is found, the respective costs calculated, and the difference charged to the individual booking.

One general problem is that electric cars might need to recharge between bookings, which takes time. This code assumes linear charging and determines if booking requests can be accepted.

Initially, I generate some synthetic booking data - that will be replaced by real booking data later. A booking request consists of

  • the start and end times and
  • the distance, so we can estimate the energy (State of Charge, or SoC) required for the booking.

UPDATE: By now, I have feature-complete running code with optimization. The problem is that the optimization algorithm is not a good fit for the problem and also does not honor the constraints. Later, I will replace that with a linear programming algorithm. The code needs some cleanup, and I want to introduce classes. Could you please suggest the starting points for the classes? I have a hard time getting started on that.

This is an example plot generated by the code - you can see how the optimizing algorithm fails to observe the constraint that the battery can only be fully charged, not 110% or so.

from bisect import insort, bisect_left
from random import randint, shuffle, random
from typing import TypedDict

# import pyomo.environ as pyo
# from pyomo.opt import SolverFactory
import dateutil.parser as dp
import matplotlib.pyplot as plt
import numpy as np
from bottleneck import move_sum
from scipy.optimize import minimize, OptimizeResult

from my_csv import get_metrics_vector

NUM_BOOKINGS = 3
PAUSE_BETWEEN_BOOKINGS_SEC = 7 * 60  # in sec
PAUSE_BETWEEN_BOOKINGS_MAX = 3 * 60 * 60  # 3h, in sec
BOOKING_DURATION_MIN = 30 * 60  # in sec
BOOKING_DURATION_MAX = 2 * 60 * 60  # 2h in minutes
SPEED_MAX = 120 / 3600  # 120km/h in km/sec
SOC_MAX = 1.0  # normalized
SOC_PER_KM = SOC_MAX / 400  # so we can go 400km
CHARGE_TIME_MAX_SECONDS = 3 * 60 * 60  # in seconds
CHARGING_SOC_PER_SEC = SOC_MAX / (3 * 60 * 60)  # fully charge in 3h
ONE_MINUTE = 60  # in seconds
DATA_INTERVAL = 15 * ONE_MINUTE
DURATION_FEE = 2.5 / 3600  # Euro per second
DISTANCE_FEE = 0.5  # Euro per km
ONE_DAY = 24 * 60 * 60  # in seconds

# https://docs.python.org/3/tutorial/datastructures.html#more-on-lists
# https://pythonexamples.org/python-list-of-dictionaries/
# https://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list


booking_dt = np.dtype([("start", int),
                       ("distance", float),
                       ("duration", int),
                       ("soc", float),  # state of charge of the battery
                       ("delta_soc", float),  # change in battery charge
                       ("score", float),  # this is purely the environmental impact. time of booking does not count
                       ("price", float),
                       ("charge", bool),
                       ("booking", bool),
                       ("discharge", bool),
                       ("cnt", int)])  # this is what the user is billed.


class OptimisationItem(TypedDict, total=False):
    key: int
    start: int  # this might be not needed, as its the key in the dict
    distance: float
    duration: int
    soc: float  # state of charge of the battery
    delta_soc: float  # change in battery charge
    score: float  # this is purely the environmental impact. time of booking does not count
    price: float  # this is what the user is billed.
    optional: bool
    bound_high: int
    bound_low: int
    index: int
    charge: bool
    booking: bool
    discharge: bool
    modified: bool


def create_sequential_list_of_bookings(bookings_num: int) -> list[booking_dt]:
    while True:
        sequential_list_of_bookings: list[booking_dt] = []
        initial_start: int = int(dp.parse('2021-11-04T00:05:23').timestamp())
        initial_duration: int = 0

        # add an inital starting point, so the graph and the algorithm has initial data.
        initial_booking: booking_dt = np.zeros((4,), dtype=booking_dt)
        initial_booking["start"][0:4] = initial_start
        #    initial_booking["start"] = initial_start
        #    initial_booking["duration"] = initial_duration
        initial_booking["soc"][0:4] = SOC_MAX
        sequential_list_of_bookings.append(initial_booking)

        for booking_cnt in range(bookings_num):
            pause: int = randint(PAUSE_BETWEEN_BOOKINGS_SEC, PAUSE_BETWEEN_BOOKINGS_MAX)
            duration: int = randint(BOOKING_DURATION_MIN, BOOKING_DURATION_MAX)
            distance_max: float = float(SPEED_MAX * duration)
            distance: float = random() * distance_max
            soc_needed: float = - distance * SOC_PER_KM

            booking_start: int = sequential_list_of_bookings[-1]["start"][-1]

            booking: booking_dt = np.zeros((4,), booking_dt)
            booking['start'][0] = booking_start
            booking["duration"][0] = duration
            booking['distance'][0] = distance
            booking["booking"][0] = True
            booking['delta_soc'][0] = soc_needed
            # pausing
            booking["start"][1] = booking_start + duration
            booking["duration"][1] = pause
            # chargeing
            booking["start"][2] = booking_start + duration + pause
            booking["charge"][2] = True
            # filler, so i can shift the charging time in the pause time window for optimisation.
            booking["start"][3] = booking_start + duration + pause
            sequential_list_of_bookings.append(booking)

        if not check_if_schedule_is_impossible(sequential_list_of_bookings):
            break
    sequential_list_of_bookings.pop(0)  # remove the initial starting point

    return sequential_list_of_bookings


def check_if_schedule_is_impossible(list_of_bookings: list[booking_dt]) -> bool:
    plan: np.array = np.array(list_of_bookings, dtype=booking_dt).reshape(-1)

    # quick check if it is possible to charge enough
    duration_trip = plan["duration"][::4].sum()
    duration_pause = plan["duration"].sum() - duration_trip
    soc_needed = plan["delta_soc"][::4].sum()
    soc_charge_max = duration_pause * CHARGING_SOC_PER_SEC + 1
    if soc_needed + soc_charge_max < 0:
        print("problem:", soc_needed, ">", soc_charge_max)
        impossible = True
    else:
        print("ok:", soc_needed, "<", soc_charge_max)
        impossible = False
    return impossible


def randomize_list_of_bookings(list_of_bookings: list[booking_dt]) -> list[booking_dt]:
    randomized_list_of_bookings: list[booking_dt] = list_of_bookings.copy()
    shuffle(randomized_list_of_bookings)
    return randomized_list_of_bookings


def plan_charge_max(list_of_bookings: list[booking_dt]) -> np.array:
    plan: np.array = np.array(list_of_bookings, dtype=booking_dt).reshape(-1)

    plan["soc"][-1] = SOC_MAX

    # quick check if it is possible to charge enough
    for step in range(len(plan)):
        plan["soc"][step] = max(0, plan["soc"][step - 1] + plan["delta_soc"][step - 1])
        plan["start"][step] = plan["duration"][step - 1] + plan["start"][step - 1]
        if plan["delta_soc"][step - 1] < 0:
            pause_duration = plan["duration"][step:step + 2].sum()
            possible_charge = pause_duration * CHARGING_SOC_PER_SEC
            needed_charge = SOC_MAX - plan["soc"][step]
            delta_soc = min(needed_charge, possible_charge)
            plan["delta_soc"][step + 1] = delta_soc

            charging_duration = delta_soc / CHARGING_SOC_PER_SEC
            plan["duration"][step + 1] = charging_duration
            plan["duration"][step] -= charging_duration

    return plan


def valid_bookings(sequential_list_of_bookings: list[booking_dt]) -> tuple[bool, np.array]:
    plan: np.array = plan_charge_max(sequential_list_of_bookings)
    valid: bool = 0 not in plan["soc"]
    return valid, plan


# finds start and end of stability incidents
# https://stackoverflow.com/questions/69457696/get-ranges-of-true-values-start-and-end-in-a-boolean-list-without-using-a-for
def np_vec2ran(x: np.array) -> np.array:
    z = np.concatenate(([False], x, [False]))

    start = np.flatnonzero(~z[:-1] & z[1:])
    end = np.flatnonzero(z[:-1] & ~z[1:])

    return np.column_stack((start, end - 1))


def find_max_price_for_discharge_interval(price: np.array, window_size: int) -> (float, int):
    if len(price) <= window_size:
        return price.sum(), 0
    sum_array = move_sum(price, window=window_size)
    max_index = np.nanargmax(sum_array) - window_size + 1
    max_price = sum_array[max_index]
    return max_price, max_index


def find_min_price_for_charge_interval(price: np.array, window_size) -> (float, int):
    if len(price) <= window_size:
        return price.sum(), 0
    sum_array = move_sum(price, window=window_size)
    min_index = np.nanargmin(sum_array) - window_size + 1
    min_price = sum_array[min_index]
    return min_price, min_index


def integrate(start, duration, time: np.array, price: np.array) -> float:
    x_start = start[0]
    x_end = start[0] + duration[0]
    x_data: np.array = np.linspace(x_start, x_end, 10)
    y_interp: np.array = np.interp(x_data, time, price)
    area: float = np.trapz(y_interp, x_data)
    return area


def soc_constraint(x: np.array, plan: np.array):
#   x = np.append(x, CHARGE_TIME_MAX_SECONDS)
    soc = SOC_MAX
    durations = x[1::2]
    i = 0
    for plan_line in plan:
        if plan_line["charge"]:
            soc += durations[i] * CHARGING_SOC_PER_SEC
            i += 1
        elif plan_line["booking"]:
            soc -= plan_line["delta_soc"]
        else:
            soc -= durations[i] * CHARGING_SOC_PER_SEC
            i += 1
        if soc < 0:
            return soc
        elif soc > SOC_MAX:
            soc = SOC_MAX - soc
            break
    return soc


def bounds_constraint(x, bounds):
#    x = np.append(x, CHARGE_TIME_MAX_SECONDS)
    end_points = bounds - (x[::2] + x[1::2])
    smallest_end_point = np.min(end_points)
    return smallest_end_point


def objective_func(x: np.array, args_list: list[np.array]) -> float:
#    x = np.append(x, CHARGE_TIME_MAX_SECONDS)

    metrics_v = args_list[0]
    plan = args_list[1]
    x_index = 0
    for plan_line in plan:
        if plan_line["booking"]  == True:
            continue
        elif plan_line["charge"] == True or plan_line["discharge"] == True:
            plan_line["start"] = x[x_index]
            x_index += 1
            plan_line["duration"] = x[x_index]
            x_index += 1

    # start values for soc=SOC_MAX and delta_soc: 0
    plan["soc"][-1] = SOC_MAX
    plan["delta_soc"][-1] = 0
    plan["duration"][-1] = 0


    # straighten out the plan, calculate price and soc
    for step in range(len(plan)):
        plan["soc"][step] = plan["soc"][step - 1] + plan["delta_soc"][step - 1]
        if plan["distance"][step] != 0:  # this is a booking
             continue
        elif plan["charge"][step] == False:  # discharge
            plan["delta_soc"][step] = - plan["duration"][step] * CHARGING_SOC_PER_SEC
            plan["price"][step] = - integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
                                              metrics_v["price"] - metrics_v["price_avg"])
        else: # charge
            plan["delta_soc"][step] = plan["duration"][step] * CHARGING_SOC_PER_SEC
            plan["price"][step] = + integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
                                              metrics_v["price_co2"] - metrics_v["price_co2_avg"])
    # last step, which the search algorithm never gets right
    plan["soc"][-1] = plan["soc"][-2] + plan["delta_soc"][-2]
    plan["duration"][-1] = (SOC_MAX - plan["soc"][- 1]) / CHARGING_SOC_PER_SEC
    plan["price"][-1] = + integrate(plan["start"][-1], plan["duration"][-1], metrics_v["time"],
                                    metrics_v["price_co2"] - metrics_v["price_co2_avg"])

    price: float = plan["price"].sum()
    return price


def optimise_discharge_plan(plan: np.array, optimisation_items, metrics_v: np.array) -> np.array:
    x_initial = []
    bounds = []
    constraints = []
    bounds_high_l = []
    for item in optimisation_items:
        if "booking" in item:
            continue
        x_initial.append(item["start"])
        x_initial.append(item["duration"])
        bound_low = item["bound_low"]
        bound_high = item["bound_high"]
        bounds_high_l.append(bound_high)
        bounds.append((bound_low, bound_high))
        bounds.append((0, min(CHARGE_TIME_MAX_SECONDS, bound_high - bound_low)))
    bounds_high = np.array(bounds_high_l)
    constraints.append({"type": "ineq", "fun": bounds_constraint, "args": (bounds_high,)})
    constraints.append({"type": "ineq", "fun": soc_constraint, "args": (plan,)})

    result: OptimizeResult = minimize(objective_func, np.array(x_initial),
                                      args=[metrics_v, plan],
                                      method="SLSQP",
                                      bounds=bounds,
                                      constraints=constraints,
                                      # options={"eps": 1 , "maxiter": 1000, "ftol": 1.0, "disp": True},
                                      tol=1)
    check_plan_consistency(plan, bounds)

    print("\n", result, "\n", plan)
    return result.x


def pad_plan_with_pauses(plan: np.array) -> np.array:
    padded_plan = np.zeros(len(plan) * 2 - 1, dtype=booking_dt)
    for cnt, row in enumerate(plan):
        padded_plan[cnt * 2] = row
        if cnt < len(plan) - 1:
            padded_plan[cnt * 2 + 1]["start"] = row["start"] + row["duration"]
            padded_plan[cnt * 2 + 1]["duration"] = plan[cnt + 1]["start"] - row["start"] - row["duration"]
            padded_plan[cnt * 2 + 1]["soc"] = row["soc"] + row["delta_soc"]
    return padded_plan


def plot_plan(plan_for_plotting, metrics_v, instabilities):
    plan = pad_plan_with_pauses(plan_for_plotting)

    fig, ax_left = plt.subplots()
    ax_right = ax_left.twinx()
    ax_right.plot(plan["start"], plan["soc"], label="State of Charge", color="black")

    ax_left.plot(metrics_v["time"], metrics_v["price"], label="price", color="blue")
    #ax_left.plot(metrics_v["time"], metrics_v["price_avg"], label="avg price")
    #ax_left.plot(metrics_v["time"], metrics_v["price_co2"], label="price with co2 costs")
    #ax_left.plot(metrics_v["time"], metrics_v["price_co2_avg"], label="avg price with co2 costs")

    instability_flag = True
    for i in instabilities:
        if instability_flag:
            ax_left.axvspan(metrics_v["time"][i[0]], metrics_v["time"][i[1]], alpha=0.5, color='red',
                            label="instability")
            instability_flag = False
        else:
            ax_left.axvspan(metrics_v["time"][i[0]], metrics_v["time"][i[1]], alpha=0.5, color='red')

    discharge_flag = True
    charge_flag = True
    booking_flag = True
    cnt = 0
    for i in range(len(plan)):
        if plan["distance"][i] > 0:  # this is a booking
            if booking_flag:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="yellow",
                                label="booking")
                booking_flag = False
            else:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="yellow")
        elif not plan["charge"][i] and plan["delta_soc"][i] != 0:
            if discharge_flag:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="blue",
                                label="discharge")
                discharge_flag = False
            else:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="blue")
        elif plan["charge"][i] and plan["delta_soc"][i] != 0:
            if charge_flag:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="green",
                                label="charge")
                charge_flag = False
            else:
                ax_left.axvspan(plan["start"][i], plan["start"][i] + plan["duration"][i], alpha=0.5, color="green")
        else:
            # this is a pause
            pass
    ax_left.set_xlabel("time in seconds")
    ax_left.set_ylabel("price in €/kWs", color="blue")
    ax_right.set_ylabel("soc (normalized)", color="black")
    legend_1 = ax_left.legend(loc=2, borderaxespad=1.)
    legend_1.remove()
    ax_right.legend(loc=1, borderaxespad=1.)
    ax_right.add_artist(legend_1)
    plt.show()

def check_plan_consistency(plan: np.array, bounds: list[tuple[float, float]]):
    # start time must be increasing
    start = 0
    for cnt, i in enumerate(plan):
            if i["start"] > start:
                start = i["start"]
            else:
                print(f"{cnt}\n{i}\n{len(plan)}\n{plan},{len(bounds)}")

                raise Exception("Plan is not sorted")


def fill_in_plan(plan: np.array, x: np.array, metrics_v: np.array) -> np.array:
    # fill in the plan with the optimised times
    events = x.reshape(-1, 2)
    skip = 0
    for cnt, (start, duration) in enumerate(events):
        index = cnt + skip
        while plan[index]["distance"] > 0:  # this is a booking
            plan[index]["delta_soc"] = - plan[index]["distance"] * SOC_PER_KM
            skip += 1
            index = cnt + skip
        plan[index]["start"] = start
        plan[index]["duration"] = duration
    plan["start"][-1] = plan["start"][-2] + plan["duration"][-2]

    plan["soc"][-1] = SOC_MAX
    plan["delta_soc"][-1] = 0
    plan["duration"][-1] = 0
    # straighten out the plan, calculate price and soc
    for step in range(len(plan)):
        plan["soc"][step] = plan["soc"][step - 1] + plan["delta_soc"][step - 1]
        if step % 2 == 0:
            plan["delta_soc"][step] = - plan["duration"][step] * CHARGING_SOC_PER_SEC
            plan["price"][step] = - integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
                                              metrics_v["price"])
        else:
            plan["delta_soc"][step] = plan["duration"][step] * CHARGING_SOC_PER_SEC
            plan["price"][step] = + integrate(plan["start"][step], plan["duration"][step], metrics_v["time"],
                                              metrics_v["price_co2"])
    # last step, which the search algorithm never gets right
    plan["soc"][-1] = plan["soc"][-2] + plan["delta_soc"][-2]
    plan["duration"][-1] = (SOC_MAX - plan["soc"][- 1]) / CHARGING_SOC_PER_SEC
    plan["delta_soc"][-1] = - plan["duration"][-1] * CHARGING_SOC_PER_SEC
    plan["price"][-1] = + integrate(plan["start"][-1], plan["duration"][-1], metrics_v["time"], metrics_v["price_co2"])
    return plan


def get_data_arrays(end, start):
    metrics_v: np.array = get_metrics_vector()
    boundary_index = metrics_v["time"].searchsorted([start, end])
    sub_metric_v = metrics_v[boundary_index[0]:boundary_index[1]].copy()
    stability_ranges = np_vec2ran((sub_metric_v["stability"]))
    instability_ranges = np_vec2ran(sub_metric_v["instability"])
    while stability_ranges[0][0] < instability_ranges[0][0]:
        stability_ranges = stability_ranges[1:]
    return instability_ranges, stability_ranges, sub_metric_v


def create_stability_optimisation_items(stability_ranges: np.array, sub_metric_v: np.array) -> list[OptimisationItem]:
    stability_item_list = []
    for stability_range in stability_ranges:
        if stability_range[0] == stability_range[1]:
            time_min_price = stability_range[0]
            window_size = 1
        else:
            window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), stability_range[1] - stability_range[0])
            _, min_index = find_min_price_for_charge_interval(
                sub_metric_v["price"][stability_range[0]:stability_range[1]], window_size)
            time_min_price = sub_metric_v["time"][stability_range[0] + min_index]

        stability_item: OptimisationItem = {"key": sub_metric_v["time"][stability_range[0]],
                                            "bound_low": sub_metric_v["time"][stability_range[0]],
                                            "bound_high": sub_metric_v["time"][stability_range[1]] + 15 * 60 - 1,
                                            "optional": True,
                                            "charge": True,
                                            "duration": window_size * DATA_INTERVAL,
                                            "start": time_min_price,
                                            "modified": False}
        stability_item_list.append(stability_item)
    return stability_item_list


def create_instability_optimisation_items(instability_ranges: np.array, sub_metric_v: np.array) -> list[
    OptimisationItem]:
    instability_item_list = []
    for instability_range in instability_ranges:
        if instability_range[0] == instability_range[1]:
            time_max_price = instability_range[0]
            window_size = 1
        else:
            window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), instability_range[1] - instability_range[0])
            _, max_index = find_max_price_for_discharge_interval(
                sub_metric_v["price"][instability_range[0]:instability_range[1]], window_size)
            time_max_price = sub_metric_v["time"][instability_range[0] + max_index]
        instability_item: OptimisationItem = {"key": sub_metric_v["time"][instability_range[0]],
                                              "bound_low": sub_metric_v["time"][instability_range[0]],
                                              "bound_high": sub_metric_v["time"][instability_range[1]] + 15 * 60 - 1,
                                              "optional": True,
                                              "discharge": True,
                                              "duration": window_size * DATA_INTERVAL,
                                              "start": time_max_price,
                                              "modified": False}
        instability_item_list.append(instability_item)
    return instability_item_list


def create_booking_optimisation_items(booking_list: list[booking_dt]) -> list[OptimisationItem]:
    booking_item_list = []
    for booking in booking_list:
        booking_item: OptimisationItem = {"key": booking["start"],
                                          "bound_low": booking["start"],
                                          "bound_high": booking["start"] + booking["duration"],
                                          "optional": False,
                                          "booking": True,
                                          "charge": False,
                                          "duration": booking["duration"],
                                          "start": booking["start"],
                                          "distance": booking["distance"],
                                          }
        booking_item_list.append(booking_item)
    return booking_item_list


def turn_booking_into_charging_plan(start: int, end: int, bookings_list: list[booking_dt]) -> tuple[np.array, float]:
    # schedule discharge interval per stability incident
    instability_ranges, stability_ranges, sub_metric_v = get_data_arrays(end, start)

    instability_item_list = create_instability_optimisation_items(instability_ranges, sub_metric_v)
    stability_item_list = create_stability_optimisation_items(stability_ranges, sub_metric_v)
    booking_item_list = create_booking_optimisation_items(bookings_list)
    item_list = sorted(booking_item_list + instability_item_list + stability_item_list, key=lambda x: x["key"])

    # pass over the item list several times.
    # only optional items are modified
    # optional items don't intersect or overlap with each other
    # first pass: see if the item's bounds collide with the previous item
    #             are the start times identical? -> change the optional item
    #             does a non-optional item intersect an optional item? -> spit the optional item into two and have it continue after the higher prio item, adjust the bounds and durations of the two optional items
    #             does an optional item intersect a non-optional item? -> delete the optional item
    #             does an optional item overlap with a non-optional item? -> adjust the optional item
    #             if we change or modify an item, we set the modified flag to true
    sort_flag = True
    sort_cnt = 0
    while sort_flag:
        sort_flag = False
        sort_flag = adjust_item_boundaries(item_list, sort_flag)

        if sort_flag:
            sort_cnt += 1
            if sort_cnt > 5:  # how many (dis)charging items can intersect a booking?
                raise Exception("Too many sorting iterations")
            item_list.sort(key=lambda x: x["key"])
            print("sorting")

    # second pass: adjust the start times and duration of modified optional items
    adjust_initial_optimization_values(item_list, sub_metric_v)

    key=0
    start=0
    for cnt, i in enumerate(item_list):
        if key > i["key"]:
            raise Exception("key error at ", cnt)
        key = i["key"]
        if start > i["start"]:
            raise Exception("start error at ", cnt)
        start = i["start"]

    # construct the charging plan skeletal from the item list
    events_l: list[booking_dt] = []
    for cnt, item in enumerate(item_list):
        event = np.zeros(1, dtype=booking_dt)
        event["cnt"] = cnt
        if item["optional"]:
            if "charge" in item:

                event["start"] = item["start"]
                event["duration"] = item["duration"]
                event["charge"] = True
            else:

                event["start"] = item["start"]
                event["duration"] = item["duration"]
                event["discharge"] = True
        else:
            event["start"] = item["start"]
            event["duration"] = item["duration"]
            event["distance"] = item["distance"]
            event["booking"] = True
            event["delta_soc"] = - item["distance"] * SOC_PER_KM
            event["charge"] = False
        events_l.append(event)

    plan = np.array(events_l, dtype=booking_dt)
    times = optimise_discharge_plan(plan, item_list, sub_metric_v)
    plan = fill_in_plan(plan, times, sub_metric_v)
    price = plan["price"].sum()

    plot_plan(plan, sub_metric_v, instability_ranges)
    return plan, price


def adjust_initial_optimization_values(item_list, sub_metric_v):
    for item in item_list:
        if "booking" in item:
            continue # don't touch bookings
        if "charge" or "dicharge" in item:
            boundary_index = sub_metric_v["time"].searchsorted([item["bound_low"], item["bound_high"]])
            if item["bound_high"] - item["bound_low"] < CHARGE_TIME_MAX_SECONDS: # short window
                item["start"] = item["bound_low"]
                item["duration"] = item["bound_high"] - item["bound_low"]
                item["modified"] = False
            elif "charge" in item:  # long charge window
                # find the minimum price in the interval
                window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), boundary_index[1] - boundary_index[0])
                _, min_index = find_min_price_for_charge_interval(
                    sub_metric_v["price"][boundary_index[0]:boundary_index[1]], window_size)
                start = sub_metric_v["time"][boundary_index[0] + min_index]
                item["start"] = start
                item["duration"] = window_size * DATA_INTERVAL
                # the above calculation was approximate, because we looked at 900s intervals
                if item["start"] < item["bound_low"]:
                    item["start"] = item["bound_low"]
                if item["start"] + item["duration"] > item["bound_high"]:
                    item["start"] = item["bound_high"] - item["duration"]
            else:  # long discharge window
                # find the maximum price in the interval
                window_size = min(int(CHARGE_TIME_MAX_SECONDS / DATA_INTERVAL), boundary_index[1] - boundary_index[0])
                _, max_index = find_max_price_for_discharge_interval(
                    sub_metric_v["price"][boundary_index[0]:boundary_index[1]], window_size)
                item["start"] = sub_metric_v["time"][boundary_index[0] + max_index]
                item["duration"] = window_size * DATA_INTERVAL
                # the above calculation was approximate, because we looked at 900s intervals
                if item["start"] < item["bound_low"]:
                    item["start"] = item["bound_low"]
                if item["start"] + item["duration"] > item["bound_high"]:
                    item["start"] = item["bound_high"] - item["duration"]
            item["modified"] = False
            # check if start time is within the bounds
        if item["modified"]:
            raise Exception(f"what item is this?\n{item}")
            if item["start"] < item["bound_low"]:
                print("booking: start time is lower than lower bound by", item["bound_low"] - item["start"],
                      "seconds")
                item["start"] = item["bound_low"]
            if item["start"] + item["duration"] > item["bound_high"]:
                print("booking: end time is higher than upper bound by",
                      item["start"] + item["duration"] - item["bound_high"],
                      "seconds")
                item["start"] = item["bound_high"] - item["duration"]
            item["modified"] = False


def adjust_item_boundaries(item_list, sort_flag):
    for i, item in enumerate(item_list):
        if not "booking" in item:
            # we only look at charge or discharge items
            if i > 0:
                previous_item = item_list[i - 1]
                # if the two items intersect, the other one is a booking and this item needs adjustment
                if item["bound_high"] <= previous_item["bound_high"]:
                    # this optional item fully intersects a booking item
                    # delete this optional item
                    item_list.pop(i)
                    # we don't need to increment i, because the next item will be at the same index
                    continue
                elif item["bound_low"] <= previous_item["bound_high"]:
                    item["bound_low"] = previous_item["bound_high"] + 1
                    item["modified"] = True
            if i < len(item_list) - 1:
                next_item = item_list[i + 1]
                if item["bound_low"] == next_item["bound_low"] and item["bound_high"] > next_item["bound_high"]:
                    # an optional item has the same start time as a non-optional item and is longer
                    # change the optional item
                    item["bound_low"] = next_item["bound_high"] + 1
                    item["key"] = next_item["bound_high"] + 1
                    item["modified"] = True
                    # items need to be sorted again
                    sort_flag = True
                if item["bound_high"] == next_item["bound_high"] and item["bound_low"] == next_item["bound_low"]:
                    # an optional item has the same start time and duration as a booking item
                    # remove the optional item
                    item_list.pop(i)
                    # we don't need to increment i, because the next item will be at the same index
                    continue
                if next_item["bound_high"] < item["bound_high"]:
                    # an optional item fully intersects a non-optional item
                    # split the optional item into two, around the non-optional item
                    # the first optional item continues after the non-optional item
                    # the second optional item is the remainder of the original optional item
                    new_item = {"key": next_item["bound_high"] + 1,
                                "bound_low": next_item["bound_high"] + 1,
                                "bound_high": item["bound_high"],
                                "optional": True,
                                "modified": True}
                    if "charge" in item:
                        new_item["charge"] = True
                    else:
                        new_item["discharge"] = True
                    item_list.insert(i + 2, new_item)
                    item["bound_high"] = next_item["bound_low"] - 1
                    item["modified"] = True
                    sort_flag = True
                if item["bound_high"] >= next_item["bound_low"]:
                    # an optional item overlaps with a non-optional item
                    # adjust the optional item
                    item["bound_high"] = next_item["bound_low"] - 1
                    item["modified"] = True
    return sort_flag


def calculate_price_for_booking(booking: booking_dt, bookings_list: list[booking_dt],
                                old_bookings_list: list[booking_dt], old_price) -> np.array:
    # we really want to go to integer time
    start_time = bookings_list[0]["start"] - ONE_DAY
    end_time = bookings_list[-1]["start"] + bookings_list[-1]["duration"] + ONE_DAY

    if len(old_bookings_list) == 0 or \
            bookings_list[0]["start"] != old_bookings_list[0]["start"] or \
            bookings_list[-1]["start"] != old_bookings_list[-1]["start"]:
        #  the timeframe changed, so we need to recalculate the old plan on the new timeframe
        old_charging_plan, old_price = turn_booking_into_charging_plan(start_time, end_time, old_bookings_list)

    charging_plan, price = turn_booking_into_charging_plan(start_time, end_time, bookings_list)
    caused_cost: float = old_price - price

    booking["price"] = booking["duration"] * DURATION_FEE + booking["distance"] * DISTANCE_FEE + caused_cost
    print(f"\033[0;32m{price}\033[00m ")
    return charging_plan, price


def process_booking_requests(requests: list[booking_dt]):
    booking_plan: np.array = None
    charging_plan_list = []

    sequential_list_of_bookings: list[booking_dt] = []
    for request in requests:
        old_booking_list = sequential_list_of_bookings.copy()
        insort(sequential_list_of_bookings, request,
               key=lambda x: x["start"][0])
        valid, booking_plan = valid_bookings(sequential_list_of_bookings)
        price: float = 0
        if not valid:
            start: int = request['start'][0]
            index: int = bisect_left(sequential_list_of_bookings, start,
                                     key=lambda x: x["start"][0])
            del sequential_list_of_bookings[index]
        else:
            sequential_trip_requests = extract_trip_events(sequential_list_of_bookings)
            trip_request = request[0]
            old_trip_requests = extract_trip_events(old_booking_list)

            charging_plan, price = calculate_price_for_booking(trip_request, sequential_trip_requests,
                                                               old_trip_requests, price)
            charging_plan_list.append(charging_plan)
    return sequential_list_of_bookings, booking_plan


def extract_trip_events(requests):
    extracted_trips = []
    for request in requests:
        extracted_trips.append(request[0])
    return extracted_trips


def main():
    sequential_list_of_bookings = create_sequential_list_of_bookings(NUM_BOOKINGS)
    charging_plan_max: np.array = plan_charge_max(sequential_list_of_bookings)
    randomized_list_of_bookings = randomize_list_of_bookings(sequential_list_of_bookings)
    accepted_list_of_bookings, accepted_bookings_a = process_booking_requests(randomized_list_of_bookings)


if __name__ == "__main__":
    main()
\$\endgroup\$
1
  • \$\begingroup\$ I would like to add a tag indicating that I look for help with structuring the internal data. What would be an appropriate tag? \$\endgroup\$ Commented Dec 26, 2022 at 15:48

0

Browse other questions tagged or ask your own question.