Source code for graphical_models.rand.graphs

# === IMPORTS: BUILT-IN ===
import random
import itertools as itr
from functools import partial
from typing import Union, List, Callable, Optional, Any, Dict

# === IMPORTS: THIRD-PARTY ===
import numpy as np
from tqdm import tqdm
from scipy.special import comb
from networkx import barabasi_albert_graph, fast_gnp_random_graph

# === IMPORTS: LOCAL ===
from graphical_models import DAG, GaussDAG, SampleDAG, CamDAG
from graphical_models.classes.dags.discrete_dag import DiscreteDAG

# class RandWeightFn(Protocol):
#     def __call__(self, size: int) -> Union[float, List[float]]: ...


RandWeightFn = Any


def _coin(p, size=1):
    return np.random.binomial(1, p, size=size)


def unif_away_zero(low=.25, high=1, size=1, all_positive=False):
    if all_positive:
        return np.random.uniform(low, high, size=size)
    signs = (_coin(.5, size) - .5) * 2
    return signs * np.random.uniform(low, high, size=size)


def unif_away_original(original, dist_original=.25, low=.25, high=1):
    if dist_original < low:
        raise ValueError(
            "the lowest absolute value of weights must be larger than the distance between old weights and new weights")
    regions = []
    if original < 0:
        regions.append((low, high))
        if original - dist_original >= -high:
            regions.append((-high, original - dist_original))
        if original + dist_original <= -low:
            regions.append((original + dist_original, -low))
    else:
        regions.append((-high, -low))
        if original + dist_original <= high:
            regions.append((original + dist_original, high))
        if original - dist_original >= low:
            regions.append((low, original - dist_original))
    a, b = random.choices(regions, weights=[b - a for a, b in regions])[0]
    return np.random.uniform(a, b)


[docs]def directed_erdos( nnodes, density=None, exp_nbrs=None, size=1, as_list=False, random_order=True ) -> Union[DAG, List[DAG]]: """ Generate random Erdos-Renyi DAG(s) on `nnodes` nodes with density `density`. Parameters ---------- nnodes: Number of nodes in each graph. density: Probability of any edge. size: Number of graphs. as_list: If True, always return as a list, even if only one DAG is generated. Examples -------- >>> from graphical_models.rand import directed_erdos >>> d = directed_erdos(5, .5) """ assert density is not None or exp_nbrs is not None density = density if density is not None else exp_nbrs / (nnodes - 1) if size == 1: # if density < .01: # print('here') # random_nx = fast_gnp_random_graph(nnodes, density, directed=True) # d = DAG(nodes=set(range(nnodes)), arcs=random_nx.edges) # return [d] if as_list else d bools = _coin(density, size=int(nnodes * (nnodes - 1) / 2)) arcs = {(i, j) for (i, j), b in zip(itr.combinations(range(nnodes), 2), bools) if b} d = DAG(nodes=set(range(nnodes)), arcs=arcs) if random_order: nodes = list(range(nnodes)) d = d.rename_nodes(dict(enumerate(np.random.permutation(nodes)))) return [d] if as_list else d else: return [directed_erdos(nnodes, density, random_order=random_order) for _ in range(size)]
def directed_erdos_with_confounders( nnodes: int, density: Optional[float] = None, exp_nbrs: Optional[float] = None, num_confounders: int = 1, confounder_pervasiveness: float = 1, size=1, as_list=False, random_order=True ) -> Union[DAG, List[DAG]]: assert density is not None or exp_nbrs is not None density = density if density is not None else exp_nbrs / (nnodes - 1) if size == 1: confounders = list(range(num_confounders)) nonconfounders = list(range(num_confounders, nnodes+num_confounders)) bools = _coin(confounder_pervasiveness, size=int(num_confounders*nnodes)) confounder_arcs = {(i, j) for (i, j), b in zip(itr.product(confounders, nonconfounders), bools) if b} bools = _coin(density, size=int(nnodes * (nnodes - 1) / 2)) local_arcs = {(i, j) for (i, j), b in zip(itr.combinations(nonconfounders, 2), bools) if b} d = DAG(nodes=set(range(nnodes)), arcs=confounder_arcs|local_arcs) if random_order: nodes = list(range(nnodes+num_confounders)) d = d.rename_nodes(dict(enumerate(np.random.permutation(nodes)))) return [d] if as_list else d else: return [ directed_erdos_with_confounders( nnodes, density, num_confounders=num_confounders, confounder_pervasiveness=confounder_pervasiveness, random_order=random_order ) for _ in range(size) ]
[docs]def rand_weights(dag, rand_weight_fn: RandWeightFn = unif_away_zero) -> GaussDAG: """ Generate a GaussDAG from a DAG, with random edge weights independently drawn from `rand_weight_fn`. Parameters ---------- dag: DAG rand_weight_fn: Function to generate random weights. Examples -------- >>> import causaldag as cd >>> d = cd.DAG(arcs={(1, 2), (2, 3)}) >>> g = cd.rand.rand_weights(d) """ weights = rand_weight_fn(size=len(dag.arcs)) return GaussDAG(nodes=list(range(len(dag.nodes))), arcs=dict(zip(dag.arcs, weights)))
def rand_discrete_dag(dag, node_alphabets=None, alpha=1.0): if node_alphabets is None: node_alphabets = {node: list(range(3)) for node in dag.nodes} conditionals = dict() node2parents = dict() for node in dag.nodes: parents = list(dag.parents_of(node)) node2parents[node] = parents K = len(node_alphabets[node]) if len(parents) == 0: conditional = np.random.dirichlet([alpha] * K) if len(parents) != 0: parent_alphabet_shape = [len(node_alphabets[p]) for p in parents] conditional = np.random.dirichlet([alpha] * K, size=parent_alphabet_shape) conditionals[node] = conditional dag = DiscreteDAG( nodes=list(range(dag.nnodes)), arcs=dag.arcs, conditionals=conditionals, node2parents=node2parents, node_alphabets=node_alphabets ) return dag def tensor_product(vecs): current_prod = vecs[0] for i in range(1, len(vecs)): new_vec = vecs[i] current_prod = np.tensordot(current_prod, new_vec, axes=0) return current_prod def rand_noisy_or_dag(dag, q_low=0.75, q_high=1.0, p_low=0.5, p_high=1.0): conditionals = dict() node2parents = dict() for node in dag.nodes: parents = list(dag.parents_of(node)) node2parents[node] = parents q_j = np.random.uniform(q_low, q_high) # q_j = 1 if len(parents) > 0: parent_params = np.random.uniform(p_low, p_high, size=len(parents)) vecs = np.vstack((np.ones(len(parents)), parent_params)).T prob0 = tensor_product(vecs) prob0 = prob0 * q_j conditional = np.stack((prob0, 1 - prob0), axis=-1) else: prob0 = np.random.uniform(0, 1) * q_j conditional = np.array([prob0, 1 - prob0]) conditionals[node] = conditional dag = DiscreteDAG( nodes=list(range(dag.nnodes)), arcs=dag.arcs, conditionals=conditionals, node2parents=node2parents, node_alphabets={i: [0, 1] for i in range(dag.nnodes)} ) return dag def rand_normal_wishart(dag, var_dof=None, edge_scale=1, edge_mean=0): arcs = dict() variances = [] means = [] var_dof = var_dof if var_dof is not None else dag.nnodes + 1 nodes = list(dag.nodes) for node in nodes: parents = dag.parents_of(node) var_node = np.random.chisquare((var_dof - dag.nnodes + len(parents) + 1)/2) weights = np.random.normal(edge_mean, var_node * edge_scale, size=len(parents) + 1) variances.append(var_node) means.append(weights[-1]) arcs.update({(parent, node): weight for parent, weight in zip(parents, weights[:-1])}) return GaussDAG(nodes=list(dag.nodes), arcs=arcs, biases=means, variances=variances) def _leaky_relu(vals): return np.where(vals > 0, vals, vals * .01) def rand_nn_functions( dag: DAG, num_layers=3, nonlinearity=_leaky_relu, noise=lambda: np.random.laplace(0, 1) ) -> SampleDAG: s = SampleDAG(dag._nodes, arcs=dag._arcs) # for each node, create the conditional for node in dag._nodes: nparents = dag.indegree_of(node) layer_mats = [np.random.rand(nparents, nparents) * 2 for _ in range(num_layers)] def conditional(parent_vals): vals = parent_vals for a in layer_mats: vals = a @ vals vals = nonlinearity(vals) return vals + noise() s.set_conditional(node, conditional) return s def _cam_conditional(parent_vals, c_node, parent_weights, parent_bases, noise): return sum([ c_node * weight * base(val) for weight, base, val in zip(parent_weights, parent_bases, parent_vals) ]) + noise() def _cam_mean_function( parent_vals: np.ndarray, parents: list, c_node: float, parent_weights: np.ndarray, parent2base: dict ): assert parent_vals.shape[1] == len(parents) assert len(parent_weights) == len(parents) assert all(parent in parent2base for parent in parents) if len(parents) == 0: return np.zeros(parent_vals.shape[0]) parent_contribs = np.array([parent2base[parent](parent_vals[:, ix]) for ix, parent in enumerate(parents)]).T parent_contribs = parent_contribs * parent_weights parent_contrib = parent_contribs.sum(axis=1) return c_node * parent_contrib def rand_additive_basis( dag: DAG, basis: list, r2_dict: Optional[Union[Dict[int, float], float]] = None, rand_weight_fn: RandWeightFn = unif_away_zero, noise=lambda size: np.random.normal(0, 1, size=size), num_monte_carlo: int = 10000, progress=False, r2_key="nparents" ): """ Generate a random structural causal model (SCM), using `dag` as the structure, and with each variable being a general additive model (GAM) of its parents. Parameters ---------- dag: A DAG to use as the structure for the model. basis: Basis functions for the GAM. r2_dict: A dictionary mapping each number of parents to the desired signal-to-noise ratio (SNR) for nodes with that many parents. By default, 1/2 for any number of parents. rand_weight_fn: A function to generate random weights for each parent. noise: A function to generate random internal noise for each node. internal_variance: The variance of the above noise function. num_monte_carlo: The number of Monte Carlo samples used when computing coefficients to achieve the desired SNR. Examples -------- >>> import causaldag as cd >>> import numpy as np >>> d = cd.DAG(arcs={(1, 2), (2, 3), (1, 3)}) >>> basis = [np.sin, np.cos, np.exp] >>> r2_dict = {1: 1/2, 2: 2/3} >>> g = cd.rand.rand_additive_basis(d, basis, r2_dict) """ if r2_dict is None: r2_dict = {nparents: 1/2 for nparents in range(dag.nnodes)} if isinstance(r2_dict, float): r2_dict = {nparents: r2_dict for nparents in range(dag.nnodes)} cam_dag = CamDAG(dag._nodes, arcs=dag._arcs) top_order = dag.topological_sort() sample_dict = dict() # for each node, create the conditional node_iterator = top_order if not progress else tqdm(top_order) for node in node_iterator: parents = dag.parents_of(node) nparents = dag.indegree_of(node) parent2base = dict(zip(parents, random.choices(basis, k=nparents))) if nparents > 0: parent_vals = np.array([sample_dict[parent] for parent in parents]).T parent_weights = rand_weight_fn(size=nparents) else: parent_vals = np.zeros([num_monte_carlo, 0]) parent_weights = dict() noise_vals = noise(size=num_monte_carlo) internal_variance = np.var(noise_vals) # print("internal variance", node, internal_variance) c_node = None b_node = np.sqrt(1/internal_variance) if nparents > 0: mean_function_no_c = partial(_cam_mean_function, c_node=1, parent_weights=parent_weights, parent2base=parent2base) values_from_parents = mean_function_no_c(parent_vals, parents) variance_from_parents = np.var(values_from_parents) try: if r2_key == "nparents": desired_r2 = r2_dict[nparents] elif r2_key == "node": desired_r2 = r2_dict[node] except ValueError: raise Exception(f"`r2_dict` does not specify a desired R^2 for nodes with {nparents} parents") c_node = np.sqrt(desired_r2 / variance_from_parents) b_node = np.sqrt((1 - desired_r2) / internal_variance) if np.isnan(c_node): raise ValueError # print(node, parents, variance_from_parents, parent_weights, c_node, b_node) mean_function = partial(_cam_mean_function, c_node=c_node, parent_weights=parent_weights, parent2base=parent2base) mean_vals = mean_function(parent_vals, parents) sample_dict[node] = mean_vals + noise_vals * b_node # print("variance of samples", np.var(sample_dict[node])) cam_dag.set_mean_function(node, mean_function) cam_dag.set_noise(node, create_scaled_noise_function(b_node, noise)) return cam_dag def create_scaled_noise_function(val, noise): def scaled_noise_function(size): return val * noise(size) return scaled_noise_function # OPTION 1 # - equally predictable given parents (signal to noise ratio): internal variance \propto variance from the parents # - compute variance of each parent. add up. set my internal noise variance \propto variance from parents # - always keep noise variance same, scale signal coefficients # OPTION 2 # - bound each variable # OPTION 3 def directed_random_graph(nnodes: int, random_graph_model: Callable, size=1, as_list=False) -> Union[DAG, List[DAG]]: if size == 1: # generate a random undirected graph edges = random_graph_model(nnodes).edges # generate a random permutation random_permutation = np.arange(nnodes) np.random.shuffle(random_permutation) arcs = [] for edge in edges: node1, node2 = edge node1_position = np.where(random_permutation == node1)[0][0] node2_position = np.where(random_permutation == node2)[0][0] if node1_position < node2_position: source = node1 endpoint = node2 else: source = node2 endpoint = node1 arcs.append((source, endpoint)) d = DAG(nodes=set(range(nnodes)), arcs=arcs) return [d] if as_list else d else: return [directed_random_graph(nnodes, random_graph_model) for _ in range(size)] def directed_barabasi(nnodes: int, nattach: int, size=1, as_list=False) -> Union[DAG, List[DAG]]: random_graph_model = lambda nnodes: barabasi_albert_graph(nnodes, nattach) return directed_random_graph(nnodes, random_graph_model, size=size, as_list=as_list) def alter_weights( gdag: GaussDAG, prob_altered: float = None, num_altered: int = None, prob_added: float = None, num_added: int = None, prob_removed: float = None, num_removed: int = None, rand_weight_fn=unif_away_zero, rand_change_fn=unif_away_original ): """ Return a copy of a GaussDAG with some of its arc weights randomly altered by `rand_weight_fn`. Parameters ---------- gdag: GaussDAG prob_altered: Probability each arc has its weight altered. num_altered: Number of arcs whose weights are altered. prob_added: Probability that each missing arc is added. num_added: Number of missing arcs added. prob_removed: Probability that each arc is removed. num_removed: Number of arcs removed. rand_weight_fn: Function that returns a random weight for each new edge. rand_change_fn: Function that takes the current weight of an edge and returns the new weight. """ if num_altered is None and prob_altered is None: raise ValueError("Must specify at least one of `prob_altered` or `num_altered`.") if num_added is None and prob_added is None: raise ValueError("Must specify at least one of `prob_added` or `num_added`.") if num_removed is None and prob_removed is None: raise ValueError("Must specify at least one of `prob_removed` or `num_removed`.") if num_altered + num_removed > gdag.num_arcs: raise ValueError( f"Tried altering {num_altered} arcs and removing {num_removed} arcs, but there are only {gdag.num_arcs} arcs in this DAG.") num_missing_arcs = comb(gdag.nnodes, 2) - gdag.num_arcs if num_added > num_missing_arcs: raise ValueError( f"Tried adding {num_added} arcs but there are only {num_missing_arcs} arcs missing from the DAG.") # GET NUMBER ADDED/CHANGED/REMOVED num_altered = num_altered if num_altered is not None else np.random.binomial(gdag.num_arcs, prob_altered) num_removed = num_removed if num_removed is not None else np.random.binomial(gdag.num_arcs, prob_removed) num_removed = min(num_removed, gdag.num_arcs - num_altered) num_added = num_added if num_added is not None else np.random.binomial(num_missing_arcs, prob_added) # GET ACTUAL ARCS THAT ARE ADDED/CHANGED/REMOVED altered_arcs = random.sample(list(gdag.arcs), num_altered) removed_arcs = random.sample(list(gdag.arcs - set(altered_arcs)), num_removed) valid_arcs_to_add = set(itr.combinations(gdag.topological_sort(), 2)) - gdag.arcs added_arcs = random.sample(list(valid_arcs_to_add), num_added) # CREATE NEW DAG new_gdag = gdag.copy() weights = gdag.arc_weights for i, j in altered_arcs: new_gdag.set_arc_weight(i, j, rand_change_fn(weights[(i, j)])) for i, j in removed_arcs: new_gdag.remove_arc(i, j) new_weights = rand_weight_fn(size=num_added) for (i, j), val in zip(added_arcs, new_weights): new_gdag.set_arc_weight(i, j, val) return new_gdag __all__ = [ 'directed_erdos', 'directed_erdos_with_confounders', 'rand_weights', 'unif_away_zero', 'directed_barabasi', 'directed_random_graph', 'rand_nn_functions', 'unif_away_original', 'alter_weights', 'rand_additive_basis', 'rand_normal_wishart', 'rand_discrete_dag', 'rand_noisy_or_dag' ] if __name__ == '__main__': d = directed_erdos(10, .5, random_order=False) cam_dag = rand_additive_basis(d, [np.sin]) samples = cam_dag.sample(100) cond_means = cam_dag.conditional_mean(cond_values=np.ones([10, 1]), cond_nodes=[0])