Module mudcod.network_simulations
Expand source code
import itertools
from pathlib import Path
import numpy as np
from numpy.random import default_rng
_eps = 10 ** (-5)
class DCBM:
def __init__(self, n=None, model_order_K=None, p_in=None, p_out=None, seed=0):
assert isinstance(n, int) and n > 1
assert isinstance(model_order_K, int) and model_order_K > 1
assert isinstance(p_in, tuple) and isinstance(p_out, tuple)
assert all(isinstance(p, float) for p in p_in) and len(p_in) == 2
assert all(isinstance(p, float) for p in p_out) and len(p_in) == 2
assert all(p > 0 for p in p_in) and all(p > 0 for p in p_out)
assert (p_out[1] < p_in[0]) and (p_out[1] + p_in[1] <= 1)
assert (p_in[0] <= p_in[1]) and (p_out[0] <= p_out[1])
self.n = n
self.model_order_K = model_order_K
self.p_in = p_in
self.p_out = p_out
self.rng = default_rng(seed)
def _get_random_z(self):
rng = self.rng
model_order_K = self.model_order_K
n = self.n
z_init = np.nonzero(
rng.multinomial(1, [1 / model_order_K] * model_order_K, size=n)
)[1]
return z_init
def _evolve_z(self, z_prev, r):
assert z_prev.shape[0] == self.n
assert isinstance(r, float) and abs(r) <= 1.0
rng = self.rng
n = self.n
model_order_K = self.model_order_K
e_r = rng.binomial(1, r, size=n)
z_tn = np.nonzero(
rng.multinomial(1, [1 / model_order_K] * model_order_K, size=n)
)[1]
z_new = np.where(e_r == 1, z_tn, z_prev)
return z_new
def get_adjacency(self, z, connectivity_matrix, psi):
assert z.shape[0] == self.n
assert (
connectivity_matrix.shape[0]
== connectivity_matrix.shape[1]
== self.model_order_K
)
rng = self.rng
n = self.n
adj = np.zeros((n, n), dtype=int)
p = np.empty((n, n))
bz = connectivity_matrix[np.ix_(z[:], z[:])]
p[:, :] = psi[np.newaxis, :] * psi[:, np.newaxis] * bz
adj_triu = rng.binomial(1, p)
adj[:, :] = np.triu(adj_triu, 1) + np.triu(adj_triu, 1).T
return adj
def simulate_dcbm(self, z, psi=None):
assert z.shape[0] == self.n
rng = self.rng
n = self.n
model_order_K = self.model_order_K
p_in, p_out = self.p_in, self.p_out
connectivity_matrix = np.full(
(model_order_K, model_order_K), rng.uniform(p_out[0], p_out[1], 1)
)
connectivity_matrix[np.diag_indices(model_order_K)] = rng.uniform(
p_in[0], p_in[1], model_order_K
)
if psi is None:
gamma1 = 1
gamma0 = 0.5
# gamma0 = np.sqrt(1 / max(p_in)) - 1 - _eps
psi = gamma1 * (np.random.permutation(n) / n) + gamma0
else:
assert all((psi**2 * max(p_in)) < 1) and all(psi > 0)
adj = self.get_adjacency(z, connectivity_matrix, psi)
return adj, z
class DynamicDCBM(DCBM):
def __init__(
self,
n=None,
model_order_K=None,
p_in=None,
p_out=None,
time_horizon=None,
r_time=None,
seed=0,
):
assert isinstance(r_time, float) and (r_time < 1) and (r_time >= 0)
assert isinstance(time_horizon, int) and (time_horizon > 1)
self.time_horizon = time_horizon
self.r_time = r_time
super().__init__(n, model_order_K, p_in, p_out)
def simulate_dynamic_dcbm(self):
n = self.n
th = self.time_horizon
adj_dynamic = np.empty((th, n, n), dtype=int)
z_dynamic = np.empty((th, n), dtype=int)
z_init = self._get_random_z()
adj_dynamic[0, :, :], z_dynamic[0, :] = self.simulate_dcbm(z_init)
for t in range(1, th):
z_dynamic[t, :] = self._evolve_z(z_dynamic[t - 1, :], self.r_time)
adj_dynamic[t, :, :], _ = self.simulate_dcbm(z_dynamic[t, :])
return adj_dynamic, z_dynamic
class MuSDynamicDCBM(DynamicDCBM):
def __init__(
self,
n=None,
model_order_K=None,
p_in=None,
p_out=None,
time_horizon=None,
r_time=None,
num_subjects=None,
r_subject=None,
seed=0,
):
assert isinstance(r_subject, float) and (r_subject < 1) and (r_subject >= 0)
assert isinstance(num_subjects, int) and (num_subjects > 1)
self.num_subjects = num_subjects
self.r_subject = r_subject
super().__init__(n, model_order_K, p_in, p_out, time_horizon, r_time)
def simulate_mus_dynamic_dcbm(self, setting=3):
n = self.n
th = self.time_horizon
num_sbj = self.num_subjects
adj_mus_dynamic = np.empty((num_sbj, th, n, n), dtype=int)
z_mus_dynamic = np.empty((num_sbj, th, n), dtype=int)
# Totally independent subjects, evolve independently.
if setting == 0:
for sbj in range(num_sbj):
(
adj_mus_dynamic[sbj, :, :, :],
z_mus_dynamic[sbj, :, :],
) = self.simulate_dynamic_dcbm()
# Subjects are siblings at time 0, then they evolve independently.
elif setting == 1:
z_init = self._get_random_z()
adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm(
z_init
)
for sbj in range(1, num_sbj):
z_mus_dynamic[sbj, 0, :] = self._evolve_z(z_init, self.r_subject)
adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, 0, :]
)
for sbj in range(0, num_sbj):
for t in range(1, th):
z_mus_dynamic[sbj, t, :] = self._evolve_z(
z_mus_dynamic[sbj, t - 1, :], self.r_time
)
adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, t, :]
)
# Subjects are siblings at each time point.
elif setting == 2:
(
adj_mus_dynamic[0, :, :, :],
z_mus_dynamic[0, :, :],
) = self.simulate_dynamic_dcbm()
for sbj in range(1, num_sbj):
for t in range(0, th):
z_mus_dynamic[sbj, t, :] = self._evolve_z(
z_mus_dynamic[0, t, :], self.r_subject
)
adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, t, :]
)
# Subjects are parents of each other at time 0, then they evolve independently.
elif setting == 3:
z_init = self._get_random_z()
adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm(
z_init
)
for sbj in range(1, num_sbj):
z_mus_dynamic[sbj, 0, :] = self._evolve_z(
z_mus_dynamic[sbj - 1, 0, :], self.r_subject
)
adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, 0, :]
)
for sbj in range(0, num_sbj):
for t in range(1, th):
z_mus_dynamic[sbj, t, :] = self._evolve_z(
z_mus_dynamic[sbj, t - 1, :], self.r_time
)
adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, t, :]
)
# Subjects are parents of each other at each time point.
elif setting == 4:
(
adj_mus_dynamic[0, :, :, :],
z_mus_dynamic[0, :, :],
) = self.simulate_dynamic_dcbm()
for sbj in range(1, num_sbj):
for t in range(0, th):
z_mus_dynamic[sbj, t, :] = self._evolve_z(
z_mus_dynamic[sbj - 1, t, :], self.r_subject
)
adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm(
z_mus_dynamic[sbj, t, :]
)
else:
raise ValueError(f"Given setting number {setting} is not defined.")
return adj_mus_dynamic, z_mus_dynamic
Classes
class DCBM (n=None, model_order_K=None, p_in=None, p_out=None, seed=0)
-
Expand source code
class DCBM: def __init__(self, n=None, model_order_K=None, p_in=None, p_out=None, seed=0): assert isinstance(n, int) and n > 1 assert isinstance(model_order_K, int) and model_order_K > 1 assert isinstance(p_in, tuple) and isinstance(p_out, tuple) assert all(isinstance(p, float) for p in p_in) and len(p_in) == 2 assert all(isinstance(p, float) for p in p_out) and len(p_in) == 2 assert all(p > 0 for p in p_in) and all(p > 0 for p in p_out) assert (p_out[1] < p_in[0]) and (p_out[1] + p_in[1] <= 1) assert (p_in[0] <= p_in[1]) and (p_out[0] <= p_out[1]) self.n = n self.model_order_K = model_order_K self.p_in = p_in self.p_out = p_out self.rng = default_rng(seed) def _get_random_z(self): rng = self.rng model_order_K = self.model_order_K n = self.n z_init = np.nonzero( rng.multinomial(1, [1 / model_order_K] * model_order_K, size=n) )[1] return z_init def _evolve_z(self, z_prev, r): assert z_prev.shape[0] == self.n assert isinstance(r, float) and abs(r) <= 1.0 rng = self.rng n = self.n model_order_K = self.model_order_K e_r = rng.binomial(1, r, size=n) z_tn = np.nonzero( rng.multinomial(1, [1 / model_order_K] * model_order_K, size=n) )[1] z_new = np.where(e_r == 1, z_tn, z_prev) return z_new def get_adjacency(self, z, connectivity_matrix, psi): assert z.shape[0] == self.n assert ( connectivity_matrix.shape[0] == connectivity_matrix.shape[1] == self.model_order_K ) rng = self.rng n = self.n adj = np.zeros((n, n), dtype=int) p = np.empty((n, n)) bz = connectivity_matrix[np.ix_(z[:], z[:])] p[:, :] = psi[np.newaxis, :] * psi[:, np.newaxis] * bz adj_triu = rng.binomial(1, p) adj[:, :] = np.triu(adj_triu, 1) + np.triu(adj_triu, 1).T return adj def simulate_dcbm(self, z, psi=None): assert z.shape[0] == self.n rng = self.rng n = self.n model_order_K = self.model_order_K p_in, p_out = self.p_in, self.p_out connectivity_matrix = np.full( (model_order_K, model_order_K), rng.uniform(p_out[0], p_out[1], 1) ) connectivity_matrix[np.diag_indices(model_order_K)] = rng.uniform( p_in[0], p_in[1], model_order_K ) if psi is None: gamma1 = 1 gamma0 = 0.5 # gamma0 = np.sqrt(1 / max(p_in)) - 1 - _eps psi = gamma1 * (np.random.permutation(n) / n) + gamma0 else: assert all((psi**2 * max(p_in)) < 1) and all(psi > 0) adj = self.get_adjacency(z, connectivity_matrix, psi) return adj, z
Subclasses
Methods
def get_adjacency(self, z, connectivity_matrix, psi)
-
Expand source code
def get_adjacency(self, z, connectivity_matrix, psi): assert z.shape[0] == self.n assert ( connectivity_matrix.shape[0] == connectivity_matrix.shape[1] == self.model_order_K ) rng = self.rng n = self.n adj = np.zeros((n, n), dtype=int) p = np.empty((n, n)) bz = connectivity_matrix[np.ix_(z[:], z[:])] p[:, :] = psi[np.newaxis, :] * psi[:, np.newaxis] * bz adj_triu = rng.binomial(1, p) adj[:, :] = np.triu(adj_triu, 1) + np.triu(adj_triu, 1).T return adj
def simulate_dcbm(self, z, psi=None)
-
Expand source code
def simulate_dcbm(self, z, psi=None): assert z.shape[0] == self.n rng = self.rng n = self.n model_order_K = self.model_order_K p_in, p_out = self.p_in, self.p_out connectivity_matrix = np.full( (model_order_K, model_order_K), rng.uniform(p_out[0], p_out[1], 1) ) connectivity_matrix[np.diag_indices(model_order_K)] = rng.uniform( p_in[0], p_in[1], model_order_K ) if psi is None: gamma1 = 1 gamma0 = 0.5 # gamma0 = np.sqrt(1 / max(p_in)) - 1 - _eps psi = gamma1 * (np.random.permutation(n) / n) + gamma0 else: assert all((psi**2 * max(p_in)) < 1) and all(psi > 0) adj = self.get_adjacency(z, connectivity_matrix, psi) return adj, z
class DynamicDCBM (n=None, model_order_K=None, p_in=None, p_out=None, time_horizon=None, r_time=None, seed=0)
-
Expand source code
class DynamicDCBM(DCBM): def __init__( self, n=None, model_order_K=None, p_in=None, p_out=None, time_horizon=None, r_time=None, seed=0, ): assert isinstance(r_time, float) and (r_time < 1) and (r_time >= 0) assert isinstance(time_horizon, int) and (time_horizon > 1) self.time_horizon = time_horizon self.r_time = r_time super().__init__(n, model_order_K, p_in, p_out) def simulate_dynamic_dcbm(self): n = self.n th = self.time_horizon adj_dynamic = np.empty((th, n, n), dtype=int) z_dynamic = np.empty((th, n), dtype=int) z_init = self._get_random_z() adj_dynamic[0, :, :], z_dynamic[0, :] = self.simulate_dcbm(z_init) for t in range(1, th): z_dynamic[t, :] = self._evolve_z(z_dynamic[t - 1, :], self.r_time) adj_dynamic[t, :, :], _ = self.simulate_dcbm(z_dynamic[t, :]) return adj_dynamic, z_dynamic
Ancestors
Subclasses
Methods
def simulate_dynamic_dcbm(self)
-
Expand source code
def simulate_dynamic_dcbm(self): n = self.n th = self.time_horizon adj_dynamic = np.empty((th, n, n), dtype=int) z_dynamic = np.empty((th, n), dtype=int) z_init = self._get_random_z() adj_dynamic[0, :, :], z_dynamic[0, :] = self.simulate_dcbm(z_init) for t in range(1, th): z_dynamic[t, :] = self._evolve_z(z_dynamic[t - 1, :], self.r_time) adj_dynamic[t, :, :], _ = self.simulate_dcbm(z_dynamic[t, :]) return adj_dynamic, z_dynamic
class MuSDynamicDCBM (n=None, model_order_K=None, p_in=None, p_out=None, time_horizon=None, r_time=None, num_subjects=None, r_subject=None, seed=0)
-
Expand source code
class MuSDynamicDCBM(DynamicDCBM): def __init__( self, n=None, model_order_K=None, p_in=None, p_out=None, time_horizon=None, r_time=None, num_subjects=None, r_subject=None, seed=0, ): assert isinstance(r_subject, float) and (r_subject < 1) and (r_subject >= 0) assert isinstance(num_subjects, int) and (num_subjects > 1) self.num_subjects = num_subjects self.r_subject = r_subject super().__init__(n, model_order_K, p_in, p_out, time_horizon, r_time) def simulate_mus_dynamic_dcbm(self, setting=3): n = self.n th = self.time_horizon num_sbj = self.num_subjects adj_mus_dynamic = np.empty((num_sbj, th, n, n), dtype=int) z_mus_dynamic = np.empty((num_sbj, th, n), dtype=int) # Totally independent subjects, evolve independently. if setting == 0: for sbj in range(num_sbj): ( adj_mus_dynamic[sbj, :, :, :], z_mus_dynamic[sbj, :, :], ) = self.simulate_dynamic_dcbm() # Subjects are siblings at time 0, then they evolve independently. elif setting == 1: z_init = self._get_random_z() adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm( z_init ) for sbj in range(1, num_sbj): z_mus_dynamic[sbj, 0, :] = self._evolve_z(z_init, self.r_subject) adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, 0, :] ) for sbj in range(0, num_sbj): for t in range(1, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj, t - 1, :], self.r_time ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are siblings at each time point. elif setting == 2: ( adj_mus_dynamic[0, :, :, :], z_mus_dynamic[0, :, :], ) = self.simulate_dynamic_dcbm() for sbj in range(1, num_sbj): for t in range(0, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[0, t, :], self.r_subject ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are parents of each other at time 0, then they evolve independently. elif setting == 3: z_init = self._get_random_z() adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm( z_init ) for sbj in range(1, num_sbj): z_mus_dynamic[sbj, 0, :] = self._evolve_z( z_mus_dynamic[sbj - 1, 0, :], self.r_subject ) adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, 0, :] ) for sbj in range(0, num_sbj): for t in range(1, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj, t - 1, :], self.r_time ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are parents of each other at each time point. elif setting == 4: ( adj_mus_dynamic[0, :, :, :], z_mus_dynamic[0, :, :], ) = self.simulate_dynamic_dcbm() for sbj in range(1, num_sbj): for t in range(0, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj - 1, t, :], self.r_subject ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) else: raise ValueError(f"Given setting number {setting} is not defined.") return adj_mus_dynamic, z_mus_dynamic
Ancestors
Methods
def simulate_mus_dynamic_dcbm(self, setting=3)
-
Expand source code
def simulate_mus_dynamic_dcbm(self, setting=3): n = self.n th = self.time_horizon num_sbj = self.num_subjects adj_mus_dynamic = np.empty((num_sbj, th, n, n), dtype=int) z_mus_dynamic = np.empty((num_sbj, th, n), dtype=int) # Totally independent subjects, evolve independently. if setting == 0: for sbj in range(num_sbj): ( adj_mus_dynamic[sbj, :, :, :], z_mus_dynamic[sbj, :, :], ) = self.simulate_dynamic_dcbm() # Subjects are siblings at time 0, then they evolve independently. elif setting == 1: z_init = self._get_random_z() adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm( z_init ) for sbj in range(1, num_sbj): z_mus_dynamic[sbj, 0, :] = self._evolve_z(z_init, self.r_subject) adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, 0, :] ) for sbj in range(0, num_sbj): for t in range(1, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj, t - 1, :], self.r_time ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are siblings at each time point. elif setting == 2: ( adj_mus_dynamic[0, :, :, :], z_mus_dynamic[0, :, :], ) = self.simulate_dynamic_dcbm() for sbj in range(1, num_sbj): for t in range(0, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[0, t, :], self.r_subject ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are parents of each other at time 0, then they evolve independently. elif setting == 3: z_init = self._get_random_z() adj_mus_dynamic[0, 0, :, :], z_mus_dynamic[0, 0, :] = self.simulate_dcbm( z_init ) for sbj in range(1, num_sbj): z_mus_dynamic[sbj, 0, :] = self._evolve_z( z_mus_dynamic[sbj - 1, 0, :], self.r_subject ) adj_mus_dynamic[sbj, 0, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, 0, :] ) for sbj in range(0, num_sbj): for t in range(1, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj, t - 1, :], self.r_time ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) # Subjects are parents of each other at each time point. elif setting == 4: ( adj_mus_dynamic[0, :, :, :], z_mus_dynamic[0, :, :], ) = self.simulate_dynamic_dcbm() for sbj in range(1, num_sbj): for t in range(0, th): z_mus_dynamic[sbj, t, :] = self._evolve_z( z_mus_dynamic[sbj - 1, t, :], self.r_subject ) adj_mus_dynamic[sbj, t, :, :], _ = self.simulate_dcbm( z_mus_dynamic[sbj, t, :] ) else: raise ValueError(f"Given setting number {setting} is not defined.") return adj_mus_dynamic, z_mus_dynamic