Source code for dijkstra

from future_builtins import zip
from operator import itemgetter
from collections import defaultdict
from heapdict import heapdict

def identity(value):
    """
    Return the given argument unchanged.

    Serves as the default edge factory and the default cost function in
    this module.

    :param value: any object
    :returns: the very same object
    """
    return value
def make_graph(source, edge_factory=identity):
    """
    Build an adjacency list from an iterable of edges.

    The first two items of every edge are the origin and target node ids;
    all remaining items are passed positionally to ``edge_factory`` and the
    value it returns is stored as the edge payload.

    :param source: iterable of tuples in form of (from, to, *payload)
    :param edge_factory: callable building the edge payload
    :returns: ``defaultdict(list)`` mapping node id to a list of
              (neighbour, payload) pairs

    Example:

    >>> dict(make_graph([(1, 2, 10), (2, 3, 15), (1, 3, 30)]))
    {1: [(2, 10), (3, 30)], 2: [(3, 15)]}

    >>> from collections import namedtuple
    >>> EdgeAttrs = namedtuple('EdgeAttrs', 'time distance mode')
    >>> dict(make_graph([(1, 2, 10, 0.4, 'WALK')], edge_factory=EdgeAttrs))
    {1: [(2, EdgeAttrs(time=10, distance=0.4, mode='WALK'))]}
    """
    adjacency = defaultdict(list)
    for record in source:
        origin, target = record[:2]
        # Build the payload before touching the adjacency list so that a
        # failing factory does not leave an empty entry behind.
        attrs = edge_factory(*record[2:])
        adjacency[origin].append((target, attrs))
    return adjacency
def reverse_graph(source):
    """
    Build a copy of the graph with every edge pointing the other way.

    Payloads are carried over untouched; only the origin and target of each
    edge are swapped.

    :param source: adjacency list of graph (mapping of node id to a list of
                   (neighbour, payload) pairs)
    :returns: ``defaultdict(list)`` holding the reversed adjacency list

    Example:

    >>> dict(reverse_graph({1: [(2, 10), (3, 30)], 2: [(3, 15)]}))
    {2: [(1, 10)], 3: [(1, 30), (2, 15)]}
    """
    flipped = defaultdict(list)
    for origin, outgoing in source.items():
        for target, payload in outgoing:
            flipped[target].append((origin, payload))
    return flipped
def dijkstra_kernel(graph, start, previous, cost_fn):
    """
    Generator driving a dijkstra search from ``start``.

    Every iteration pops one entry from the priority queue and yields the
    node id together with the cost stored for it. The outgoing edges of a
    yielded node are only examined once the generator is resumed.

    ``previous`` is filled in place: each key is a node id and each value is
    a ``(predecessor, cost, payload)`` tuple. An entry is written whenever an
    edge offers a node for the first time or at a lower cost, so the mapping
    may also hold nodes that have not been yielded yet.

    :param graph: adjacency list of graph
    :param start: start node id
    :param previous: dictionary receiving the search labels
    :param cost_fn: cost function applied for each edge, must be stateless
    """
    previous[start] = (None, 0.0, None)
    frontier = heapdict()
    frontier[start] = 0.0

    while frontier:
        node, node_cost = frontier.popitem()
        yield node, node_cost

        for neighbour, payload in graph.get(node, []):
            candidate = node_cost + cost_fn(payload)
            # skip the edge unless it is the first or a cheaper way in
            if neighbour in previous and not candidate < previous[neighbour][1]:
                continue
            frontier[neighbour] = candidate
            previous[neighbour] = (node, candidate, payload)
def backtrack(previous, start):
    """
    Walk predecessor links from ``start`` back to the search origin.

    Generator yielding one ``(predecessor, node, payload)`` edge per step,
    beginning with the edge that leads into ``start``. The walk ends at the
    entry whose predecessor is ``None``.

    :param previous: dictionary mapping node id to
                     (predecessor, cost, payload)
    :param start: node id to start backtracking from
    :returns: generator of edges in reversed (end-to-origin) order

    Example:

    >>> list(backtrack({3: (2, 25, 15), 2: (1, 10, 10), 1: (None, 0.0, None)}, 3))
    [(2, 3, 15), (1, 2, 10)]
    """
    node = start
    parent, _, payload = previous[node]
    while parent is not None:
        yield parent, node, payload
        node = parent
        parent, _, payload = previous[node]
def just_ids(source):
    """
    Convert dijkstra result to contain only node ids.

    The node sequence consists of the origin of every edge followed by the
    target of the last edge.

    :param source: (cost, edges) tuple returned by dijkstra search, where
                   edges is a sequence of (from, to, payload) tuples
    :returns: (cost, node ids) tuple; the id list is empty when there are
              no edges

    Example:

    >>> just_ids((25.0, [(1, 2, 10.0), (2, 3, 15.0)]))
    (25.0, [1, 2, 3])
    """
    cost, edges = source
    if not edges:
        return (cost, [])
    nodes = [left for left, _, _ in edges]
    # Take the final node from the last edge explicitly: the loop variable
    # of a list comprehension is not visible afterwards on Python 3.
    nodes.append(edges[-1][1])
    return (cost, nodes)
def dijkstra(graph, start, end, cost_fn=identity):
    """
    Dijkstra search on directed graph.

    Runs the search kernel from ``start`` until it yields ``end`` and then
    rebuilds the route from the recorded predecessor links.

    :param graph: adjacency list of graph
    :param start: node to start search at
    :param end: final node
    :param cost_fn: cost function applied on edges
    :returns: (cost, edges) tuple with edges ordered from start to end, or
              ``None`` when the kernel finishes without yielding ``end``

    Example:

    >>> graph = make_graph([(1, 2, 10.0), (2, 3, 15.0), (1, 3, 30.0)])
    >>> dijkstra(graph, 1, 3)
    (25.0, [(1, 2, 10.0), (2, 3, 15.0)])
    """
    labels = {}
    for node, cost in dijkstra_kernel(graph, start, labels, cost_fn):
        if node == end:
            # backtrack walks from the end node, so flip the result
            route = list(backtrack(labels, node))
            route.reverse()
            return (cost, route)
    # the kernel ran dry without ever yielding the end node
    return None
def bidirect_dijkstra(graph, start, end, bwd_graph=None, cost_fn=identity):
    """
    Bidirectional dijkstra search on directed graph.

    Search from both start and end of the graph reducing search space
    substantially. A forward kernel runs on ``graph`` from ``start`` and a
    backward kernel runs on the reversed graph from ``end``; both advance in
    lockstep, one node each per iteration.

    :param graph: adjacency list of graph
    :param start: node to start search at
    :param end: final node
    :param bwd_graph: reversed graph for backward search; computed with
                      ``reverse_graph(graph)`` when omitted
    :param cost_fn: cost function applied on edges, expected to return
                    non-negative costs
    :returns: (cost, edges) tuple with edges ordered from start to end, or
              ``None`` when the two searches never label a common node

    Example:

    >>> graph = make_graph([(1, 2, 10.0), (2, 3, 15.0), (1, 3, 30.0)])
    >>> bidirect_dijkstra(graph, 1, 3)
    (25.0, [(1, 2, 10.0), (2, 3, 15.0)])
    """
    # honour a caller-supplied reversed graph instead of rebuilding it
    if bwd_graph is None:
        bwd_graph = reverse_graph(graph)

    fwd_previous = {}
    bwd_previous = {}
    fwd_kernel = dijkstra_kernel(graph, start, fwd_previous, cost_fn)
    bwd_kernel = dijkstra_kernel(bwd_graph, end, bwd_previous, cost_fn)

    def cost_through(node):
        # cost of the start -> node -> end route given the current labels
        return fwd_previous[node][1] + bwd_previous[node][1]

    # Cost of the best route seen so far. It is only used to decide when to
    # stop; the meeting node itself is selected after the loop.
    upper_bound = float('inf')
    for (fwd_node, fwd_cost), (bwd_node, bwd_cost) in zip(fwd_kernel, bwd_kernel):
        if fwd_node in bwd_previous:
            upper_bound = min(upper_bound, cost_through(fwd_node))
        if bwd_node in fwd_previous:
            upper_bound = min(upper_bound, cost_through(bwd_node))
        # no undiscovered route can be cheaper than both frontiers combined
        if upper_bound < fwd_cost + bwd_cost:
            break

    # The cheapest route may cross between the two searches at a node that
    # was labelled by both but never popped while the other side already
    # knew it, so inspect every node carrying a label from both directions
    # rather than only the nodes checked at pop time.
    candidates = [node for node in fwd_previous if node in bwd_previous]
    if not candidates:
        return None

    intersection = min(candidates, key=cost_through)
    cost = cost_through(intersection)

    # forward half comes out end-to-start and has to be flipped; backward
    # half is already in travel order but its edges point the wrong way
    edges = list(backtrack(fwd_previous, intersection))
    edges.reverse()
    edges.extend((right, left, payload)
                 for left, right, payload in backtrack(bwd_previous, intersection))
    return (cost, edges)