Source code for geometric2dr.decomposition.random_walk_patterns

"""Functions for inducing random walk patterns within the graphs
as in Gartner et al.

Inspired by the implementation of anonymous walks by Sergey Ivanov
and Evgeny Burnaev [1]_

"""

# Author: Paul Scherer 2020

# Standard libraries
import sys
import os
import glob
import pickle
import itertools
import random
from collections import defaultdict
from time import time
from tqdm import tqdm

# 3rd party
import numpy as np
import networkx as nx

# Internal 
from .utils import get_files

# Random seeds for reproduction
random.seed(1234)
np.random.seed(1234)

[docs]def load_graph(file_handle): """Loads a nx graph object and a corresponding numpy adjacency matrix given a GEXF file graph. Parameters ---------- file_handle : str path to gexf file Returns ------- graph : networkx graph Networkx graph instance of input gexf file adj_matrix : numpy ndarray The adjacency matrix of the graph """ graph = nx.read_gexf(file_handle) adj_matrix = nx.to_numpy_matrix(graph) return graph, adj_matrix
[docs]def random_step_node(rw_graph, node): '''Moves one step from the current according to probabilities of outgoing edges. Return next node. ''' r = random.uniform(0, 1) low = 0 for v in rw_graph[node]: p = rw_graph[node][v]['weight'] if r <= low + p: return v low += p
[docs]def random_walk_with_label_nodes(graph, rw_graph, node, steps): '''Creates anonymous walk from a node for arbitrary steps with usage of node labels. Returns a tuple with consequent nodes. ''' d = dict() count = 0 pattern = [] for i in range(steps + 1): label = graph.nodes[node]['Label'] if label not in d: d[label] = count count += 1 pattern.append(d[label]) node = random_step_node(rw_graph, node) return tuple(pattern)
[docs]def create_random_walk_graph(nx_graph): """Generate a random walk graph equivalent of a graph as described in anonymous walk embeddings """ # get name of the label on graph edges (assume all label names are the same) label_name = 'weight' RW = nx.DiGraph() for node in nx_graph: edges = nx_graph[node] total = float(sum([edges[v].get(label_name, 1) for v in edges if v != node])) for v in edges: if v != node: RW.add_edge(node, v, weight=edges[v].get(label_name, 1) / total) rw_graph = RW return rw_graph
[docs]def random_walks_graph(nx_graph, walk_length, neighborhood_size=0): """Perform a random walk with cooccurring "neighbouring" walks starting from the same node for all the nodes in the graph. """ neigh_size = neighborhood_size + 1 # there is at least one random walk per node rw_graph = create_random_walk_graph(nx_graph) walks = [] for i, node in enumerate(rw_graph): rw = [str(random_walk_with_label_nodes(nx_graph, rw_graph, node, walk_length)) for _ in range(neighborhood_size)] # for anonymous_walk walks.append(rw) return walks
[docs]def save_rw_doc(gexf_fh, coocurrence_corpus, walk_length): """Saves a shortest path graph doc with the extension .spp Parameters ---------- gexf_fh : str sdf cooccurrence_corpus : list list of lists containing cooccuring shortest paths (ie those starting from the same starting node) walk_length : int length of the walk Returns ------- None : None Saves the induced shortest path patterns into a graph document for the graph specified in the `gexf_fh` """ open_fname = gexf_fh + ".rw" + str(walk_length) # if os.path.isfile(open_fname): # return with open(open_fname,'w') as fh: for spp_neighbourhood in coocurrence_corpus: sentence = str.join(" ", map(str, spp_neighbourhood)) print (sentence, file=fh)
[docs]def rw_corpus(corpus_dir, walk_length, neighborhood_size, saving_graph_docs=True): """Induces random walks up to a desired length across the input set of graphs Parameters ---------- corpus_dir : str path to directory containing graph files walk_length : int desired length of the random walk neighborhood_size : int number of cooccuring walks to find per walk source (node) saving_graph_docs : bool boolean on whether the graph documents should be generated or not Returns ------- None : None Induces the random walks of `walk_length` across the list of graphs files supplied in `corpus_dir` """ graph_files = get_files(corpus_dir, extension=".gexf") vocabulary = set() count_map = {} graph_map = {} corpus = [] pattern_to_short_id = {} short_id_to_patern = {} pattern_id = 0 for gexf_fh in tqdm(graph_files): gidx = int((os.path.basename(gexf_fh)).replace(".gexf", "")) graph, am = load_graph(gexf_fh) tmp_corpus = [] count_map[gidx] = {} cooccurence_corpus = [] # Induce random walks across the single graph rws_graph = random_walks_graph(graph, walk_length, neighborhood_size) for node_walks in rws_graph: paths_cooccurence = [] for source_wlk in node_walks: # make a shorter label for each pattern if source_wlk in pattern_to_short_id: label = pattern_to_short_id[source_wlk] else: pattern_to_short_id[source_wlk] = str(pattern_id) short_id_to_patern[str(pattern_id)] = source_wlk label = pattern_to_short_id[source_wlk] pattern_id += 1 tmp_corpus.append(label) paths_cooccurence.append(label) count_map[gidx][label] = count_map[gidx].get(label,0)+1 vocabulary.add(label) cooccurence_corpus.append(paths_cooccurence) corpus.append(tmp_corpus) if saving_graph_docs: save_rw_doc(gexf_fh, cooccurence_corpus, walk_length) graph_map = count_map prob_map = {gidx: {path: count/float(sum(paths.values())) \ for path, count in paths.items()} for gidx, paths in count_map.items()} num_graphs = len(count_map) return corpus, vocabulary, prob_map, num_graphs, graph_map
if __name__ == "__main__": corpus_dir = "../data/dortmund_gexf/MUTAG/" corpus, vocabulary, prob_map, num_graphs, graph_map = rw_corpus(corpus_dir, 4, 5)