## Phase 3: Core Optimizer - Adaptive Large Neighborhood Search (ALNS)
import time
import random
import copy
import math
import pandas as pd
import networkx as nx
from functools import lru_cache
# ------------- CONFIG -------------
ALNS_ITERATIONS = 80 # was 200
INITIAL_TEMPERATURE = 2.0 # accept more early moves
COOLING_RATE = 0.996 # slightly slower cooling
# prefer longer runs
MIN_ROUTE_DISTANCE = 8.0 # miles
MAX_ROUTE_DISTANCE = 12.0 # miles
TARGET_MILES = 10.5 # soft target toward the upper bound
# Objective weights (tune these)
W_RUNS = 1000.0
W_DIST = 1.0
W_UNCOVERED = 600.0
W_RANGE = 200.0
W_SHORTFALL = 30.0
# Hard wall-time cap to avoid long runs
MAX_WALL_TIME_SEC = 180 # ~3 minutes
# ------------- HELPERS -------------
def path_distance_m(graph, path):
"""Total path distance in meters (handles MultiDiGraph edges)."""
dist = 0.0
for u, v in zip(path[:-1], path[1:]):
ed = graph.get_edge_data(u, v)
if ed:
dist += list(ed.values())[0]['length']
return dist
def miles(meters):
return meters / 1609.34
def edges_from_path(path):
"""Directionless edge set from node path."""
edges = set()
for i in range(len(path) - 1):
u, v = sorted((path[i], path[i+1]))
edges.add((u, v))
return edges
# Required coverage universe (normalize like edges_from_path)
all_required_edges = {tuple(sorted((u, v))) for u, v, k in G_final_streets.edges(keys=True)}
# ------------- COST (SOFT FEASIBILITY) -------------
def route_range_violation(mi):
"""Miles outside [MIN, MAX] — positive if violating, 0 otherwise."""
if mi < MIN_ROUTE_DISTANCE:
return (MIN_ROUTE_DISTANCE - mi)
if mi > MAX_ROUTE_DISTANCE:
return (mi - MAX_ROUTE_DISTANCE)
return 0.0
def route_shortfall(mi):
"""Encourage longer runs by penalizing how far below TARGET we are."""
return max(0.0, TARGET_MILES - mi)
def penalized_cost(solution):
"""Lower is better. No hard filters; violations become penalties."""
if not solution:
return float('inf')
total_miles = 0.0
covered = set()
range_pen = 0.0
shortfall_pen = 0.0
for r in solution:
mi = r['distance_miles']
total_miles += mi
range_pen += route_range_violation(mi)
shortfall_pen += route_shortfall(mi)
covered |= edges_from_path(r['path'])
uncovered = len(all_required_edges - covered)
return (
W_RUNS * len(solution)
+ W_DIST * total_miles
+ W_UNCOVERED * uncovered
+ W_RANGE * range_pen
+ W_SHORTFALL * shortfall_pen
)
# ------------- DESTROY OPERATORS (light) -------------
def destroy_random_routes(solution, percentage_to_remove=0.10):
"""Remove k random routes; return (remaining_solution, unassigned_edges)."""
if not solution:
return [], set()
destroyed = copy.deepcopy(solution)
k = max(1, int(len(destroyed) * percentage_to_remove))
random.shuffle(destroyed)
unassigned = set()
for _ in range(k):
if not destroyed:
break
r = destroyed.pop()
unassigned |= edges_from_path(r['path'])
return destroyed, unassigned
def destroy_shaw_routes(solution, k=2):
"""
Shaw removal: pick a seed route; remove it and the k-1 most related routes
by Jaccard overlap of (directionless) edge sets.
"""
if len(solution) <= 1:
return copy.deepcopy(solution), set()
destroyed = copy.deepcopy(solution)
unassigned = set()
# seed: longest route
seed_idx = max(range(len(destroyed)), key=lambda i: destroyed[i]['distance_miles'])
seed_edges = edges_from_path(destroyed[seed_idx]['path'])
def relatedness(i):
e = edges_from_path(destroyed[i]['path'])
inter = len(e & seed_edges)
union = len(e | seed_edges) or 1
return inter / union
# k-1 most related (excluding the seed itself)
idxs = sorted(range(len(destroyed)), key=relatedness, reverse=True)
removed = sorted(set([seed_idx] + idxs[1:k]), reverse=True)
for i in removed:
unassigned |= edges_from_path(destroyed[i]['path'])
destroyed.pop(i)
return destroyed, unassigned
# ------------- REPAIR OPERATOR (REGRET-2, throttled) -------------
@lru_cache(maxsize=10000)
def sp_length_from(src):
"""Cache of single-source shortest path lengths from src node."""
return nx.single_source_dijkstra_path_length(G_final_streets, src, weight='length')
@lru_cache(maxsize=50000)
def sp_path(src, dst):
"""Cached shortest path (list of nodes) between two nodes."""
return nx.shortest_path(G_final_streets, src, dst, weight='length')
def insertion_cost_for_edge(edge, repaired, run_idx, pos_idx):
"""Cost delta (meters) to splice edge (u,v) between path[pos_idx]→path[pos_idx+1]."""
u, v = edge
path = repaired[run_idx]['path']
p1, p2 = path[pos_idx], path[pos_idx+1]
ed_uv = G_final_streets.get_edge_data(u, v) or G_final_streets.get_edge_data(v, u)
ed_p1p2 = G_final_streets.get_edge_data(p1, p2)
if not ed_uv or not ed_p1p2:
return float('inf')
uv_len = list(ed_uv.values())[0]['length']
p1p2_len = list(ed_p1p2.values())[0]['length']
d1 = sp_length_from(p1).get(u, float('inf'))
d2 = sp_length_from(v).get(p2, float('inf'))
return d1 + uv_len + d2 - p1p2_len
def patch_path_with_edge(edge, repaired, run_idx, pos_idx):
"""Return the new path after actually inserting the edge (u,v)."""
u, v = edge
path = repaired[run_idx]['path']
p1, p2 = path[pos_idx], path[pos_idx+1]
path_to_u = sp_path(p1, u)
path_v_to_p2 = sp_path(v, p2)
return path[:pos_idx+1] + path_to_u[1:] + path_v_to_p2[1:]
def repair_regret2(destroyed_solution, unassigned_edges, home_node, gym_node):
repaired = copy.deepcopy(destroyed_solution)
edges_left = set(unassigned_edges)
# Stage A: insert into existing runs by regret-2
while edges_left:
candidate_rows = []
# cap edges per outer loop to reduce work
MAX_EDGES_PER_LOOP = 400
for e in list(edges_left)[:MAX_EDGES_PER_LOOP]:
costs = []
for i, run in enumerate(repaired):
path = run['path']
# check at most ~20 evenly-spaced insertion positions
step = max(1, (len(path) - 1) // 20)
for j in range(0, len(path) - 1, step):
c = insertion_cost_for_edge(e, repaired, i, j)
if c < float('inf'):
# coarse feasibility: allow small overshoot
new_len_m = path_distance_m(G_final_streets, patch_path_with_edge(e, repaired, i, j))
if miles(new_len_m) <= (MAX_ROUTE_DISTANCE + 0.8):
costs.append((c, i, j))
if costs:
costs.sort()
best = costs[0][0]
second = costs[1][0] if len(costs) > 1 else best + 1e6
regret = second - best
candidate_rows.append((regret, best, e, costs[0][1], costs[0][2]))
if not candidate_rows:
break
# pick the edge with highest regret
regret, best_cost, edge, run_i, pos_j = max(candidate_rows)
new_path = patch_path_with_edge(edge, repaired, run_i, pos_j)
repaired[run_i]['path'] = new_path
repaired[run_i]['distance_miles'] = miles(path_distance_m(G_final_streets, new_path))
# mark both (u,v) and (v,u) as covered
edges_left.discard(edge)
edges_left.discard((edge[1], edge[0]))
# Stage B: leftover edges → create new runs (start home, end gym)
while edges_left:
new_path = [home_node]
last = new_path[-1]
# pick closest leftover edge endpoint to extend toward
d_last = sp_length_from(last)
edge = min(edges_left, key=lambda e: min(d_last.get(e[0], 1e18), d_last.get(e[1], 1e18)))
target = edge[0] if d_last.get(edge[0], 1e18) <= d_last.get(edge[1], 1e18) else edge[1]
# walk to it, traverse to the other end, then head to gym
new_path += sp_path(last, target)[1:]
other = edge[1] if target == edge[0] else edge[0]
new_path.append(other)
new_path += sp_path(new_path[-1], gym_node)[1:]
repaired.append({
'run_id': -1,
'path': new_path,
'distance_miles': miles(path_distance_m(G_final_streets, new_path))
})
edges_left.discard(edge)
edges_left.discard((edge[1], edge[0]))
return repaired
# ------------- ACCEPTANCE (SA + RRT) -------------
def accept(curr_cost, new_cost, temperature, best_cost, rrt_band):
"""Accept if better, SA-probability, or within record-to-record band."""
if new_cost <= curr_cost:
return True
if temperature > 1e-9:
if math.exp((curr_cost - new_cost) / max(1e-9, temperature)) > random.random():
return True
if new_cost <= best_cost + rrt_band:
return True
return False
# ------------- OPERATOR REGISTRY + ADAPTATION -------------
destroy_ops = [
("random", destroy_random_routes),
("shaw", destroy_shaw_routes),
]
repair_ops = [
("regret2", lambda s,u: repair_regret2(s, u, home_node, gym_node)),
]
weights_d = {n: 1.0 for n,_ in destroy_ops}
weights_r = {n: 1.0 for n,_ in repair_ops}
scores_d = {n: 0.0 for n,_ in destroy_ops}
scores_r = {n: 0.0 for n,_ in repair_ops}
def pick_operator(ops, weights):
names = [n for n,_ in ops]
ws = [weights[n] for n in names]
s = sum(ws)
probs = [w/s for w in ws]
return random.choices(ops, weights=probs, k=1)[0]
# ------------- INITIALIZATION -------------
random.seed(42)
try:
import numpy as _np
_np.random.seed(42)
except Exception:
pass
initial_solution = generated_runs # from Phase 2 (greedy/straight)
current_solution = copy.deepcopy(initial_solution)
best_solution = copy.deepcopy(initial_solution)
best_cost = penalized_cost(best_solution)
temperature = INITIAL_TEMPERATURE
rrt_band = 0.01 * best_cost # 1% band around the incumbent
no_improve = 0
alns_start_time = time.time()
print(f"🧠 ALNS: iters={ALNS_ITERATIONS}, T0={INITIAL_TEMPERATURE}, cooling={COOLING_RATE}")
print(f"Initial runs={len(best_solution)}, cost={best_cost:.2f}")
# ------------- MAIN LOOP -------------
for i in range(ALNS_ITERATIONS):
# hard wall-time cap
if time.time() - alns_start_time > MAX_WALL_TIME_SEC:
break
# choose operators (roulette by weights)
(d_name, destroy) = pick_operator(destroy_ops, weights_d)
(r_name, repair) = pick_operator(repair_ops, weights_r)
# destroy + repair
tmp = copy.deepcopy(current_solution)
destroyed, unassigned = destroy(tmp)
candidate = repair(destroyed, unassigned)
curr_cost = penalized_cost(current_solution)
cand_cost = penalized_cost(candidate)
improved = cand_cost < curr_cost
accepted = accept(curr_cost, cand_cost, temperature, best_cost, rrt_band)
if accepted:
current_solution = candidate
# reward operators proportionally to improvement (non-negative)
delta = max(0.0, curr_cost - cand_cost)
scores_d[d_name] += delta
scores_r[r_name] += delta
# update best
if cand_cost < best_cost:
best_solution = copy.deepcopy(candidate)
best_cost = cand_cost
no_improve = 0
else:
no_improve += 1
# cooling
temperature *= COOLING_RATE
# periodic operator weight update (more frequent)
if (i + 1) % 20 == 0:
for n in weights_d:
weights_d[n] = 0.8 * weights_d[n] + 0.2 * (1.0 + scores_d[n])
for n in weights_r:
weights_r[n] = 0.8 * weights_r[n] + 0.2 * (1.0 + scores_r[n])
scores_d = {n: 0.0 for n in scores_d}
scores_r = {n: 0.0 for n in scores_r}
# plateau reheat to keep exploring
if no_improve >= 200:
temperature = max(temperature, 0.5) # brief reheat
no_improve = 0
# progress log (removed)
# early stop on long plateau (removed)
# ------------- SUMMARY -------------
end = time.time()
best_df = pd.DataFrame(best_solution)
best_df['run_id'] = range(1, len(best_df) + 1)
print("\nALNS Optimization Complete")
print(f"Final runs: {len(best_solution)}")
print(best_df[['run_id', 'distance_miles']].round(2))