Source code for cereeberus.compute.distance

import networkx as nx
import math
import numpy as np

[docs] def edit(R1, R2): """Function to return the edit distance between two Reeb graphs. Uses the graph_edit_distance function from https://networkx.org/documentation/stable/reference/algorithms/similarity.html. Args: R1 (reeb graph): Reeb graph or Merge tree R2 (reeb graph): Reeb graph or Merge tree Returns: edit_distance (int): graph edit distance """ import networkx as nx edit_distance = nx.graph_edit_distance(R1.G, R2.G) return edit_distance
[docs] def findFiltration(theta: float, origin: "tuple[float,float]"=(0,0)) -> "tuple[tuple[float,float], tuple[float,float], int]": '''get line of theta slope going through origin point, also track an inversion flag for dist''' # could assert this domain but can just adjust for it if theta >= 2*math.pi: theta = theta % (2*math.pi) if theta == 0: p1 = (origin[0], origin[1]+1) else: # slope is normal to the angle slope = -1 / math.tan(theta) p1 = (origin[0]+1, origin[1]+slope) # flip distance function for certain angles # here, since "left" in our signed dist function is positive, inverting here sign = 1 if theta <= math.pi and theta > 0 else -1 return (origin, p1, sign)
[docs] def getHeightMatrix(mt: nx.DiGraph, verbose: bool=False) -> "tuple[np.matrix, np.matrix]": ''' LCA from networkx, assuming it to be better than Eulerian tour + spare table min range query, returning a matrix of pairwise LCA heights and then the matrix of LCA nodes this uses Tarjan's offline LCA optimized with inverse Ackermann function so O(Nodes+Queries) approximately ''' # remove inf node size = mt.number_of_nodes()-1 heightMatrix = np.zeros((size,size)) nodeMatrix = np.zeros((size,size)) # map actual node indexes to sorted matrix indexes allNodes = list(mt.nodes) allNodes.remove('inf') allNodes = sorted(allNodes) nodesDict = dict( zip(allNodes, range(len(allNodes))) ) for LCA in nx.tree_all_pairs_lowest_common_ancestor(mt): # this is of the form ( (node1, node2), LCA ) # handle root separately if LCA[0][0]=='inf' or LCA[0][1]=='inf': continue i = nodesDict[LCA[0][0]] j = nodesDict[LCA[0][1]] heightMatrix[i,j] = mt.nodes[LCA[1]]['fx'] heightMatrix[j,i] = mt.nodes[LCA[1]]['fx'] nodeMatrix[i,j] = LCA[1] nodeMatrix[j,i] = LCA[1] heightMatrix = np.matrix(heightMatrix) nodeMatrix = np.matrix(nodeMatrix) if verbose: print(nodesDict) print(heightMatrix) print(nodeMatrix) return heightMatrix, nodeMatrix
[docs] def calcDistanceMatrix(mt0: nx.DiGraph, mt1: nx.DiGraph, verbose: bool=False) -> "tuple[np.matrix, np.matrix, np.matrix]": '''calculate difference matrix of LCA heights, also return LCA nodes of mt0 and mt1''' h0, nodes0 = getHeightMatrix(mt0, verbose=verbose) h1, nodes1 = getHeightMatrix(mt1, verbose=verbose) diff = np.subtract(h1, h0) if verbose: print(diff) return diff, nodes0, nodes1
[docs] def calcDistanceInfNorm(mt0: nx.DiGraph, mt1: nx.DiGraph, verbose: bool=False) -> float: '''calculate infinity norm of two graphs, not used in the complete MT flow but useful as separate utility''' return np.max(np.abs(calcDistanceMatrix(mt0, mt1, verbose=verbose)[0]))
[docs] def getUnitVector(angle: float) -> "tuple[float,float]": '''get 2D unit vector of input angle''' x = math.cos(angle) y = math.sin(angle) return (x,y)
[docs] def getMidpointKey(arr: "list[float]", target: float) -> float: '''get the midpoint key of the region that contains the target''' import bisect b = bisect.bisect_left( arr, target ) if b>=len(arr) or b == 0: return ((arr[-1]+arr[0])/2 + math.pi) % (math.pi*2) else: return (arr[b]+arr[b-1])/2
def _innerProduct(unitVec: "tuple[float,float]", pos: "tuple[float,float]") -> float: # compute inner product of 2D vector with position return(unitVec[0] * pos[0] + unitVec[1] * pos[1])
[docs] def computeGraphDistanceAtAngle(angle: float, G1: nx.Graph, G2: nx.Graph, criticalDict: "dict[float, np.matrix]", midpointDict, verbose=False) -> float: '''compute distance of two embedded graphs at a given angle using cached precomputation and inferring all other angles''' angle = angle % (math.pi*2) criticalAngles = list(criticalDict.keys()) if angle in criticalAngles: if verbose: print(f'using critical angle cache for: {angle}\n') diff = np.max(np.abs(criticalDict[angle]['diff'])) else: # convert angle to unit vector unitVec = getUnitVector(angle) # find the computed midpoint matrix to work off of key = getMidpointKey(criticalAngles, angle) key = math.floor(key*1e9) / 1e9 if verbose: print(f'found midpoint key {key} for angle {angle}\n') # intentionally ignoring the heights computed for the midpoint LCA0 = midpointDict[key]['LCA0'] LCA1 = midpointDict[key]['LCA1'] # both matrices should be same shape assert(LCA0.shape==LCA1.shape) A0 = np.matrix(np.zeros(LCA0.shape)) A1 = np.matrix(np.zeros(LCA1.shape)) # inner product of unit vector and LCA position for i in range(A0.shape[0]): for j in range(A0.shape[1]): pos0 = G1.nodes[LCA0[i,j]]['pos'] A0[i,j] = _innerProduct(unitVec, pos0) pos1 = G2.nodes[LCA1[i,j]]['pos'] A1[i,j] = _innerProduct(unitVec, pos1) diff = np.max(np.abs(np.subtract(A1, A0))) return diff
[docs] def computeDistanceAtAngleFromMT(g1_orig: nx.Graph, g2_orig: nx.Graph, angle: float, precision: int=5, show: bool=False, verbose: bool=False) -> "tuple[np.matrix, np.matrix, np.matrix]": '''given an angle and two graphs, compute full difference matrix and LCA node matrix''' g1 = g1_orig.copy() g2 = g2_orig.copy() filtration = findFiltration(angle) # assuming all the nodes are integers - using the conversion isn't reliable mt1 = buildMergeTree(g1, filtration, precision=precision, show=show, size=max(g1.nodes())+1, verbose=verbose) mt2 = buildMergeTree(g2, filtration, precision=precision, show=show, size=max(g2.nodes())+1, verbose=verbose) g1_proj = _mtToOtherGraph(mt2, g1, verbose=verbose) g2_proj = _mtToOtherGraph(mt1, g2, verbose=verbose) mt1_proj = buildMergeTree(g1_proj, filtration, precision=precision, show=show, size=max(g1_proj.nodes())+1, verbose=verbose) mt2_proj = buildMergeTree(g2_proj, filtration, precision=precision, show=show, size=max(g2_proj.nodes())+1, verbose=verbose) assert mt1_proj.number_of_nodes() == mt2_proj.number_of_nodes(), f"Error graph sizes don't match at {angle}, Graph 1: {mt1_proj.nodes()}, Graph 2: {mt2_proj.nodes()}" return calcDistanceMatrix(mt1_proj, mt2_proj)
[docs] def computeDistanceFull(g1: nx.Graph, g2: nx.Graph, precision: int=5, show: bool=False, verbose: bool=False) -> "tuple[dict[str,np.matrix], dict[str,np.matrix]]": '''calculate all necessary pre-computations of height and LCA matrices for every critical angle and midpoint of two embedded graphs''' criticalAngles, midpoints = computeAllAngles(g1, g2) c_dict = {} for angle in criticalAngles: if show: print(f"using critical angle: {angle}") diff, LCA0, LCA1 = computeDistanceAtAngleFromMT(g1, g2, angle, precision=precision, show=show, verbose=verbose) c_dict[angle] = {'diff':diff, 'LCA0':LCA0, 'LCA1':LCA1} m_dict = {} for angle in midpoints: angle = math.floor(angle * 1e9) / 1e9 if show: print(f"using midpoint: {angle}") diff, LCA0, LCA1 = computeDistanceAtAngleFromMT(g1, g2, angle, precision=precision, show=show, verbose=verbose) m_dict[angle] = {'diff':diff, 'LCA0':LCA0, 'LCA1':LCA1} return c_dict, m_dict
[docs] def directedMergeTreeDistance(G1_orig: nx.Graph, G2_orig: nx.Graph, precision: int=5, plot: bool=True, show: bool=False, verbose: bool=False, xMin: float=0, xMax: float=2*math.pi, n: int=10000) -> "tuple[np.array, np.array]": '''get distances of two embedded graphs over linspace of n points from xMin to xMax, full driver code, plot to show graph, show is for internal merge trees''' G1, G2 = prepareTwoGraphs(G1_orig, G2_orig) # precache given the prepared graphs c_dict, m_dict = computeDistanceFull(G1, G2, precision=precision, show=show, verbose=verbose) X = np.linspace(xMin, xMax, n) Y = [computeGraphDistanceAtAngle(x, G1, G2, c_dict, m_dict) for x in X] if plot: plt.scatter(X,Y, marker=',', s=1) plt.show() return X, Y