Source code for graphical_models.rand.graphs

import random
import itertools as itr
from functools import partial
from typing import Union, List, Callable, Optional, Any, Dict

import numpy as np
from tqdm import tqdm
from scipy.special import comb
from networkx import barabasi_albert_graph, fast_gnp_random_graph

# === IMPORTS: LOCAL ===
from graphical_models import DAG, GaussDAG, SampleDAG, CamDAG
from graphical_models.classes.dags.discrete_dag import DiscreteDAG

# class RandWeightFn(Protocol):
#     def __call__(self, size: int) -> Union[float, List[float]]: ...

RandWeightFn = Any

def _coin(p, size=1):
    return np.random.binomial(1, p, size=size)

def unif_away_zero(low=.25, high=1, size=1, all_positive=False):
    if all_positive:
        return np.random.uniform(low, high, size=size)
    signs = (_coin(.5, size) - .5) * 2
    return signs * np.random.uniform(low, high, size=size)

def unif_away_original(original, dist_original=.25, low=.25, high=1):
    if dist_original < low:
        raise ValueError(
            "the lowest absolute value of weights must be larger than the distance between old weights and new weights")
    regions = []
    if original < 0:
        regions.append((low, high))
        if original - dist_original >= -high:
            regions.append((-high, original - dist_original))
        if original + dist_original <= -low:
            regions.append((original + dist_original, -low))
        regions.append((-high, -low))
        if original + dist_original <= high:
            regions.append((original + dist_original, high))
        if original - dist_original >= low:
            regions.append((low, original - dist_original))
    a, b = random.choices(regions, weights=[b - a for a, b in regions])[0]
    return np.random.uniform(a, b)

[docs]def directed_erdos( nnodes, density=None, exp_nbrs=None, size=1, as_list=False, random_order=True ) -> Union[DAG, List[DAG]]: """ Generate random Erdos-Renyi DAG(s) on `nnodes` nodes with density `density`. Parameters ---------- nnodes: Number of nodes in each graph. density: Probability of any edge. size: Number of graphs. as_list: If True, always return as a list, even if only one DAG is generated. Examples -------- >>> from graphical_models.rand import directed_erdos >>> d = directed_erdos(5, .5) """ assert density is not None or exp_nbrs is not None density = density if density is not None else exp_nbrs / (nnodes - 1) if size == 1: # if density < .01: # print('here') # random_nx = fast_gnp_random_graph(nnodes, density, directed=True) # d = DAG(nodes=set(range(nnodes)), arcs=random_nx.edges) # return [d] if as_list else d bools = _coin(density, size=int(nnodes * (nnodes - 1) / 2)) arcs = {(i, j) for (i, j), b in zip(itr.combinations(range(nnodes), 2), bools) if b} d = DAG(nodes=set(range(nnodes)), arcs=arcs) if random_order: nodes = list(range(nnodes)) d = d.rename_nodes(dict(enumerate(np.random.permutation(nodes)))) return [d] if as_list else d else: return [directed_erdos(nnodes, density, random_order=random_order) for _ in range(size)]
def directed_erdos_with_confounders( nnodes: int, density: Optional[float] = None, exp_nbrs: Optional[float] = None, num_confounders: int = 1, confounder_pervasiveness: float = 1, size=1, as_list=False, random_order=True ) -> Union[DAG, List[DAG]]: assert density is not None or exp_nbrs is not None density = density if density is not None else exp_nbrs / (nnodes - 1) if size == 1: confounders = list(range(num_confounders)) nonconfounders = list(range(num_confounders, nnodes+num_confounders)) bools = _coin(confounder_pervasiveness, size=int(num_confounders*nnodes)) confounder_arcs = {(i, j) for (i, j), b in zip(itr.product(confounders, nonconfounders), bools) if b} bools = _coin(density, size=int(nnodes * (nnodes - 1) / 2)) local_arcs = {(i, j) for (i, j), b in zip(itr.combinations(nonconfounders, 2), bools) if b} d = DAG(nodes=set(range(nnodes)), arcs=confounder_arcs|local_arcs) if random_order: nodes = list(range(nnodes+num_confounders)) d = d.rename_nodes(dict(enumerate(np.random.permutation(nodes)))) return [d] if as_list else d else: return [ directed_erdos_with_confounders( nnodes, density, num_confounders=num_confounders, confounder_pervasiveness=confounder_pervasiveness, random_order=random_order ) for _ in range(size) ]
[docs]def rand_weights(dag, rand_weight_fn: RandWeightFn = unif_away_zero) -> GaussDAG: """ Generate a GaussDAG from a DAG, with random edge weights independently drawn from `rand_weight_fn`. Parameters ---------- dag: DAG rand_weight_fn: Function to generate random weights. Examples -------- >>> import causaldag as cd >>> d = cd.DAG(arcs={(1, 2), (2, 3)}) >>> g = cd.rand.rand_weights(d) """ weights = rand_weight_fn(size=len(dag.arcs)) return GaussDAG(nodes=list(range(len(dag.nodes))), arcs=dict(zip(dag.arcs, weights)))
def rand_discrete_dag(dag, node_alphabets=None, alpha=1.0): if node_alphabets is None: node_alphabets = {node: list(range(3)) for node in dag.nodes} conditionals = dict() node2parents = dict() for node in dag.nodes: parents = list(dag.parents_of(node)) node2parents[node] = parents K = len(node_alphabets[node]) if len(parents) == 0: conditional = np.random.dirichlet([alpha] * K) if len(parents) != 0: parent_alphabet_shape = [len(node_alphabets[p]) for p in parents] conditional = np.random.dirichlet([alpha] * K, size=parent_alphabet_shape) conditionals[node] = conditional dag = DiscreteDAG( nodes=list(range(dag.nnodes)), arcs=dag.arcs, conditionals=conditionals, node2parents=node2parents, node_alphabets=node_alphabets ) return dag def tensor_product(vecs): current_prod = vecs[0] for i in range(1, len(vecs)): new_vec = vecs[i] current_prod = np.tensordot(current_prod, new_vec, axes=0) return current_prod def rand_noisy_or_dag(dag, q_low=0.75, q_high=1.0, p_low=0.5, p_high=1.0): conditionals = dict() node2parents = dict() for node in dag.nodes: parents = list(dag.parents_of(node)) node2parents[node] = parents q_j = np.random.uniform(q_low, q_high) # q_j = 1 if len(parents) > 0: parent_params = np.random.uniform(p_low, p_high, size=len(parents)) vecs = np.vstack((np.ones(len(parents)), parent_params)).T prob0 = tensor_product(vecs) prob0 = prob0 * q_j conditional = np.stack((prob0, 1 - prob0), axis=-1) else: prob0 = np.random.uniform(0, 1) * q_j conditional = np.array([prob0, 1 - prob0]) conditionals[node] = conditional dag = DiscreteDAG( nodes=list(range(dag.nnodes)), arcs=dag.arcs, conditionals=conditionals, node2parents=node2parents, node_alphabets={i: [0, 1] for i in range(dag.nnodes)} ) return dag def rand_normal_wishart(dag, var_dof=None, edge_scale=1, edge_mean=0): arcs = dict() variances = [] means = [] var_dof = var_dof if var_dof is not None else dag.nnodes + 1 nodes = list(dag.nodes) for node in nodes: parents = dag.parents_of(node) var_node = np.random.chisquare((var_dof - dag.nnodes + len(parents) + 1)/2) weights = np.random.normal(edge_mean, var_node * edge_scale, size=len(parents) + 1) variances.append(var_node) means.append(weights[-1]) arcs.update({(parent, node): weight for parent, weight in zip(parents, weights[:-1])}) return GaussDAG(nodes=list(dag.nodes), arcs=arcs, biases=means, variances=variances) def _leaky_relu(vals): return np.where(vals > 0, vals, vals * .01) def rand_nn_functions( dag: DAG, num_layers=3, nonlinearity=_leaky_relu, noise=lambda: np.random.laplace(0, 1) ) -> SampleDAG: s = SampleDAG(dag._nodes, arcs=dag._arcs) # for each node, create the conditional for node in dag._nodes: nparents = dag.indegree_of(node) layer_mats = [np.random.rand(nparents, nparents) * 2 for _ in range(num_layers)] def conditional(parent_vals): vals = parent_vals for a in layer_mats: vals = a @ vals vals = nonlinearity(vals) return vals + noise() s.set_conditional(node, conditional) return s def _cam_conditional(parent_vals, c_node, parent_weights, parent_bases, noise): return sum([ c_node * weight * base(val) for weight, base, val in zip(parent_weights, parent_bases, parent_vals) ]) + noise() def _cam_mean_function( parent_vals: np.ndarray, parents: list, c_node: float, parent_weights: np.ndarray, parent2base: dict ): assert parent_vals.shape[1] == len(parents) assert len(parent_weights) == len(parents) assert all(parent in parent2base for parent in parents) if len(parents) == 0: return np.zeros(parent_vals.shape[0]) parent_contribs = np.array([parent2base[parent](parent_vals[:, ix]) for ix, parent in enumerate(parents)]).T parent_contribs = parent_contribs * parent_weights parent_contrib = parent_contribs.sum(axis=1) return c_node * parent_contrib def rand_additive_basis( dag: DAG, basis: list, r2_dict: Optional[Union[Dict[int, float], float]] = None, rand_weight_fn: RandWeightFn = unif_away_zero, noise=lambda size: np.random.normal(0, 1, size=size), num_monte_carlo: int = 10000, progress=False, r2_key="nparents" ): """ Generate a random structural causal model (SCM), using `dag` as the structure, and with each variable being a general additive model (GAM) of its parents. Parameters ---------- dag: A DAG to use as the structure for the model. basis: Basis functions for the GAM. r2_dict: A dictionary mapping each number of parents to the desired signal-to-noise ratio (SNR) for nodes with that many parents. By default, 1/2 for any number of parents. rand_weight_fn: A function to generate random weights for each parent. noise: A function to generate random internal noise for each node. internal_variance: The variance of the above noise function. num_monte_carlo: The number of Monte Carlo samples used when computing coefficients to achieve the desired SNR. Examples -------- >>> import causaldag as cd >>> import numpy as np >>> d = cd.DAG(arcs={(1, 2), (2, 3), (1, 3)}) >>> basis = [np.sin, np.cos, np.exp] >>> r2_dict = {1: 1/2, 2: 2/3} >>> g = cd.rand.rand_additive_basis(d, basis, r2_dict) """ if r2_dict is None: r2_dict = {nparents: 1/2 for nparents in range(dag.nnodes)} if isinstance(r2_dict, float): r2_dict = {nparents: r2_dict for nparents in range(dag.nnodes)} cam_dag = CamDAG(dag._nodes, arcs=dag._arcs) top_order = dag.topological_sort() sample_dict = dict() # for each node, create the conditional node_iterator = top_order if not progress else tqdm(top_order) for node in node_iterator: parents = dag.parents_of(node) nparents = dag.indegree_of(node) parent2base = dict(zip(parents, random.choices(basis, k=nparents))) if nparents > 0: parent_vals = np.array([sample_dict[parent] for parent in parents]).T parent_weights = rand_weight_fn(size=nparents) else: parent_vals = np.zeros([num_monte_carlo, 0]) parent_weights = dict() noise_vals = noise(size=num_monte_carlo) internal_variance = np.var(noise_vals) # print("internal variance", node, internal_variance) c_node = None b_node = np.sqrt(1/internal_variance) if nparents > 0: mean_function_no_c = partial(_cam_mean_function, c_node=1, parent_weights=parent_weights, parent2base=parent2base) values_from_parents = mean_function_no_c(parent_vals, parents) variance_from_parents = np.var(values_from_parents) try: if r2_key == "nparents": desired_r2 = r2_dict[nparents] elif r2_key == "node": desired_r2 = r2_dict[node] except ValueError: raise Exception(f"`r2_dict` does not specify a desired R^2 for nodes with {nparents} parents") c_node = np.sqrt(desired_r2 / variance_from_parents) b_node = np.sqrt((1 - desired_r2) / internal_variance) if np.isnan(c_node): raise ValueError # print(node, parents, variance_from_parents, parent_weights, c_node, b_node) mean_function = partial(_cam_mean_function, c_node=c_node, parent_weights=parent_weights, parent2base=parent2base) mean_vals = mean_function(parent_vals, parents) sample_dict[node] = mean_vals + noise_vals * b_node # print("variance of samples", np.var(sample_dict[node])) cam_dag.set_mean_function(node, mean_function) cam_dag.set_noise(node, create_scaled_noise_function(b_node, noise)) return cam_dag def create_scaled_noise_function(val, noise): def scaled_noise_function(size): return val * noise(size) return scaled_noise_function # OPTION 1 # - equally predictable given parents (signal to noise ratio): internal variance \propto variance from the parents # - compute variance of each parent. add up. set my internal noise variance \propto variance from parents # - always keep noise variance same, scale signal coefficients # OPTION 2 # - bound each variable # OPTION 3 def directed_random_graph(nnodes: int, random_graph_model: Callable, size=1, as_list=False) -> Union[DAG, List[DAG]]: if size == 1: # generate a random undirected graph edges = random_graph_model(nnodes).edges # generate a random permutation random_permutation = np.arange(nnodes) np.random.shuffle(random_permutation) arcs = [] for edge in edges: node1, node2 = edge node1_position = np.where(random_permutation == node1)[0][0] node2_position = np.where(random_permutation == node2)[0][0] if node1_position < node2_position: source = node1 endpoint = node2 else: source = node2 endpoint = node1 arcs.append((source, endpoint)) d = DAG(nodes=set(range(nnodes)), arcs=arcs) return [d] if as_list else d else: return [directed_random_graph(nnodes, random_graph_model) for _ in range(size)] def directed_barabasi(nnodes: int, nattach: int, size=1, as_list=False) -> Union[DAG, List[DAG]]: random_graph_model = lambda nnodes: barabasi_albert_graph(nnodes, nattach) return directed_random_graph(nnodes, random_graph_model, size=size, as_list=as_list) def alter_weights( gdag: GaussDAG, prob_altered: float = None, num_altered: int = None, prob_added: float = None, num_added: int = None, prob_removed: float = None, num_removed: int = None, rand_weight_fn=unif_away_zero, rand_change_fn=unif_away_original ): """ Return a copy of a GaussDAG with some of its arc weights randomly altered by `rand_weight_fn`. Parameters ---------- gdag: GaussDAG prob_altered: Probability each arc has its weight altered. num_altered: Number of arcs whose weights are altered. prob_added: Probability that each missing arc is added. num_added: Number of missing arcs added. prob_removed: Probability that each arc is removed. num_removed: Number of arcs removed. rand_weight_fn: Function that returns a random weight for each new edge. rand_change_fn: Function that takes the current weight of an edge and returns the new weight. """ if num_altered is None and prob_altered is None: raise ValueError("Must specify at least one of `prob_altered` or `num_altered`.") if num_added is None and prob_added is None: raise ValueError("Must specify at least one of `prob_added` or `num_added`.") if num_removed is None and prob_removed is None: raise ValueError("Must specify at least one of `prob_removed` or `num_removed`.") if num_altered + num_removed > gdag.num_arcs: raise ValueError( f"Tried altering {num_altered} arcs and removing {num_removed} arcs, but there are only {gdag.num_arcs} arcs in this DAG.") num_missing_arcs = comb(gdag.nnodes, 2) - gdag.num_arcs if num_added > num_missing_arcs: raise ValueError( f"Tried adding {num_added} arcs but there are only {num_missing_arcs} arcs missing from the DAG.") # GET NUMBER ADDED/CHANGED/REMOVED num_altered = num_altered if num_altered is not None else np.random.binomial(gdag.num_arcs, prob_altered) num_removed = num_removed if num_removed is not None else np.random.binomial(gdag.num_arcs, prob_removed) num_removed = min(num_removed, gdag.num_arcs - num_altered) num_added = num_added if num_added is not None else np.random.binomial(num_missing_arcs, prob_added) # GET ACTUAL ARCS THAT ARE ADDED/CHANGED/REMOVED altered_arcs = random.sample(list(gdag.arcs), num_altered) removed_arcs = random.sample(list(gdag.arcs - set(altered_arcs)), num_removed) valid_arcs_to_add = set(itr.combinations(gdag.topological_sort(), 2)) - gdag.arcs added_arcs = random.sample(list(valid_arcs_to_add), num_added) # CREATE NEW DAG new_gdag = gdag.copy() weights = gdag.arc_weights for i, j in altered_arcs: new_gdag.set_arc_weight(i, j, rand_change_fn(weights[(i, j)])) for i, j in removed_arcs: new_gdag.remove_arc(i, j) new_weights = rand_weight_fn(size=num_added) for (i, j), val in zip(added_arcs, new_weights): new_gdag.set_arc_weight(i, j, val) return new_gdag __all__ = [ 'directed_erdos', 'directed_erdos_with_confounders', 'rand_weights', 'unif_away_zero', 'directed_barabasi', 'directed_random_graph', 'rand_nn_functions', 'unif_away_original', 'alter_weights', 'rand_additive_basis', 'rand_normal_wishart', 'rand_discrete_dag', 'rand_noisy_or_dag' ] if __name__ == '__main__': d = directed_erdos(10, .5, random_order=False) cam_dag = rand_additive_basis(d, [np.sin]) samples = cam_dag.sample(100) cond_means = cam_dag.conditional_mean(cond_values=np.ones([10, 1]), cond_nodes=[0])