8  Simulation Scripts

Here we introduce scripts which support the simulation of CRLD.

 #| hide
from nbdev.showdoc import *
 #| default_exp SimulationScripts
 #| export
import numpy as np
import matplotlib.pyplot as plt
from skopt.sampler import Lhs
from skopt.space import Space
import hashlib

Create initial strategies

 #| exports
def initial_strategies(MAEi,  # Multi-agent environment interface
                       number:int,  # Number of strategies to create
                       iterations:int=1000,  # Latin hyper cube sampling parameter
                       random_state=42  # Seed passed to the sampler (default keeps the former fixed seed)
                       )->np.ndarray:  # Array of initial strategies
    """
    Create a set of initial strategies using latin hyper cube sampling.

    Only two-action environments (`MAEi.M == 2`) are supported: one
    probability per (agent, state) pair is sampled and the second action
    receives the complement, so every strategy is normalized by construction.
    The result has shape `(number, MAEi.N, MAEi.Q, MAEi.M)`.

    `random_state` is forwarded unchanged to `Lhs.generate`. Its default of 42
    reproduces the samples obtained when the seed was hard-coded.
    """
    assert MAEi.M == 2, 'Sampling for M>2 not straightforward'
    # https://www.egr.msu.edu/~kdeb/papers/c2018010.pdf
    # https://www.cs.cmu.edu/~nasmith/papers/smith+tromble.tr04.pdf

    # Keep samples strictly inside (0, 1) so that no action probability is
    # exactly zero or one.
    eps = 10**(-6)
    n_free = MAEi.N * MAEi.Q * (MAEi.M-1)  # one free probability per agent and state
    space = Space(n_free*[(0.0+eps, 1.0-eps)])

    # generate latin hyper cubes
    lhs = Lhs(criterion="maximin", iterations=iterations)
    x = lhs.generate(space.dimensions, number, random_state=random_state)
    x = np.array(x).reshape(number, MAEi.N, MAEi.Q, MAEi.M-1)

    # complete and normalize: the second action takes the remaining probability
    inits = np.zeros((number, MAEi.N, MAEi.Q, MAEi.M))
    inits[..., 0] = x[...,0]
    inits[..., 1] = 1 - x[...,0]

    return inits

For example,

# `initial_strategies` only reads the attributes N, Q and M, so a bare class suffices here.
class mae: N=3; Q=4; M=2 # dummy MAEi for demonstration only
Xs = initial_strategies(mae, 7)
Xs.shape
(7, 3, 4, 2)

Compute trajectories

 #| exports
def compute_trajectories(MAEi,  # Multi-agent environment interface
                         inits,  # Iterable of inital conditions
                         Tmax=1000,  # Number of maximum iteration steps for each run
                         tol=10e-5  # Tolerance to classify a trajectory as converged
                         )->tuple:  # (iterables of trajectories, fixed-point-reacheds)
    """
    Run `MAEi.trajectory` once per initial strategy and collect the results.

    Each initial condition is copied before it is handed to the environment,
    so the entries of `inits` are not modified through this call. A running
    progress fraction is printed on a single console line. Note that the
    default `tol=10e-5` equals `1e-4`.

    Returns a pair of arrays: the trajectories (object dtype) and the flags
    reported by `MAEi.trajectory` as its second return value.
    """
    total = len(inits)
    trajectories, reached = [], []

    for index, start in enumerate(inits):
        # "\r" rewinds the cursor so the progress overwrites itself
        progress = np.round(index/total, 4)
        print("\r ", progress, end='')

        trajectory, fixed_point = MAEi.trajectory(start.copy(),
                                                  Tmax=Tmax, tolerance=tol)
        trajectories.append(trajectory)
        reached.append(fixed_point)

    print()
    print('Computed', total, 'trajectories')

    return np.array(trajectories, dtype=object), np.array(reached)

After computing the trajectories, we can check whether all of them converged and look at a histogram of their lengths:

 #| exports
def check_run(trjs,  # Iterable of learning trajectories
              fprs=None):  # Iterable of bools whether a fixed point was reached
    """
    Print and plot quick diagnostics for a batch of learning trajectories.

    When `fprs` is given, the distinct values it contains are printed.
    In every case a 20-bin histogram of the trajectory lengths is drawn on
    the current matplotlib axes.
    """
    if fprs is not None:
        outcomes = np.unique(fprs)
        print('Unique fixed points reached:', outcomes)

    lengths = list(map(len, trjs))
    plt.hist(lengths, bins=20)
    plt.title('Histrogram of trajectories lengths')

Saving & reloading

To avoid recomputing everything from scratch, we save runs to disk and reload them when needed, which is much faster than running the simulations again.

 #| exports
def _transform_tensor_into_hash(tens):
    """
    Transform `tens` into an integer for filename saving.

    The value is the first 16 hex digits of the SHA-512 digest of
    `str(tens)`, read as an integer.

    NumPy abbreviates the printed form of arrays with more than 1000
    elements (the default print threshold) to their edge items plus "...".
    Without intervention, different large inputs therefore produce the same
    string and the same hash. Summarization is switched off while the string
    is built, so every element contributes. Inputs that were never
    abbreviated keep exactly the hash they had before.

    NOTE: elements are still rendered with NumPy's print precision, so
    arrays that differ only beyond that precision map to the same value.
    """
    import sys  # function-scope import; `sys` is not imported at module level

    with np.printoptions(threshold=sys.maxsize):
        text = str(tens)

    digest = hashlib.sha512(text.encode('utf-8')).hexdigest()
    return int(digest[:16], 16)
 #| exports
def obtain_trajectories(MAEi,  # Multi-agent environment interface
                        inits,  # Iterable of inital conditions
                        Tmax=1000,  # Number of maximum iteration steps for each run
                        tol=10e-5,  # Tolerance to classify a trajectory as converged
                        ddir='data',  # Path to data directory to store the results
                        verbose=1  # Verbosity level
                        )->tuple:  # (iterables of trajectories, fixed-point-reacheds)
    """
    Obtain learning trajectories from a set of initial strategies.

    Check whether they can be loaded from disk. If yes, do so. If not,
    compute them, run `check_run` on the result, save them as a compressed
    `.npz` file inside `ddir` (created if missing) and return the saved data
    as read back from that file.

    NOTE: the file name is built from `ddir`, `MAEi.id()` and a hash of
    `inits` only. `Tmax` and `tol` are not part of it, so a file computed
    with different values of those parameters is reused as-is.
    """
    import os  # function-scope import; `os` is not imported at module level

    fn = ddir + '/' + MAEi.id() + '_' + str(_transform_tensor_into_hash(inits))
    fn += ".npz"

    def _read(path):
        # Copy the arrays out and close the archive afterwards.
        # allow_pickle is needed when arrays are stored with object dtype;
        # unpickling can execute arbitrary code, so only point `ddir` at
        # trusted files.
        with np.load(path, allow_pickle=True) as dat:
            return {k: dat[k] for k in dat.files}

    try:
        ddic = _read(fn)
    except Exception:
        # Best effort: a missing or unreadable cache file means "recompute".
        # Unlike a bare `except`, KeyboardInterrupt and SystemExit propagate.
        if verbose:
            print("Computing ", fn)

        # Make sure the target directory exists before the expensive
        # computation, so the save at the end cannot fail for that reason.
        if ddir:
            os.makedirs(ddir, exist_ok=True)

        trjs, fprs = compute_trajectories(MAEi, inits, Tmax=Tmax, tol=tol)
        check_run(trjs, fprs)

        np.savez_compressed(fn, trjs=trjs, fprs=fprs)
        # Read back what was written so that fresh and cached calls return
        # data that went through the same save/load round trip.
        ddic = _read(fn)
    else:
        if verbose:
            print("Loading ", fn)

    return ddic['trjs'], ddic['fprs']

Final rewards

 #| exports
def final_rewards(MAEi, # Multi-agent environment interface
                  trjs  # Iterable of learning trajectories
                  )->np.ndarray:  # Array of final rewards
    """
    Evaluate the rewards at the last point of every learning trajectory.

    For each trajectory the final entry is cast to float, and
    `MAEi.Ps(x)` is contracted with `MAEi.Ris(x)` over the axis they share
    (axis 0 of `Ps`, axis 1 of `Ris`). One reward vector per trajectory is
    returned, stacked into a single array.
    """
    def _reward_at_end(trj):
        x_final = trj[-1].astype(float)
        # Presumably a state distribution weighting per-agent state rewards;
        # `MAEi.Ri(x)` may be an equivalent shortcut — TODO confirm.
        return np.einsum('s,is->i', MAEi.Ps(x_final), MAEi.Ris(x_final))

    return np.array([_reward_at_end(trj) for trj in trjs])
 #| hide
# Export this notebook's code cells as a module; the second argument ("_code")
# is presumably the target library directory — confirm against the nbdev setup.
import nbdev
nbdev.export.nb_export("08_SimulationScripts.ipynb", "_code")