Source code for cereeberus.compute.merge

import networkx as nx
import numpy as np

[docs] def isMerge(T,fx): """ This function takes in a networkx tree or reeb graph and function, and checks to see if it is a merge tree. This assumes that the root node(s) has/have a function value of np.inf. Args: T (Reeb Graph): Networkx graph or reeb graph fx (dict): function values Returns: isMerge(bool): True if T is a merge tree, False if T is not a merge tree """ import numpy as np import networkx as nx from cereeberus.compute.degree import up_degree from cereeberus.reeb.reebgraph import Reeb if type(T) is nx.classes.multigraph.MultiGraph or type(T) is nx.classes.digraph.DiGraph: node_list = list(T.nodes) up_deg = up_degree(T, fx) for i in node_list: if (up_deg[i]==1 or (fx[i]==np.inf and up_deg[i]==0)) == True: 1 == 1 else: return False elif type(T) is Reeb: node_list = list(T.nodes) for i in node_list: if (T.node_properties[i]['up_deg']==1 or (fx[i]==np.inf and T.node_properties[i]['up_deg']==0)) == True: 1 == 1 else: return False else: raise TypeError('Graph is not a networkx graph or Reeb graph') return True
[docs] def computeMergeTree(R, filtration: "tuple[tuple[float,float], tuple[float, float], int]" = [[8,0], [8, 7], 1], infAdjust: int=None, precision: int=5, size: int=0, verbose: bool=False, filter: bool = False): """ main function to build merge tree for a given graph and filtration Args: R (Reeb Graph): Reeb Graph filtration (tuple of tuples): filtration for merge tree infAdjust (int): parameter to adjust infinite value for root node precision (int): precision size (int): size verbose (bool): verbose Returns: rmt: Merge Tree as a Reeb Graph object """ from cereeberus.compute.degree import remove_isolates from cereeberus.compute.uf import UnionFind from cereeberus.compute.uf import getSortedNodeHeights import networkx as nx from cereeberus.reeb.merge import mergeTree Rmt = remove_isolates(R) # digraph for LCA calcs and it's a tree mt = nx.DiGraph() # to handle special indexes post projections (has nodes named >n) if size != 0: uf = UnionFind(size, verbose=verbose) else: uf = UnionFind(len(Rmt.nodes), verbose=verbose) visited = set() numComponents = 0 if filter==True: heights = getSortedNodeHeights(Rmt, filtration, precision) else: heights = R.heights if verbose: print("heights:" + str(heights)) # this is the first node of min height since list topMerge = heights[0][0] for node, height in heights: # track visited nodes (helps deal with equal heights) if verbose: print(f"now processing node{node}, with {numComponents} already found components") visited.add(node) # check to see if the node has been told it is the endpoint of a previous grouping (the endpoint of an already found edge) # perform find to make sure these groupings are not the same possibleGroups = Rmt.nodes[node].get('groups', []) # if this edge has never been told anything, no existing edges # add this node in merge tree as start of a new branch if possibleGroups == []: if verbose: print(f"{node} is unconnected, about to add {numComponents}, {height}") mt.add_node(node, pos=(numComponents, height), fx=height) numComponents += 1 else: # iterate through possible groups via unionFind to determine if this is a merge point or one connected component componentSet = set() for possibleGroup in possibleGroups: componentSet.add(uf.find(possibleGroup)) componentList = list(componentSet) if verbose: print( f"received {componentList} membership") # if they are all the same group, this node is also part of this group # ignore on merge tree if its part of original graph # place this on if its a key label from the other graph's merge tree if len(componentList) == 1: myRoot = componentList.pop() uf.union(node, myRoot) # only add to merge tree if a special key label point if Rmt.nodes[node].get('projected', False): if verbose: print(f"although connected, key label{node}, existing connected to {myRoot}, adding still") mt.add_node(node, pos=(mt.nodes[myRoot]['pos'][0], height), fx=height) mt.add_edge(node, myRoot) # change the root to represent the current head of merge point if verbose: print(f"rerooting component {myRoot} to {node}") uf.rerootComponent(node, node) # this point could be a top merge point to infinity root topMerge = node else: # skip if on the same graph if verbose: print(f"skipping node{node}, existing connected to {myRoot}") else: # else this node is the merge point, add on merge tree and perform union if verbose: print(f"about to add {numComponents}, {height}, updating topMerge") topMerge = node mt.add_node(node, pos=(numComponents-len(componentList), height), fx=height) for component in componentList: componentRoot = uf.find(component) if verbose: print(f"unioning node{node} and componentRoot of node {componentRoot}") # union each component uf.union(node, componentRoot) # track on merge tree mt.add_edge(node, componentRoot) if verbose: print(f"rerooting {componentRoot} to merge point {node}") # change the root to represent the current head of merge point uf.rerootComponent(node, node) numComponents -= 1 # pass along the finalized group to all the edges above myGroup = uf.find(node) for neighbor in Rmt.neighbors(node): # lower height neighbors seen before if verbose: print( f"neighbor{neighbor}") if neighbor in visited: if verbose: print(f"visited{neighbor} already") continue # pass new info groups = Rmt.nodes[neighbor].get('groups', []) Rmt.nodes[neighbor]['groups'] = groups + [myGroup] # add final "inf" point, but visualize as max height + 10% of height range unless passed in if infAdjust is None: infAdjust = (heights[-1][1] - heights[0][1] ) * 0.1 infHeight = heights[-1][1] + infAdjust mt.add_node('inf', pos=(0, infHeight), fx=float('inf')) mt.add_edge('inf', topMerge) mt = nx.MultiGraph(mt) fx=nx.get_node_attributes(mt, 'fx') rmt = mergeTree(mt, fx) return rmt