Source code for cereeberus.reeb.mapper

from cereeberus import ReebGraph
from ..distance.labeled_blocks import LabeledBlockMatrix as LBM
from ..distance.labeled_blocks import LabeledMatrix as LM
from ..compute.unionfind import UnionFind
import numpy as np

[docs] class MapperGraph(ReebGraph): """ A mapper graph structure. This inherits the properties of the Reeb graph in that it is a graph with a function given on the vertices, but with some additional requirements. - The values are integers in some range, [n_low, n_low+1, \cdots, n_high], although we consider the funciton values to be [\delta * n_low, \delta* (n_low+1), \cdots, \delta * n_high] for a stored delta - If an edge crosses a value, it has a vertex (so that the inverse image of any integer is only vertices, not interiors of edges) - An internal delta is stored so that this can be interpreted as function values [\delta * n_low, \delta* (n_low+1), \cdots, \delta * n_high] """
[docs] def __init__(self, G=None, f={}, delta = None, seed = None, verbose=False): # Check that $f$ values are only integers if not all([isinstance(f[v], int) for v in f]): raise ValueError("Function values must be integers.") if delta is None: self.delta = 1 else: self.delta = delta super().__init__(G, f, seed, verbose) self.mapperify()
[docs] def add_edge(self, u, v, reset_pos=True): """ Add an edge to the graph. This will also update the internal structure to make sure it satisfies the mapper properties. """ super().add_edge(u, v, reset_pos) self.mapperify()
[docs] def mapperify(self): """ Take the internal structure and make sure it satisfies the requirement that all edges have adjacent function values. """ # If we're initializing with nothing, this should pass. # Note that if self.n_low is None, then self.n_high and self.delta # are both None as well but I am not currently checking that. try: n_low = min(self.f.values()) n_high = max(self.f.values()) except: return for i in range(n_low,n_high+1): e_list = [e for e in self.edges() if self.f[e[0]] < i and self.f[e[1]] > i] for e in e_list: w_name = self.get_next_vert_name() self.subdivide_edge(*e,w_name, i)
[docs] def add_node(self, vertex, f_vertex, reset_pos=True): """ Same as adding a node in Reeb, but with the additional requirement that the function value is an integer. """ if not isinstance(f_vertex, int): raise ValueError("Function values must be integers.") return super().add_node(vertex, f_vertex, reset_pos)
[docs] def set_pos_from_f(self, seed=None, verbose=False): """ Same as the Reeb graph function, but we want to draw the vertex locations at delta*function value. """ super().set_pos_from_f(seed, verbose) for v in self.nodes(): self.pos_f[v] = (self.pos_f[v][0],self.delta * self.f[v])
[docs] def induced_subgraph(self, nodes): """ Returns the subgraph of the mapper graph induced by the nodes in the list nodes. Parameters: nodes (list): The list of nodes to include in the subgraph. Returns: MapperGraph """ R = super().induced_subgraph(nodes) return R.to_mapper(self.delta)
[docs] def smoothing_and_maps(self, n = 1): """ Compute the smoothing of a mapper graph as given in todo: Cite the paper. Note that the input :math:`n` parameter is related to the integer function values, not the delta-scaled function values. Parameters: n (int): The amount of smoothing Returns: tuple: MapperGraph, vertex_map, edge_map """ if type(n) != int: raise ValueError("Smoothing amount must be an integer.") M_n, V_map, E_map = super().smoothing_and_maps(n) # E_map is a dictonary with output lists of edges, but we should only have one edge to one edge in the mapper graph case # This just strips E_map[key] = [(u,v,0)] to instead be E_map[key] = (u,v,0) E_map = {key : E_map[key][0] for key in E_map} M_n = M_n.to_mapper(self.delta) return M_n, V_map, E_map
[docs] def smoothing(self, n=1): """ Compute the smoothing of a mapper graph as given in todo: Cite the paper. Note that the input :math:`n` parameter is related to the integer function values, not the delta-scaled function values. Args: n (int, optional): Smoothing amount. Defaults to 1. Raises: ValueError: _description_ Returns: _type_: _description_ """ M_n, _, _ = self.smoothing_and_maps(n) return M_n
#------------------------------# # Functions for computing thickening distance matrix #------------------------------#
[docs] def thickening_distance_by_level(self, level, obj_type = 'V'): """ Get the thickening distance matrix at a given level. This distance is the amount of thickening needed before the given pair of vertices at that level map to the same connected component. Parameters: level : int. The level to get the thickening distance matrix at. obj_type : str. 'V' or 'E' to get the distance matrix for vertices or edges, respectively. Default is 'V'. Returns: LabeledMatrix """ # Dict to get list of vertices at a level LevelVerts = self.func_to_vertex_dict() # Dict to get lists of edges at a level LevelEdges = self.func_to_edge_dict() # Current level to be checking L = level # Objects to be checking if obj_type == 'V': # Vertices at the current level rowLabels = LevelVerts[L] elif obj_type == 'E': # Edges at the current level rowLabels = LevelEdges[L] else: raise ValueError("Type must be 'V' or 'E'.") # If there's only one object, the distance is 0. # Return the little matrix block if len(rowLabels) == 1: D = LM(rows = rowLabels, cols = rowLabels) return D # Max difference to check max_diff = max(L-self.min_f(), self.max_f()-L) # Distance matrix for this level D = np.zeros(shape = (len(rowLabels), len(rowLabels))) - 1 D += np.identity(len(rowLabels)) D = LM(D, rows = rowLabels, cols = rowLabels) # Initialize a union find object UF = UnionFind(list(self.nodes())) # Loop through the levels to add edges, and update teh distance matrix if # a pair of objects not already merged. for k in range(1, max_diff+1): if obj_type == 'V': up_level = L+k-1 down_level = L-k elif obj_type == 'E': up_level = L+k-1 down_level = L-k+1 U = [] # Add edges at each up and down level # try/except is to ignore entries without edges try: # up verts U.extend(LevelEdges[up_level]) except: pass try: # down verts U.extend(LevelEdges[down_level]) except: pass # Add these edges to the union find object for e in U: UF.union(e[0], e[1]) if obj_type == 'V': for v in rowLabels: for u in rowLabels: if v != u and D.array[rowLabels.index(v)][rowLabels.index(u)] == -1: if UF.find(v) == UF.find(u): # print(f"found {v} and {u}") D.array[rowLabels.index(v)][rowLabels.index(u)] = k D.array[rowLabels.index(u)][rowLabels.index(v)] = k elif obj_type == 'E': for i, e in enumerate(rowLabels): for j, f in enumerate(rowLabels): if e != f and D.array[i][j] == -1: if UF.find(e[0]) == UF.find(f[0]): # and UF.find(e[1]) == UF.find(f[1]): # print(f"found {e} and {f}") D.array[i][j] = k D.array[j][i] = k # Check if there are still any entries of -1 # If there are none, no need to keep adding edges if not np.any(D.array == -1): break # If there are still -1's, set them to np.inf if np.any(D.array == -1): D.array[D.array == -1] = np.inf return D
[docs] def thickening_distance_matrix(self, obj_type = 'V'): """ Get the thickening distance matrix for the entire mapper graph. This is a labeled block matrix with rows and columns indexed by vertices, and entries given by the thickening distance between the two vertices. Returns: LabeledBlockMatrix """ if obj_type == 'V': V_dict = self.func_to_vertex_dict() DistMat = LBM(rows_dict= V_dict, cols_dict = V_dict) elif obj_type == 'E': E_dict = self.func_to_edge_dict() DistMat = LBM(rows_dict= E_dict, cols_dict = E_dict) for i in DistMat.get_all_block_indices(): D = self.thickening_distance_by_level(i, obj_type = obj_type) DistMat[i] = D return DistMat