#| hide
from nbdev.showdoc import *
# 8 Simulation Scripts
Here we introduce scripts which support the simulation of CRLD.
#| default_exp SimulationScripts
#| export
import numpy as np
import matplotlib.pyplot as plt
from skopt.sampler import Lhs
from skopt.space import Space
import hashlib
# Create initial strategies
#| exports
def initial_strategies(MAEi,  # Multi-agent environment interface
                       number:int,  # Number of strategies to create
                       iterations:int=1000,  # Latin hyper cube sampling parameter
                       random_state=42  # Seed handed to the sampler; 42 reproduces the former fixed draw
                      )->np.ndarray:  # Array of initial strategies
    """
    Create a set of initial strategies using latin hyper cube sampling.

    One value `p` is sampled in `(eps, 1-eps)` for each of the
    `MAEi.N * MAEi.Q` entries, and the last axis is filled with `[p, 1-p]`
    so that it sums to one. The result has shape
    `(number, MAEi.N, MAEi.Q, MAEi.M)`.

    Only `MAEi.M == 2` is supported; any other value fails the assertion.
    """
    assert MAEi.M == 2, 'Sampling for M>2 not straightforward'
    # https://www.egr.msu.edu/~kdeb/papers/c2018010.pdf
    # https://www.cs.cmu.edu/~nasmith/papers/smith+tromble.tr04.pdf

    # Keep samples strictly inside the open unit interval
    eps = 10**(-6)
    n_free = MAEi.N * MAEi.Q * (MAEi.M-1)
    space = Space(n_free * [(0.0+eps, 1.0-eps)])

    # generate latin hyper cubes
    lhs = Lhs(criterion="maximin", iterations=iterations)
    x = lhs.generate(space.dimensions, number, random_state=random_state)
    x = np.array(x).reshape(number, MAEi.N, MAEi.Q, MAEi.M-1)

    # complete and normalize: the second entry is the complement of the first
    inits = np.zeros((number, MAEi.N, MAEi.Q, MAEi.M))
    inits[..., 0] = x[..., 0]
    inits[..., 1] = 1 - x[..., 0]
    return inits
# For example,
# Minimal stand-in exposing only the attributes `initial_strategies` reads.
class mae: N=3; Q=4; M=2  # dummy MAEi for demonstration only
Xs = initial_strategies(mae, 7)
# Notebook cell output, kept as a comment so the line is not parsed as a call:
Xs.shape  # (7, 3, 4, 2)
Compute trajectories
#| exports
def compute_trajectories(MAEi,  # Multi-agent environment interface
                         inits,  # Iterable of inital conditions
                         Tmax=1000,  # Number of maximum iteration steps for each run
                         tol=10e-5  # Tolerance to classify a trajectory as converged
                        )->tuple:  # (iterables of trajectories, fixed-point-reacheds)
    """
    Run `MAEi.trajectory` once per initial strategy and collect the results.

    Each initial condition is copied before it is handed to the environment,
    so `inits` itself is never modified. A running progress fraction is
    printed while the runs execute. Returns an object array of trajectories
    and an array of the second value `MAEi.trajectory` returns for each run.
    """
    total = len(inits)
    trajectories, reached = [], []
    for idx, start in enumerate(inits):
        # Carriage return keeps the progress on a single console line
        print("\r ", np.round(idx/total, 4), end='')
        trajectory, converged = MAEi.trajectory(start.copy(), Tmax=Tmax,
                                                tolerance=tol)
        trajectories.append(trajectory)
        reached.append(converged)
    print()
    print('Computed', total, 'trajectories')
    return np.array(trajectories, dtype=object), np.array(reached)
# After computing the trajectories, we can check whether or not all converged and look at the histograms of their lengths:
#| exports
def check_run(trjs,  # Iterable of learning trajectories
              fprs=None):  # Iterable of bools whether a fixed point was reached
    """
    Perform some checks for an iterable of learning trajectories.

    Prints the distinct values found in `fprs` (when given) and draws a
    20-bin histogram of the trajectory lengths with matplotlib.
    Returns nothing.
    """
    if fprs is not None:
        print('Unique fixed points reached:', np.unique(fprs))
    lengths = [len(traj) for traj in trjs]
    plt.hist(lengths, bins=20)
    plt.title('Histogram of trajectory lengths')
# Saving & reloading
To avoid recomputing everything from scratch, we save runs to disk and reload them when needed, which is much faster than recomputing.
#| exports
def _transform_tensor_into_hash(tens):
    """Transform `tens` into an integer hash usable in a filename.

    The hash is the first 16 hex digits of the SHA-512 digest of `str(tens)`,
    returned as an `int`.

    NumPy abbreviates the string form of arrays with more than 1000 elements
    (`[0. 0. 0. ... 0. 0. 0.]`), which made different large arrays hash to the
    same value. The string is therefore built with summarisation switched off.
    Arrays small enough never to be abbreviated keep their previous hash.

    NOTE(review): values are still rendered with NumPy's print precision, so
    arrays that differ only beyond that precision hash identically.
    """
    import sys  # local import keeps this block self-contained

    with np.printoptions(threshold=sys.maxsize):
        text = str(tens)
    digest = hashlib.sha512(text.encode('utf-8')).hexdigest()
    return int(digest[:16], 16)
#| exports
def _load_run(fn):
    """Read every array stored in the `.npz` archive `fn` into a plain dict."""
    # allow_pickle is required because trajectories are saved as object
    # arrays. Unpickling can execute code, so only load trusted files.
    with np.load(fn, allow_pickle=True) as dat:
        return {k: dat[k] for k in dat}


def obtain_trajectories(MAEi,  # Multi-agent environment interface
                        inits,  # Iterable of inital conditions
                        Tmax=1000,  # Number of maximum iteration steps for each run
                        tol=10e-5,  # Tolerance to classify a trajectory as converged
                        ddir='data',  # Path to data directory to store the results
                        verbose=1  # Verbosity level
                       )->tuple:  # (iterables of trajectories, fixed-point-reacheds)
    """
    Obtain learning trajectories from a set of initial strategies.

    The file name is built from `ddir`, `MAEi.id()` and a hash of `inits`.
    If that file can be loaded, its contents are returned. Otherwise the
    trajectories are computed, checked with `check_run`, saved to that file
    and read back from it, so both paths return data in the same form.

    NOTE(review): the file name does not depend on `Tmax` or `tol`, so a call
    with different values for these re-uses a previously saved run.
    """
    import os  # local import keeps this block self-contained

    fn = ddir + '/' + MAEi.id() + '_' + str(_transform_tensor_into_hash(inits))
    fn += ".npz"

    try:
        ddic = _load_run(fn)
    except Exception:
        # Any failure to read the cache (missing or unreadable file) falls
        # back to recomputing; KeyboardInterrupt/SystemExit still propagate.
        if verbose:
            print("Computing ", fn)
        # Create the directory up front so the save cannot fail after the
        # expensive computation just because the directory is missing.
        if ddir:
            os.makedirs(ddir, exist_ok=True)
        trjs, fprs = compute_trajectories(MAEi, inits, Tmax=Tmax, tol=tol)
        check_run(trjs, fprs)
        np.savez_compressed(fn, trjs=trjs, fprs=fprs)
        ddic = _load_run(fn)
    else:
        if verbose:
            print("Loading ", fn)

    return ddic['trjs'], ddic['fprs']
# Final rewards
#| exports
def final_rewards(MAEi,  # Multi-agent environment interface
                  trjs  # Iterable of learning trajectories
                 )->np.ndarray:  # Array of final rewards
    """
    Compute final rewards from a set of learning trajectories.

    For each trajectory the last entry is cast to float and passed to
    `MAEi.Ps` and `MAEi.Ris`. The rows of `Ris` are then weighted by `Ps`
    and summed over the shared axis, giving one value per row of `Ris`.
    """
    def _reward_at(x):
        # Contract the axis of Ps with the second axis of Ris
        return np.einsum('s,is->i', MAEi.Ps(x), MAEi.Ris(x))

    return np.array([_reward_at(trj[-1].astype(float)) for trj in trjs])
#| hide
import nbdev
nbdev.export.nb_export("08_SimulationScripts.ipynb", "_code")