#| hide
from nbdev.showdoc import *
8 Simulation Scripts
Here we introduce scripts which support the simulation of CRLD.
#| default_exp SimulationScripts
#| export
import numpy as np
import matplotlib.pyplot as plt
from skopt.sampler import Lhs
from skopt.space import Space
import hashlib
Create initial strategies
#| exports
def initial_strategies(MAEi, # Multi-agent environment interface
                       number:int, # Number of strategies to create
                       iterations:int=1000 # Latin hypercube sampling parameter
                       )->np.ndarray: # Array of initial strategies
    """
    Create a set of initial strategies using Latin hypercube sampling.
    """
    assert MAEi.M == 2, 'Sampling for M>2 not straightforward'
    # https://www.egr.msu.edu/~kdeb/papers/c2018010.pdf
    # https://www.cs.cmu.edu/~nasmith/papers/smith+tromble.tr04.pdf
    eps = 10**(-6)
    space = Space(MAEi.N * MAEi.Q * (MAEi.M-1)*[(0.0+eps, 1.0-eps)])

    # generate Latin hypercubes
    lhs = Lhs(criterion="maximin", iterations=iterations)
    x = lhs.generate(space.dimensions, number, random_state=42)
    x = np.array(x).reshape(number, MAEi.N, MAEi.Q, MAEi.M-1)

    # complete and normalize
    inits = np.zeros((number, MAEi.N, MAEi.Q, MAEi.M))
    inits[..., 0] = x[..., 0]
    inits[..., 1] = 1 - x[..., 0]

    return inits
For example,
class mae: N=3; Q=4; M=2 # dummy MAEi for demonstration only
Xs = initial_strategies(mae, 7)
Xs.shape
(7, 3, 4, 2)
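Each sampled strategy should be a valid probability distribution over the M actions. A quick sanity check (illustration only, not part of the library):

assert np.allclose(Xs.sum(axis=-1), 1.0)   # action probabilities sum to one
assert (Xs > 0).all() and (Xs < 1).all()   # the eps margin keeps them strictly interior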
Compute trajectories
#| exports
def compute_trajectories(MAEi, # Multi-agent environment interface
                         inits, # Iterable of initial conditions
                         Tmax=1000, # Number of maximum iteration steps for each run
                         tol=10e-5 # Tolerance to classify a trajectory as converged
                         )->tuple: # (iterables of trajectories, fixed-point-reacheds)
    """
    Compute learning trajectories from a set of initial strategies.
    """
    trjs = []; fprs = []
    leni = len(inits)

    for xi, x0 in enumerate(inits):
        print("\r ", np.round(xi/leni, 4), end='')
        x = x0.copy()
        trj, fpr = MAEi.trajectory(x, Tmax=Tmax, tolerance=tol)
        trjs.append(trj)
        fprs.append(fpr)

    print()
    print('Computed', leni, 'trajectories')
    return np.array(trjs, dtype=object), np.array(fprs)
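`compute_trajectories` only assumes that the interface provides a `trajectory(x, Tmax=..., tolerance=...)` method returning a `(trajectory, fixed_point_reached)` pair. A minimal mock (hypothetical, for demonstration only) shows the calling convention, using the strategies `Xs` sampled above:

class MockMAEi: # hypothetical stand-in for a real multi-agent environment interface
    N=3; Q=4; M=2
    def trajectory(self, x, Tmax=1000, tolerance=10e-5):
        # pretend every strategy converges after a single update step
        return [x.copy(), x.copy()], True

trjs, fprs = compute_trajectories(MockMAEi(), Xs)
len(trjs), fprs.all() # (7, True)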
After computing the trajectories, we can check whether all of them converged and look at the histogram of their lengths:
#| exports
def check_run(trjs, # Iterable of learning trajectories
              fprs=None): # Iterable of bools whether a fixed point was reached
    """
    Perform some checks for an iterable of learning trajectories.
    """
    if fprs is not None:
        print('Unique fixed points reached:', np.unique(fprs))
    plt.hist([len(traj) for traj in trjs], bins=20);
    plt.title('Histogram of trajectory lengths')
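With the mock results from above, the call reports that all runs reached a fixed point and plots the (here trivial) length histogram:

check_run(trjs, fprs) # prints: Unique fixed points reached: [ True]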
Saving & reloading
To avoid recomputing everything from scratch, we save runs to disk and reload them when needed, which is much faster.
#| exports
def _transform_tensor_into_hash(tens):
    """Transform `tens` into an integer hash for filename saving"""
    r = int(hashlib.sha512(str(tens).encode('utf-8')).hexdigest()[:16], 16)
    return r
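Because the hash is computed from the string representation of the tensor, identical initial conditions always map to the same filename. For example (illustration only):

a = np.zeros((2, 2))
_transform_tensor_into_hash(a) == _transform_tensor_into_hash(a.copy()) # True

Note that NumPy abbreviates the string representation of very large arrays, so in principle two large tensors differing only in the abbreviated middle could collide.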
#| exports
def obtain_trajectories(MAEi, # Multi-agent environment interface
                        inits, # Iterable of initial conditions
                        Tmax=1000, # Number of maximum iteration steps for each run
                        tol=10e-5, # Tolerance to classify a trajectory as converged
                        ddir='data', # Path to data directory to store the results
                        verbose=1 # Verbosity level
                        )->tuple: # (iterables of trajectories, fixed-point-reacheds)
    """
    Obtain learning trajectories from a set of initial strategies.
    Check whether they can be loaded from disk. If yes, do so. If not, compute them.
    """
    fn = ddir + '/' + MAEi.id() + '_' + str(_transform_tensor_into_hash(inits))
    fn += ".npz"

    try:
        dat = np.load(fn, allow_pickle=True)
        ddic = dict(zip((k for k in dat), (dat[k] for k in dat)))
        print("Loading ", fn) if verbose else None
    except:
        print("Computing ", fn) if verbose else None
        trjs, fprs = compute_trajectories(MAEi, inits, Tmax=Tmax, tol=tol)
        check_run(trjs, fprs)
        # rtrajs = obtain_rewards(AEi, πtrajs)
        ddic = dict(trjs=trjs, fprs=fprs)
        np.savez_compressed(fn, **ddic)
        dat = np.load(fn, allow_pickle=True)
        ddic = dict(zip((k for k in dat), (dat[k] for k in dat)))

    return ddic['trjs'], ddic['fprs']
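In practice the first call computes and caches, and any later call with the same interface and initial conditions loads the cached file (a usage sketch, assuming the `data` directory exists):

trjs, fprs = obtain_trajectories(MAEi, inits) # first call: computes and saves
trjs, fprs = obtain_trajectories(MAEi, inits) # second call: loads from disk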
Final rewards
#| exports
def final_rewards(MAEi, # Multi-agent environment interface
                  trjs # Iterable of learning trajectories
                  )->np.ndarray: # Array of final rewards
    """
    Compute final rewards from a set of learning trajectories.
    """
    rews = []
    for trj in trjs:
        x = trj[-1].astype(float)
        rs = np.einsum(MAEi.Ps(x), [0], MAEi.Ris(x), [1,0], [1])
        # rs = MAEi.Ri(x)
        rews.append(rs)
    return np.array(rews)
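The `np.einsum` call uses the interleaved index-list form: it contracts the stationary state distribution `MAEi.Ps(x)` (indexed by state, `[0]`) with the per-agent state rewards `MAEi.Ris(x)` (indexed agent by state, `[1,0]`) into each agent's expected reward, r_i = sum_s P(s) * R_i(s). A standalone check with dummy arrays (shapes inferred from the index lists; illustration only):

Ps  = np.array([0.25, 0.75])        # stationary distribution over Q=2 states
Ris = np.array([[1.0, 0.0],         # agent 0's reward in each state
                [0.0, 2.0]])        # agent 1's reward in each state
np.einsum(Ps, [0], Ris, [1, 0], [1]) # array([0.25, 1.5 ]) == Ris @ Ps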
#| hide
import nbdev
"08_SimulationScripts.ipynb", "_code") nbdev.export.nb_export(