Source code for geometric2dr.decomposition.anonymous_walk_patterns

"""This module contains algorithms useful for inducing anonymous walks in graphs.

The algorithms are adapted from the original source code by
Ivanov and Burnaev 2018 "Anonymous Walk Embeddings" [1]_. The original reference
implementation can be found at: https://github.com/nd7141/AWE

"""

# Author: Paul Scherer 2019
# Carry down MIT License

# Standard libraries
import sys
import os
import glob
import pickle
import itertools
import random
from collections import defaultdict
from time import time
from tqdm import tqdm

# 3rd party
import numpy as np
import networkx as nx

# Internal
from .utils import get_files

# Seed Python's and NumPy's global random number generators when this module
# is imported, so that the randomly sampled walks are reproducible between runs.
random.seed(2018)
np.random.seed(2018)


def load_graph(file_handle):
    """Load a GEXF file as a networkx graph together with its adjacency matrix.

    Parameters
    ----------
    file_handle : str
        path to gexf file

    Returns
    -------
    graph : networkx graph
        Networkx graph instance of input gexf file
    adj_matrix : numpy matrix
        The adjacency matrix of the graph (a ``numpy.matrix``, which is a
        subclass of ``numpy.ndarray``).

    """
    graph = nx.read_gexf(file_handle)
    # ``nx.to_numpy_matrix`` was removed in NetworkX 3.0. Wrapping the result
    # of ``nx.to_numpy_array`` with ``np.asmatrix`` returns the same
    # ``numpy.matrix`` type on both old and new NetworkX releases.
    adj_matrix = np.asmatrix(nx.to_numpy_array(graph))
    return graph, adj_matrix
def all_paths(paths, steps, keep_last=False):
    """Enumerate anonymous walks (no labels) and store them in ``paths``.

    A walk is a list of integers starting with ``[0, 1]``. It is extended one
    position at a time by a value that differs from the previous entry and is
    at most one larger than the largest value seen so far.

    Parameters
    ----------
    paths : dict
        mapping that is updated in place; ``paths[steps]`` is set to the
        returned list
    steps : int
        number of steps (a walk of ``steps`` steps has ``steps + 1`` entries)
    keep_last : bool (default=False)
        if True only the walks with exactly ``steps`` steps are returned.
        If False every walk generated while extending the seed is returned,
        i.e. the walks with 2 to ``steps`` steps (the seed walk ``[0, 1]``
        itself is not part of that list).

    Returns
    -------
    new_paths : list of list of int
        the enumerated walks, also stored under ``paths[steps]``

    """
    new_paths = []
    last_step_paths = [[0, 1]]
    for i in range(2, steps + 1):
        current_step_paths = []
        for j in range(i + 1):
            for walks in last_step_paths:
                # No immediate repetition, and a new identifier may only be
                # the next unused integer.
                if walks[-1] != j and j <= max(walks) + 1:
                    grown = walks + [j]
                    new_paths.append(grown)
                    current_step_paths.append(grown)
        last_step_paths = current_step_paths

    if keep_last:
        # The frontier already holds exactly the walks with ``steps`` steps.
        # Using it (instead of filtering ``new_paths``) also covers
        # ``steps == 1``, where the only walk is the seed ``[0, 1]`` which is
        # never appended to ``new_paths``. For ``steps < 1`` nothing is
        # returned, as before.
        new_paths = last_step_paths if steps >= 1 else []
    paths[steps] = new_paths
    return new_paths
def all_paths_edges(paths, steps, keep_last=True):
    """Enumerate anonymous edge-label patterns and store them in ``paths``.

    Patterns are grown from the empty pattern one position at a time; the
    value added at a position is at most one larger than the largest value
    already present (``0`` for the first position).

    Parameters
    ----------
    paths : dict
        mapping updated in place; ``paths[steps]`` is set to the result
    steps : int
        number of positions in the longest patterns
    keep_last : bool (default=True)
        if True return only the patterns of length ``steps``, otherwise all
        patterns of length 1 to ``steps``

    Returns
    -------
    list of list of int
        the enumerated patterns, also stored under ``paths[steps]``

    """
    collected = []
    frontier = [[]]
    for depth in range(steps):
        grown = [
            prefix + [label]
            for label in range(depth + 1)
            for prefix in frontier
            if label <= max(prefix + [0]) + 1
        ]
        collected.extend(grown)
        frontier = grown

    result = frontier if keep_last else collected
    paths[steps] = result
    return result
def all_paths_nodes(paths, steps, keep_last=True):
    """Enumerate anonymous node-label patterns and store them in ``paths``.

    Patterns start as ``[0]`` and are extended ``steps`` times; the value
    appended is at most one larger than the largest value already present.

    Parameters
    ----------
    paths : dict
        mapping updated in place; ``paths[steps]`` is set to the result
    steps : int
        number of extensions applied to the seed pattern ``[0]``
    keep_last : bool (default=True)
        if True return only the patterns with ``steps + 1`` entries,
        otherwise every pattern produced while extending the seed

    Returns
    -------
    list of list of int
        the enumerated patterns, also stored under ``paths[steps]``

    """
    every_pattern = []
    frontier = [[0]]
    depth = 1
    while depth <= steps:
        extended = []
        for candidate in range(depth + 1):
            for prefix in frontier:
                if candidate > max(prefix) + 1:
                    continue
                extended.append(prefix + [candidate])
        every_pattern += extended
        frontier = extended
        depth += 1

    if keep_last:
        every_pattern = frontier
    paths[steps] = every_pattern
    return every_pattern
def all_paths_edges_nodes(paths, steps, keep_last=True):
    """Enumerate anonymous patterns that interleave node and edge labels.

    Every edge pattern from ``all_paths_edges`` is combined with every node
    pattern from ``all_paths_nodes`` that is exactly one entry longer. The
    combined pattern alternates node and edge entries, starting and ending
    with a node entry.

    Parameters
    ----------
    paths : dict
        mapping updated in place; ``paths[steps]`` is set to the result
        (the two helper calls also write to this key first)
    steps : int
        passed through to both helper enumerations
    keep_last : bool (default=True)
        passed through to both helper enumerations

    Returns
    -------
    list of list of int
        the interleaved patterns, also stored under ``paths[steps]``

    """
    edge_paths = all_paths_edges(paths, steps, keep_last=keep_last)
    node_paths = all_paths_nodes(paths, steps, keep_last=keep_last)

    merged = []
    for edge_seq in edge_paths:
        wanted_length = len(edge_seq) + 1
        for node_seq in node_paths:
            if len(node_seq) != wanted_length:
                continue
            interleaved = [node_seq[0]]
            for edge_label, node_label in zip(edge_seq, node_seq[1:]):
                interleaved.extend((edge_label, node_label))
            merged.append(interleaved)

    paths[steps] = merged
    return merged
def create_random_walk_graph(nx_graph):
    """Build the directed random-walk graph of ``nx_graph``.

    For every node the weights of its edges to *other* nodes (attribute
    ``'weight'``, defaulting to 1) are divided by their sum, and the result is
    stored as the ``weight`` of the corresponding directed edge. Self-loops are
    ignored, and a node without any edge to another node is not added to the
    result.

    Parameters
    ----------
    nx_graph : networkx graph
        the input graph

    Returns
    -------
    networkx.DiGraph
        directed graph whose edge ``weight`` attributes are the normalised
        outgoing weights

    """
    # Name of the edge attribute holding the weight (assumed identical for
    # all edges).
    weight_key = 'weight'
    walk_graph = nx.DiGraph()
    for source in nx_graph:
        neighbours = nx_graph[source]
        out_weights = {
            target: neighbours[target].get(weight_key, 1)
            for target in neighbours
            if target != source
        }
        total = float(sum(out_weights.values()))
        for target, weight in out_weights.items():
            walk_graph.add_edge(source, target, weight=weight / total)
    return walk_graph
def random_step_node(rw_graph, node):
    """Move one step from ``node`` according to the outgoing edge weights.

    A number is drawn uniformly from [0, 1] and the neighbours of ``node`` are
    scanned in iteration order, accumulating their ``'weight'`` values; the
    first neighbour whose cumulative weight reaches the drawn number is
    selected.

    Parameters
    ----------
    rw_graph : mapping
        random-walk graph; ``rw_graph[node][v]['weight']`` is the weight of
        moving from ``node`` to ``v``
    node : hashable
        the current node

    Returns
    -------
    The selected neighbour, or ``None`` when ``node`` has no neighbours.

    """
    r = random.uniform(0, 1)
    cumulative = 0
    chosen = None
    for v in rw_graph[node]:
        chosen = v
        cumulative += rw_graph[node][v]['weight']
        if r <= cumulative:
            break
    # If the loop ran to completion the weights summed to slightly less than
    # the drawn number (e.g. floating-point rounding of the normalised
    # weights). Fall back to the last neighbour instead of returning None.
    return chosen
def random_walk_node(rw_graph, node, steps):
    """Sample an anonymous walk of ``steps`` steps starting at ``node``.

    Each visited node is replaced by the order in which it was first seen
    (the start node is 0).

    Parameters
    ----------
    rw_graph : networkx.DiGraph
        random-walk graph used by ``random_step_node``
    node : hashable
        start node
    steps : int
        number of random steps to take

    Returns
    -------
    tuple of int
        the anonymous walk, with ``steps + 1`` entries

    """
    first_seen = {node: 0}
    walk = [0]
    current = node
    for _ in range(steps):
        current = random_step_node(rw_graph, current)
        # An unseen node receives the next free index.
        walk.append(first_seen.setdefault(current, len(first_seen)))
    return tuple(walk)
def random_walk_with_label_nodes(graph, rw_graph, node, steps):
    """Sample an anonymous walk from ``node`` based on node labels.

    The ``'Label'`` attribute of every visited node is replaced by the order
    in which that label was first seen along the walk.

    Note that a random step is also taken after the last label has been
    recorded; its result is discarded.

    Parameters
    ----------
    graph : networkx graph
        graph providing ``graph.nodes[n]['Label']``
    rw_graph : networkx.DiGraph
        random-walk graph used by ``random_step_node``
    node : hashable
        start node
    steps : int
        number of steps of the walk

    Returns
    -------
    tuple of int
        anonymised node-label pattern with ``steps + 1`` entries

    """
    label_ids = {}
    pattern = []
    current = node
    for _ in range(steps + 1):
        label = graph.nodes[current]['Label']
        pattern.append(label_ids.setdefault(label, len(label_ids)))
        current = random_step_node(rw_graph, current)
    return tuple(pattern)
def random_walk_with_label_edges(graph, rw_graph, node, steps):
    """Sample an anonymous walk from ``node`` based on edge labels.

    The ``'Label'`` attribute (converted to ``int``) of every traversed edge
    is replaced by the order in which that label was first seen along the
    walk.

    Parameters
    ----------
    graph : networkx graph
        graph providing ``graph[u][v]['Label']``
    rw_graph : networkx.DiGraph
        random-walk graph used by ``random_step_node``
    node : hashable
        start node
    steps : int
        number of steps of the walk

    Returns
    -------
    tuple of int
        anonymised edge-label pattern with ``steps`` entries

    """
    label_ids = {}
    pattern = []
    for _ in range(steps):
        v = random_step_node(rw_graph, node)
        label = int(graph[node][v]['Label'])
        pattern.append(label_ids.setdefault(label, len(label_ids)))
        # Advance the walk. Without this every step would be drawn from the
        # start node and the result would not be a walk at all.
        node = v
    return tuple(pattern)
def random_walk_with_label_edges_nodes(graph, rw_graph, node, steps):
    """Sample an anonymous walk from ``node`` based on edge and node labels.

    Node and edge ``'Label'`` attributes are anonymised independently (each
    label is replaced by the order in which it was first seen) and interleaved
    as ``(n_0, e_0, n_1, e_1, ..., e_{steps-1}, n_steps)``. Here ``n_i`` is the
    label index of the i-th visited node and ``e_i`` the label index of the
    edge leaving it. This is the layout of the patterns enumerated by
    ``all_paths_edges_nodes``.

    Parameters
    ----------
    graph : networkx graph
        graph providing ``graph.nodes[n]['Label']`` and
        ``graph[u][v]['Label']`` (the latter is converted to ``int``)
    rw_graph : networkx.DiGraph
        random-walk graph used by ``random_step_node``
    node : hashable
        start node
    steps : int
        number of steps of the walk

    Returns
    -------
    tuple of int
        interleaved pattern with ``2 * steps + 1`` entries

    """
    # The start node's label is by definition the first node label seen.
    node_labels = {graph.nodes[node]['Label']: 0}
    edge_labels = {}
    pattern = [0]
    for _ in range(steps):
        v = random_step_node(rw_graph, node)
        edge_label = int(graph[node][v]['Label'])
        # Record the label of the node that is *reached*. Recording the
        # source node instead would count the start node twice and never
        # record the final node.
        node_label = graph.nodes[v]['Label']
        pattern.append(edge_labels.setdefault(edge_label, len(edge_labels)))
        pattern.append(node_labels.setdefault(node_label, len(node_labels)))
        node = v
    return tuple(pattern)
def anonymous_walk_node(graph, rw_graph, node, steps, label_setting=None):
    """Sample one anonymous walk from ``node`` using the given label setting.

    Parameters
    ----------
    graph : networkx graph
        the original graph (used for node/edge labels)
    rw_graph : networkx.DiGraph
        random-walk graph of ``graph``
    node : hashable
        start node
    steps : int
        number of steps of the walk
    label_setting : str or None (default=None)
        ``None`` (no labels), ``'nodes'``, ``'edges'`` or ``'edges_nodes'``

    Returns
    -------
    tuple of int or None
        the sampled pattern; ``None`` for an unrecognised ``label_setting``

    """
    if label_setting is None:
        return random_walk_node(rw_graph, node, steps)

    if label_setting == 'nodes':
        walker = random_walk_with_label_nodes
    elif label_setting == 'edges':
        walker = random_walk_with_label_edges
    elif label_setting == 'edges_nodes':
        walker = random_walk_with_label_edges_nodes
    else:
        # Unknown settings silently produce no walk.
        return None
    return walker(graph, rw_graph, node, steps)
def anonymous_walks(graph, neighborhood_size, walk_ids, awe_length, label_setting):
    """Sample anonymous walks from every node of the random-walk graph.

    Parameters
    ----------
    graph : networkx graph
        the input graph
    neighborhood_size : int
        number of walks sampled per node
    walk_ids : dict
        maps a walk pattern (tuple) to its identifier
    awe_length : int
        number of steps of every walk
    label_setting : str or None
        label setting forwarded to ``anonymous_walk_node``

    Returns
    -------
    list of list of str
        one list per node of the random-walk graph, holding the identifiers
        (as strings) of the ``neighborhood_size`` walks sampled from it

    """
    rw_graph = create_random_walk_graph(graph)
    per_node_walks = []
    for node in rw_graph:
        sampled = []
        for _ in range(neighborhood_size):
            pattern = anonymous_walk_node(graph, rw_graph, node, awe_length, label_setting)
            sampled.append(str(walk_ids[pattern]))
        per_node_walks.append(sampled)
    return per_node_walks
def awe_corpus(corpus_dir, awe_length, label_setting, neighborhood_size=10, saving_graph_docs=True):
    """Induces anonymous walks up to a supplied length across a set of graphs.

    This function extracts anonymous walks of `awe_length` across the graphs
    in a dataset into a vocabulary. For each graph a "graph document" records
    the string identifiers of the anonymous walks sampled from each node.

    The main use case is to supply the path to a directory containing one
    .gexf file per graph of a dataset (file names must be integers, e.g.
    ``12.gexf``). For each .gexf file a document named
    ``<file>.awe_<awe_length>_<label_setting>`` is used
    (``<file>.awe_<awe_length>_allpaths`` when `label_setting` is None). If
    that document already exists it is read back instead of re-sampling the
    graph (regardless of the `neighborhood_size` it was created with);
    otherwise the walks are sampled and, if `saving_graph_docs` is True,
    written to it.

    Parameters
    ----------
    corpus_dir : str
        path to folder containing gexf graph files on which the substructure
        patterns should be induced
    awe_length : int
        desired length of anonymous walk patterns
    label_setting : str
        information of node/edge labels that should be used; this can be
        None, 'nodes', 'edges', 'edges_nodes'
    neighborhood_size : int (default=10)
        the number of anonymous walks to take from a source node
    saving_graph_docs : bool (default=True)
        boolean value which dictates whether graph documents will be
        generated and saved.

    Returns
    -------
    corpus : list of list of str
        for each graph (ordered by graph index) the flat list of pattern
        identifiers sampled from it
    vocabulary : set of str
        all pattern identifiers seen across the graphs
    prob_map : dict
        graph index -> {pattern identifier: relative frequency in the graph}
    num_graphs : int
        number of graphs processed
    graph_map : dict
        graph index -> {pattern identifier: count in the graph}

    """
    graph_files = get_files(corpus_dir, extension=".gexf")

    # Enumerate all admissible patterns of the requested length and give each
    # of them an integer identifier.
    paths = dict()
    if label_setting is None:
        all_paths(paths, awe_length, keep_last=True)
    elif label_setting == 'nodes':
        all_paths_nodes(paths, awe_length, keep_last=True)
    elif label_setting == 'edges':
        all_paths_edges(paths, awe_length, keep_last=True)
    elif label_setting == 'edges_nodes':
        all_paths_edges_nodes(paths, awe_length, keep_last=True)
    walk_ids = {tuple(path): i for i, path in enumerate(paths[awe_length])}

    # One suffix for both the existence check and the write, so that documents
    # produced with ``label_setting=None`` are found again on a later run.
    label_suffix = "allpaths" if label_setting is None else label_setting
    doc_extension = ".awe_" + str(awe_length) + "_" + label_suffix

    graph_walks = {}
    for gexf_fh in tqdm(graph_files):
        doc_path = gexf_fh + doc_extension
        gidx = int(os.path.basename(gexf_fh).replace(".gexf", ""))

        if os.path.exists(doc_path):
            # Reuse the stored document rather than dropping the graph from
            # the returned corpus.
            with open(doc_path) as fh:
                graph_walks[gidx] = [line.split() for line in fh]
            continue

        graph, _ = load_graph(gexf_fh)
        aws_graph = anonymous_walks(graph, neighborhood_size, walk_ids, awe_length, label_setting)
        graph_walks[gidx] = aws_graph

        if saving_graph_docs:
            # One line per node: the identifiers of the walks sampled from it.
            with open(doc_path, 'w') as fh:
                fh.writelines(' '.join(aw) + '\n' for aw in aws_graph)

    corpus = []
    vocabulary = set()
    graph_map = {}
    for gidx in sorted(graph_walks):
        count_map = {}
        tmp_corpus = []
        for node_neighborhood_awalks in graph_walks[gidx]:
            for pattern_str in node_neighborhood_awalks:
                vocabulary.add(pattern_str)
                count_map[pattern_str] = count_map.get(pattern_str, 0) + 1
                tmp_corpus.append(pattern_str)
        corpus.append(tmp_corpus)
        graph_map[gidx] = count_map

    # Normalise the pattern counts of every graph into relative frequencies.
    prob_map = {}
    for gidx, pattern_counts in graph_map.items():
        total = float(sum(pattern_counts.values()))
        prob_map[gidx] = {pattern: count / total for pattern, count in pattern_counts.items()}

    num_graphs = len(graph_map)
    return corpus, vocabulary, prob_map, num_graphs, graph_map
if __name__ == '__main__':
    # Example run: induce node-labelled anonymous walks of length 4 on the
    # MUTAG graphs and save one graph document per gexf file.
    label_setting = 'nodes'
    awe_length = 4
    corpus_dir = "../data/dortmund_gexf/MUTAG/"
    corpus, vocabulary, prob_map, num_graphs, graph_map = awe_corpus(
        corpus_dir,
        awe_length,
        label_setting,
        saving_graph_docs=True,
    )