Skip to content

API reference

Auto-generated from the source docstrings.

esnfed.esn

Echo State Network (ESN).

An ESN is a recurrent neural network from the Reservoir Computing paradigm (Jaeger, 2001). A large, fixed, randomly connected recurrent layer -- the reservoir -- projects the input into a high-dimensional dynamical feature space; only a linear readout is trained, here by ridge regression. Training is therefore convex and fast, with no backpropagation through time.

State update (leaky-integrator neurons, Jaeger et al. 2007):

x(t) = (1 - a) x(t-1) + a * tanh( W_in [b; u(t)] + W x(t-1) )

Readout:

y(t) = W_out [b; u(t); x(t)]

where a is the leaking rate, b a bias constant, W the (spectral-radius scaled) reservoir matrix and W_in the input matrix. W_out is the only trained quantity.

Performance

The state-harvesting loop is the hot path. Three optional accelerations are available and all keep the public API and results unchanged:

  • Numba -- if installed (pip install "esnfed[fast]"), the dense float64 harvest loop is JIT-compiled to native speed automatically; otherwise a pure-NumPy fallback is used.
  • Sparse reservoirs (sparse=True) -- store W as a SciPy CSR matrix, turning the per-step matrix-vector product from O(N^2) into O(edges); a large win for big, sparse reservoirs.
  • float32 (dtype=np.float32) -- halves memory traffic for a modest speed gain on memory-bound workloads.

EchoStateNetwork dataclass

EchoStateNetwork(n_inputs: int, n_outputs: int, reservoir: ndarray, spectral_radius: float = 0.9, input_scaling: float = 1.0, leaking_rate: object = 1.0, activation: object = 'tanh', ridge: float = 1e-06, washout: int = 100, bias: float = 1.0, seed: int | None = None, input_weights: ndarray | None = None, dtype: object = np.float64, sparse: bool = False, use_numba: bool | None = None)

A leaky-integrator Echo State Network with a ridge-regression readout.

Parameters:

Name Type Description Default
n_inputs int

Input and output dimensionality.

required
n_outputs int

Input and output dimensionality.

required
reservoir ndarray

Square reservoir weight matrix W (n_reservoir x n_reservoir). Usually produced by :mod:esnfed.topologies. It is rescaled internally to spectral_radius.

required
spectral_radius float

Target largest absolute eigenvalue of W. Must be < 1 for the echo state property to hold for typical inputs (Jaeger, 2001).

0.9
input_scaling float

Scaling applied to the random input weights.

1.0
leaking_rate object

Leaky-integrator rate a in (0, 1]; 1.0 recovers a standard ESN. May be a scalar (homogeneous) or a per-node array of shape (n_reservoir,) for heterogeneous leaking rates / time constants --- different neurons then integrate at different speeds, giving a multi-scale reservoir (see :func:esnfed.topologies.leaking_rates).

1.0
activation object

Node nonlinearity. "tanh" (default), "sigmoid", "relu", "sin" or "identity"; a per-node array of those names for a multi-type (mixed) reservoir (see :func:esnfed.topologies.mixed_activations); or any callable applied element-wise.

'tanh'
ridge float

Tikhonov (ridge) regularisation strength for the readout.

1e-06
washout int

Number of initial timesteps discarded when harvesting states, to remove the dependence on the (zero) initial state.

100
bias float

Constant bias fed to the reservoir and readout.

1.0
seed int | None

Seed for the input-weight RNG (the reservoir itself is supplied).

None
input_weights ndarray | None

Optional caller-supplied input matrix of shape (N, 1 + n_inputs) (e.g. lifted from a ReservoirPy reservoir); if None, drawn randomly.

None
dtype object

Floating-point type for the reservoir and states (np.float64 by default; np.float32 trades precision for memory/speed).

float64
sparse bool

If true, store the reservoir as a SciPy CSR matrix (needs SciPy); the per-step matvec then costs O(edges) instead of O(N^2). Best for large, low-density reservoirs.

False
use_numba bool | None

None (default) uses Numba for the dense float64 harvest if it is installed; True/False force it on/off.

None

harvest

harvest(u: ndarray, x0: ndarray | None = None) -> np.ndarray

Run the reservoir over inputs u and return extended states.

Parameters:

Name Type Description Default
u ndarray

Input array of shape (T, n_inputs).

required
x0 ndarray | None

Optional initial reservoir state (defaults to zeros).

None

Returns:

Type Description
Z

Extended-state matrix of shape (T, 1 + n_inputs + n_reservoir), each row being [bias, u(t), x(t)].

Source code in esnfed/esn.py
def harvest(self, u: np.ndarray, x0: np.ndarray | None = None) -> np.ndarray:
    """Run the reservoir over inputs ``u`` and return extended states.

    Parameters
    ----------
    u
        Input array of shape (T, n_inputs).
    x0
        Optional initial reservoir state (defaults to zeros).

    Returns
    -------
    Z
        Extended-state matrix of shape (T, 1 + n_inputs + n_reservoir),
        each row being ``[bias, u(t), x(t)]``.
    """
    u = np.atleast_2d(u)
    if u.shape[1] != self.n_inputs:
        u = u.reshape(-1, self.n_inputs)
    u = np.ascontiguousarray(u, dtype=self.W_in.dtype)

    if self._numba_enabled and x0 is None:
        fn = _get_numba_harvest()
        if fn is not None:
            Z = fn(self.W, self.W_in, u, float(self._a),
                   float(self.bias), int(self.n_inputs))
            self._last_state = Z[-1, 1 + self.n_inputs:] if len(Z) else \
                np.zeros(self.n_reservoir)
            return Z

    Z = _harvest_numpy(self.W, self.W_in, u, self._a, self.bias,
                       self.n_inputs, self.n_reservoir, x0,
                       act=self._activation_fn)
    self._last_state = Z[-1, 1 + self.n_inputs:] if len(Z) else \
        np.zeros(self.n_reservoir)
    return Z

fit

fit(u: ndarray, y: ndarray) -> 'EchoStateNetwork'

Train the readout by ridge regression on a single sequence.

Source code in esnfed/esn.py
def fit(self, u: np.ndarray, y: np.ndarray) -> "EchoStateNetwork":
    """Train the readout by ridge regression on a single sequence."""
    y = np.atleast_2d(y)
    if y.shape[0] != (np.atleast_2d(u).reshape(-1, self.n_inputs)).shape[0]:
        y = y.reshape(-1, self.n_outputs)
    Z = self.harvest(u)
    Zw, Yw = Z[self.washout :], y[self.washout :]
    A, B = ridge_statistics(Zw, Yw)
    self.W_out = solve_readout(A, B, self.ridge)
    return self

predict

predict(u: ndarray, x0: ndarray | None = None) -> np.ndarray

Predict outputs for inputs u using the trained readout.

Source code in esnfed/esn.py
def predict(self, u: np.ndarray, x0: np.ndarray | None = None) -> np.ndarray:
    """Predict outputs for inputs ``u`` using the trained readout."""
    if self.W_out is None:
        raise RuntimeError("ESN readout is not trained; call fit() first")
    Z = self.harvest(u, x0=x0)
    return Z @ self.W_out

local_statistics

local_statistics(u: ndarray, y: ndarray)

Return the ridge sufficient statistics (A, B) for local data.

A = Z^T Z and B = Z^T Y over the post-washout extended states. Summing these across clients and solving once yields exactly the readout that pooled training would produce -- the basis of exact federated ridge regression.

Source code in esnfed/esn.py
def local_statistics(self, u: np.ndarray, y: np.ndarray):
    """Return the ridge sufficient statistics (A, B) for local data.

    ``A = Z^T Z`` and ``B = Z^T Y`` over the post-washout extended states.
    Summing these across clients and solving once yields *exactly* the
    readout that pooled training would produce -- the basis of exact
    federated ridge regression.
    """
    y = np.atleast_2d(y).reshape(-1, self.n_outputs)
    Z = self.harvest(u)
    return ridge_statistics(Z[self.washout :], y[self.washout :])

ridge_statistics

ridge_statistics(Z: ndarray, Y: ndarray)

Return sufficient statistics A = Z^T Z and B = Z^T Y.

The Gram matrix is accumulated in float64 even when the states are float32: the harvest keeps the float32 speed/memory benefit, while the (small, ill-conditioned) ridge solve stays numerically stable.

Source code in esnfed/esn.py
def ridge_statistics(Z: np.ndarray, Y: np.ndarray):
    """Return sufficient statistics ``A = Z^T Z`` and ``B = Z^T Y``.

    The Gram matrix is accumulated in float64 even when the states are float32:
    the harvest keeps the float32 speed/memory benefit, while the (small,
    ill-conditioned) ridge solve stays numerically stable.
    """
    Z = np.asarray(Z, dtype=np.float64)
    Y = np.asarray(Y, dtype=np.float64)
    return Z.T @ Z, Z.T @ Y

solve_readout

solve_readout(A: ndarray, B: ndarray, ridge: float) -> np.ndarray

Solve (A + ridge * I) W_out = B for the readout weights.

Source code in esnfed/esn.py
def solve_readout(A: np.ndarray, B: np.ndarray, ridge: float) -> np.ndarray:
    """Solve ``(A + ridge * I) W_out = B`` for the readout weights."""
    d = A.shape[0]
    return np.linalg.solve(A + ridge * np.eye(d), B)

esnfed.deep

Hierarchical (deep) Echo State Networks.

A deep ESN stacks several reservoir layers: the states of layer l are the input of layer l+1 (Gallicchio, Micheli & Pedrelli, 2017). Giving each layer its own properties --- size, spectral radius, leaking rate / time-scale, topology or node nonlinearity --- builds a hierarchy of progressively slower, more abstract dynamics, which markedly increases memory and nonlinear capacity over a single-layer reservoir of the same total size.

Only a single linear readout is trained, on the concatenation of all layers' states, so training stays a convex ridge regression and the exact federated scheme of :mod:esnfed.federated applies unchanged (a :class:DeepEchoStateNetwork is a drop-in for :class:~esnfed.esn.EchoStateNetwork in a federated :class:~esnfed.federated.Client).

DeepEchoStateNetwork dataclass

DeepEchoStateNetwork(n_inputs: int, n_outputs: int, reservoirs: list, spectral_radius: object = 0.9, leaking_rate: object = 0.7, activation: object = 'tanh', input_scaling: object = 1.0, ridge: float = 1e-06, washout: int = 100, bias: float = 1.0, seed: int | None = None)

A stack of reservoir layers with a single ridge readout over all states.

Parameters:

Name Type Description Default
n_inputs int

Input and output dimensionality.

required
n_outputs int

Input and output dimensionality.

required
reservoirs list

List of square reservoir matrices, one per layer (e.g. from :mod:esnfed.topologies); len(reservoirs) is the depth.

required
spectral_radius object

Per-layer hyper-parameters. Each may be a single value (shared by all layers) or a list with one entry per layer. Notably leaking_rate can decrease with depth to create progressively slower time-scales, and may itself be a per-node array (heterogeneous within a layer).

0.9
leaking_rate object

Per-layer hyper-parameters. Each may be a single value (shared by all layers) or a list with one entry per layer. Notably leaking_rate can decrease with depth to create progressively slower time-scales, and may itself be a per-node array (heterogeneous within a layer).

0.9
activation object

Per-layer hyper-parameters. Each may be a single value (shared by all layers) or a list with one entry per layer. Notably leaking_rate can decrease with depth to create progressively slower time-scales, and may itself be a per-node array (heterogeneous within a layer).

0.9
input_scaling object

Per-layer hyper-parameters. Each may be a single value (shared by all layers) or a list with one entry per layer. Notably leaking_rate can decrease with depth to create progressively slower time-scales, and may itself be a per-node array (heterogeneous within a layer).

0.9
ridge float

As in :class:~esnfed.esn.EchoStateNetwork.

1e-06
washout float

As in :class:~esnfed.esn.EchoStateNetwork.

1e-06
bias float

As in :class:~esnfed.esn.EchoStateNetwork.

1e-06
seed float

As in :class:~esnfed.esn.EchoStateNetwork.

1e-06

harvest

harvest(u: ndarray, x0=None) -> np.ndarray

Run the stack and return [bias, u(t), x^(1)(t), ..., x^(L)(t)].

Source code in esnfed/deep.py
def harvest(self, u: np.ndarray, x0=None) -> np.ndarray:
    """Run the stack and return ``[bias, u(t), x^(1)(t), ..., x^(L)(t)]``."""
    u = np.atleast_2d(u).reshape(-1, self.n_inputs)
    T = u.shape[0]
    signal = u
    states = []
    for layer in self.layers:
        Z = layer.harvest(signal)
        X = Z[:, 1 + layer.n_inputs:]   # reservoir states of this layer
        states.append(X)
        signal = X                      # feed states to the next layer
    Z = np.empty((T, self.readout_dim), dtype=states[0].dtype)
    Z[:, 0] = self.bias
    Z[:, 1:1 + self.n_inputs] = u
    col = 1 + self.n_inputs
    for X in states:
        Z[:, col:col + X.shape[1]] = X
        col += X.shape[1]
    return Z

esnfed.topologies

Reservoir topology generators.

The recurrent connectivity of an ESN reservoir can be modelled as a directed graph. The choice of graph model shapes the reservoir dynamics and is the central experimental variable of this project. Each generator returns a dense n x n weight matrix whose non-zero entries are drawn from a symmetric uniform distribution; the Echo State Network rescales the matrix to the desired spectral radius afterwards.

Topologies

random Erdos-Renyi G(n, p): each directed edge present with prob. p. small_world Watts-Strogatz ring lattice with random rewiring. scale_free Barabasi-Albert preferential attachment (degree power law). ring Simple deterministic uni-directional ring (delay line).

All functions accept a numpy.random.Generator (or an int seed) so that experiments are fully reproducible.

random_reservoir

random_reservoir(n: int, density: float = 0.1, rng=None) -> np.ndarray

Erdos-Renyi reservoir: each directed edge present with prob. density.

Source code in esnfed/topologies.py
def random_reservoir(n: int, density: float = 0.1, rng=None) -> np.ndarray:
    """Erdos-Renyi reservoir: each directed edge present with prob. ``density``."""
    rng = _as_rng(rng)
    mask = rng.uniform(size=(n, n)) < density
    np.fill_diagonal(mask, False)
    W = np.zeros((n, n))
    W[mask] = rng.uniform(-1.0, 1.0, size=int(mask.sum()))
    return W

small_world_reservoir

small_world_reservoir(n: int, k: int = 6, p: float = 0.1, rng=None) -> np.ndarray

Watts-Strogatz small-world reservoir.

A ring lattice where each node connects to its k nearest neighbours, with each edge rewired with probability p. Small-world reservoirs combine high clustering with short path lengths.

Source code in esnfed/topologies.py
def small_world_reservoir(
    n: int, k: int = 6, p: float = 0.1, rng=None
) -> np.ndarray:
    """Watts-Strogatz small-world reservoir.

    A ring lattice where each node connects to its ``k`` nearest neighbours, with
    each edge rewired with probability ``p``. Small-world reservoirs combine high
    clustering with short path lengths.
    """
    rng = _as_rng(rng)
    seed = int(rng.integers(0, 2**31 - 1))
    g = nx.watts_strogatz_graph(n, k, p, seed=seed)
    adj = nx.to_numpy_array(g)
    return _weight_edges(adj, rng)

scale_free_reservoir

scale_free_reservoir(n: int, m: int = 3, rng=None) -> np.ndarray

Barabasi-Albert scale-free reservoir.

Growth with preferential attachment; each new node attaches to m existing nodes. Produces a power-law degree distribution with a few high-degree hubs.

Source code in esnfed/topologies.py
def scale_free_reservoir(n: int, m: int = 3, rng=None) -> np.ndarray:
    """Barabasi-Albert scale-free reservoir.

    Growth with preferential attachment; each new node attaches to ``m`` existing
    nodes. Produces a power-law degree distribution with a few high-degree hubs.
    """
    rng = _as_rng(rng)
    seed = int(rng.integers(0, 2**31 - 1))
    g = nx.barabasi_albert_graph(n, m, seed=seed)
    adj = nx.to_numpy_array(g)
    return _weight_edges(adj, rng)

ring_reservoir

ring_reservoir(n: int, weight: float = 1.0, rng=None) -> np.ndarray

Deterministic uni-directional ring (a.k.a. simple cycle reservoir).

Each node feeds the next; node n-1 feeds node 0. Despite its simplicity this minimal-complexity reservoir is competitive on many tasks (Rodan & Tino, 2011).

Source code in esnfed/topologies.py
def ring_reservoir(n: int, weight: float = 1.0, rng=None) -> np.ndarray:
    """Deterministic uni-directional ring (a.k.a. simple cycle reservoir).

    Each node feeds the next; node ``n-1`` feeds node ``0``. Despite its
    simplicity this minimal-complexity reservoir is competitive on many tasks
    (Rodan & Tino, 2011).
    """
    rng = _as_rng(rng)
    W = np.zeros((n, n))
    signs = rng.choice([-1.0, 1.0], size=n)
    for i in range(n):
        W[(i + 1) % n, i] = weight * signs[i]
    return W

make_reservoir

make_reservoir(kind: str, n: int, rng=None, **kwargs) -> np.ndarray

Dispatch to a named generator from :data:GENERATORS.

Source code in esnfed/topologies.py
def make_reservoir(kind: str, n: int, rng=None, **kwargs) -> np.ndarray:
    """Dispatch to a named generator from :data:`GENERATORS`."""
    if kind not in GENERATORS:
        raise KeyError(f"unknown topology {kind!r}; choices: {list(GENERATORS)}")
    return GENERATORS[kind](n, rng=rng, **kwargs)

graph_metrics

graph_metrics(W: ndarray) -> dict

Return basic graph descriptors of a reservoir weight matrix.

Useful to characterise structural heterogeneity across federated nodes.

Source code in esnfed/topologies.py
def graph_metrics(W: np.ndarray) -> dict:
    """Return basic graph descriptors of a reservoir weight matrix.

    Useful to characterise structural heterogeneity across federated nodes.
    """
    adj = (W != 0).astype(int)
    g = nx.from_numpy_array(adj, create_using=nx.DiGraph)
    ug = g.to_undirected()
    n = adj.shape[0]
    n_edges = int(adj.sum())
    degrees = np.array([d for _, d in ug.degree()])
    try:
        avg_path = nx.average_shortest_path_length(ug) if nx.is_connected(ug) else float("nan")
    except (nx.NetworkXError, nx.NetworkXPointlessConcept):
        avg_path = float("nan")
    return {
        "n_nodes": n,
        "n_edges": n_edges,
        "density": n_edges / (n * (n - 1)) if n > 1 else 0.0,
        "mean_degree": float(degrees.mean()) if degrees.size else 0.0,
        "clustering": float(nx.average_clustering(ug)),
        "avg_path_length": float(avg_path),
    }

leaking_rates

leaking_rates(n, kind='uniform', low=0.1, high=1.0, n_layers=3, rng=None)

Per-node leaking rates a for a heterogeneous / multi-scale reservoir.

Different neurons integrate at different speeds, which helps tasks with mixed time-scales (finance, chaotic systems such as Mackey-Glass).

Parameters:

Name Type Description Default
n

Number of reservoir nodes.

required
kind

"uniform" (i.i.d. in [low, high]), "log_uniform" (log-spaced, emphasising fast nodes), "layered" (n_layers contiguous blocks with decreasing rates, high → low, i.e. fast → slow groups), or "constant" (all high).

'uniform'
low

Range of leaking rates (each in (0, 1]).

0.1
high

Range of leaking rates (each in (0, 1]).

0.1
n_layers

Number of decreasing blocks for kind="layered".

3
rng

Seed or numpy generator.

None

Returns:

Name Type Description
a ndarray of shape (n,)

Per-node leaking rates, ready to pass as EchoStateNetwork(leaking_rate=a).

Source code in esnfed/topologies.py
def leaking_rates(n, kind="uniform", low=0.1, high=1.0, n_layers=3, rng=None):
    """Per-node leaking rates ``a`` for a *heterogeneous / multi-scale* reservoir.

    Different neurons integrate at different speeds, which helps tasks with mixed
    time-scales (finance, chaotic systems such as Mackey-Glass).

    Parameters
    ----------
    n
        Number of reservoir nodes.
    kind
        ``"uniform"`` (i.i.d. in ``[low, high]``), ``"log_uniform"`` (log-spaced,
        emphasising fast nodes), ``"layered"`` (``n_layers`` contiguous blocks
        with decreasing rates, high → low, i.e. fast → slow groups), or
        ``"constant"`` (all ``high``).
    low, high
        Range of leaking rates (each in ``(0, 1]``).
    n_layers
        Number of decreasing blocks for ``kind="layered"``.
    rng
        Seed or ``numpy`` generator.

    Returns
    -------
    a : ndarray of shape (n,)
        Per-node leaking rates, ready to pass as ``EchoStateNetwork(leaking_rate=a)``.
    """
    rng = np.random.default_rng(rng) if not isinstance(rng, np.random.Generator) else rng
    if kind == "uniform":
        return rng.uniform(low, high, size=n)
    if kind == "log_uniform":
        return np.exp(rng.uniform(np.log(low), np.log(high), size=n))
    if kind == "constant":
        return np.full(n, float(high))
    if kind == "layered":
        levels = np.linspace(high, low, max(1, n_layers))
        return np.concatenate([
            np.full(len(idx), levels[k])
            for k, idx in enumerate(np.array_split(np.arange(n), max(1, n_layers)))
        ])
    raise ValueError(f"unknown kind {kind!r}")

mixed_activations

mixed_activations(n, types=('tanh', 'sigmoid', 'sin'), weights=None, rng=None)

Assign a node nonlinearity to each reservoir node (multi-type reservoir).

Mixing activation functions within one reservoir mimics biological neuronal diversity and broadens the basis of dynamics. Returns an array of activation names ready to pass as EchoStateNetwork(activation=...).

Parameters:

Name Type Description Default
n

Number of reservoir nodes.

required
types

Activation names to mix (see :data:esnfed.esn.ACTIVATIONS).

('tanh', 'sigmoid', 'sin')
weights

Optional mixing proportions (defaults to uniform).

None
rng

Seed or numpy generator.

None
Source code in esnfed/topologies.py
def mixed_activations(n, types=("tanh", "sigmoid", "sin"), weights=None, rng=None):
    """Assign a node nonlinearity to each reservoir node (*multi-type* reservoir).

    Mixing activation functions within one reservoir mimics biological neuronal
    diversity and broadens the basis of dynamics. Returns an array of activation
    names ready to pass as ``EchoStateNetwork(activation=...)``.

    Parameters
    ----------
    n
        Number of reservoir nodes.
    types
        Activation names to mix (see :data:`esnfed.esn.ACTIVATIONS`).
    weights
        Optional mixing proportions (defaults to uniform).
    rng
        Seed or ``numpy`` generator.
    """
    rng = np.random.default_rng(rng) if not isinstance(rng, np.random.Generator) else rng
    types = list(types)
    p = None if weights is None else np.asarray(weights, float) / np.sum(weights)
    return rng.choice(types, size=n, p=p)

esnfed.datasets

Benchmark sequential tasks for Echo State Networks.

These are the standard tasks used to evaluate reservoir computing systems.

NARMA-10 A non-linear auto-regressive moving-average system with a 10-step memory (Atiya & Parlos, 2000). The task is to reproduce y from the random input u; it stresses both memory and non-linearity.

Mackey-Glass A delay differential equation (Mackey & Glass, 1977) that is mildly chaotic for tau = 17. The task is one-step-ahead prediction.

Lorenz The classic 3-variable chaotic attractor (Lorenz, 1963); we predict the next x-coordinate.

Real-world finance from_array / load_csv turn any 1-D series into a forecasting task; load_ted_spread loads a bundled real counterparty-risk series (the TED spread); load_fred downloads any series from FRED on demand. These power the federated counterparty-risk use case: institutions that cannot share raw data jointly forecast a credit/counterparty-risk signal.

All generators return (u, y) as float arrays of shape (T, 1).

SequenceDataset

Bases: NamedTuple

A labelled sequence-classification dataset with a natural client split.

X_* are lists (or 3-D arrays) of per-sequence arrays (T_i, n_features); groups_* give the natural federation unit (speaker / subject id).

narma10

narma10(n: int, rng=None, warmup: int = 50) -> tuple[np.ndarray, np.ndarray]

Generate a NARMA-10 input/target sequence of length n.

Source code in esnfed/datasets.py
def narma10(n: int, rng=None, warmup: int = 50) -> tuple[np.ndarray, np.ndarray]:
    """Generate a NARMA-10 input/target sequence of length ``n``."""
    rng = np.random.default_rng(rng) if not isinstance(rng, np.random.Generator) else rng
    total = n + warmup
    u = rng.uniform(0.0, 0.5, size=total)
    y = np.zeros(total)
    for t in range(10, total):
        y[t] = (
            0.3 * y[t - 1]
            + 0.05 * y[t - 1] * np.sum(y[t - 10 : t])
            + 1.5 * u[t - 10] * u[t - 1]
            + 0.1
        )
    u, y = u[warmup:], y[warmup:]
    return u.reshape(-1, 1), y.reshape(-1, 1)

mackey_glass

mackey_glass(n: int, tau: int = 17, beta: float = 0.2, gamma: float = 0.1, power: int = 10, dt: float = 1.0, seed: int | None = None, discard: int = 250) -> tuple[np.ndarray, np.ndarray]

Generate a Mackey-Glass series; return (x_t, x_{t+1}) for prediction.

Source code in esnfed/datasets.py
def mackey_glass(
    n: int,
    tau: int = 17,
    beta: float = 0.2,
    gamma: float = 0.1,
    power: int = 10,
    dt: float = 1.0,
    seed: int | None = None,
    discard: int = 250,
) -> tuple[np.ndarray, np.ndarray]:
    """Generate a Mackey-Glass series; return ``(x_t, x_{t+1})`` for prediction."""
    rng = np.random.default_rng(seed)
    history_len = tau + 1
    total = n + discard + 1
    x = np.empty(total + history_len)
    x[:history_len] = 1.2 + 0.2 * (rng.random(history_len) - 0.5)
    for t in range(history_len, total + history_len):
        x_tau = x[t - tau]
        x[t] = x[t - 1] + dt * (
            beta * x_tau / (1.0 + x_tau**power) - gamma * x[t - 1]
        )
    series = x[history_len + discard :]
    u = series[:-1].reshape(-1, 1)
    y = series[1:].reshape(-1, 1)
    return u, y

lorenz

lorenz(n: int, dt: float = 0.02, sigma: float = 10.0, rho: float = 28.0, beta: float = 8.0 / 3.0, seed: int | None = None, discard: int = 500) -> tuple[np.ndarray, np.ndarray]

Generate the Lorenz attractor; return (x_t, x_{t+1}) for prediction.

Source code in esnfed/datasets.py
def lorenz(
    n: int,
    dt: float = 0.02,
    sigma: float = 10.0,
    rho: float = 28.0,
    beta: float = 8.0 / 3.0,
    seed: int | None = None,
    discard: int = 500,
) -> tuple[np.ndarray, np.ndarray]:
    """Generate the Lorenz attractor; return ``(x_t, x_{t+1})`` for prediction."""
    rng = np.random.default_rng(seed)
    state = np.array([1.0, 1.0, 1.0]) + 0.01 * rng.standard_normal(3)
    total = n + discard + 1
    xs = np.empty(total)
    s = state
    for t in range(total):
        xs[t] = s[0]
        dx = sigma * (s[1] - s[0])
        dy = s[0] * (rho - s[2]) - s[1]
        dz = s[0] * s[1] - beta * s[2]
        s = s + dt * np.array([dx, dy, dz])
    series = xs[discard:]
    # Normalise to a sensible range for tanh reservoirs.
    series = (series - series.mean()) / series.std()
    return series[:-1].reshape(-1, 1), series[1:].reshape(-1, 1)

split

split(u: ndarray, y: ndarray, train_frac: float = 0.7) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]

Chronological train/test split (no shuffling, to preserve dynamics).

Source code in esnfed/datasets.py
def split(
    u: np.ndarray, y: np.ndarray, train_frac: float = 0.7
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Chronological train/test split (no shuffling, to preserve dynamics)."""
    cut = int(len(u) * train_frac)
    return u[:cut], y[:cut], u[cut:], y[cut:]

partition_iid

partition_iid(u: ndarray, y: ndarray, n_clients: int, rng=None) -> list[tuple[np.ndarray, np.ndarray]]

Split a sequence into n_clients contiguous blocks (federated clients).

Contiguous blocks (rather than shuffled samples) keep each client's slice a valid time series, which is required for reservoir state harvesting.

Source code in esnfed/datasets.py
def partition_iid(
    u: np.ndarray, y: np.ndarray, n_clients: int, rng=None
) -> list[tuple[np.ndarray, np.ndarray]]:
    """Split a sequence into ``n_clients`` contiguous blocks (federated clients).

    Contiguous blocks (rather than shuffled samples) keep each client's slice a
    valid time series, which is required for reservoir state harvesting.
    """
    bounds = np.linspace(0, len(u), n_clients + 1, dtype=int)
    return [(u[a:b], y[a:b]) for a, b in zip(bounds[:-1], bounds[1:])]

from_array

from_array(series, *, predict: str = 'next', normalize: bool = True) -> tuple[np.ndarray, np.ndarray]

Turn a 1-D real series into a one-step-ahead forecasting task.

Parameters:

Name Type Description Default
series

1-D sequence of observations (list or array).

required
predict str

"next" to forecast the next value (level), or "change" to forecast the next first difference.

'next'
normalize bool

If true, z-score the series (recommended for tanh reservoirs).

True

Returns:

Type Description
(u, y)

Input/target arrays of shape (T-1, 1).

Source code in esnfed/datasets.py
def from_array(
    series, *, predict: str = "next", normalize: bool = True
) -> tuple[np.ndarray, np.ndarray]:
    """Turn a 1-D real series into a one-step-ahead forecasting task.

    Parameters
    ----------
    series
        1-D sequence of observations (list or array).
    predict
        ``"next"`` to forecast the next value (level), or ``"change"`` to
        forecast the next first difference.
    normalize
        If true, z-score the series (recommended for tanh reservoirs).

    Returns
    -------
    (u, y)
        Input/target arrays of shape (T-1, 1).
    """
    s = np.asarray(series, dtype=float).ravel()
    s = s[np.isfinite(s)]
    if normalize and s.std() > 0:
        s = (s - s.mean()) / s.std()
    if predict == "next":
        return s[:-1].reshape(-1, 1), s[1:].reshape(-1, 1)
    if predict == "change":
        d = np.diff(s)
        return d[:-1].reshape(-1, 1), d[1:].reshape(-1, 1)
    raise ValueError("predict must be 'next' or 'change'")

load_csv

load_csv(path, column=None, *, predict: str = 'next', normalize: bool = True) -> tuple[np.ndarray, np.ndarray]

Load a column from a CSV file and build a forecasting task.

column may be a header name, an integer index, or None (last column). Missing markers ('', '.', 'NA', 'NaN', 'null') are dropped.

Source code in esnfed/datasets.py
def load_csv(
    path, column=None, *, predict: str = "next", normalize: bool = True
) -> tuple[np.ndarray, np.ndarray]:
    """Load a column from a CSV file and build a forecasting task.

    ``column`` may be a header name, an integer index, or ``None`` (last column).
    Missing markers ('', '.', 'NA', 'NaN', 'null') are dropped.
    """
    text = Path(path).read_text(encoding="utf-8")
    series = _column_from_csv(text, column)
    return from_array(series, predict=predict, normalize=normalize)

load_ted_spread

load_ted_spread(*, predict: str = 'next', normalize: bool = True, raw: bool = False)

Load the bundled TED spread series (interbank/counterparty-risk measure).

The TED spread is the difference between the 3-month interbank rate and the 3-month Treasury bill; it is a classic gauge of perceived counterparty/credit risk in the banking system. Daily, 1986--2022.

Source: Federal Reserve Bank of St. Louis, FRED, series TEDRATE.

With raw=True returns the unprocessed 1-D level array instead of (u, y).

Source code in esnfed/datasets.py
def load_ted_spread(
    *, predict: str = "next", normalize: bool = True, raw: bool = False
):
    """Load the bundled TED spread series (interbank/counterparty-risk measure).

    The TED spread is the difference between the 3-month interbank rate and the
    3-month Treasury bill; it is a classic gauge of perceived counterparty/credit
    risk in the banking system. Daily, 1986--2022.

    Source: Federal Reserve Bank of St. Louis, FRED, series ``TEDRATE``.

    With ``raw=True`` returns the unprocessed 1-D level array instead of (u, y).
    """
    series = _column_from_csv(
        (_DATA_DIR / "ted_spread.csv").read_text(encoding="utf-8"), "ted_spread"
    )
    if raw:
        return series
    return from_array(series, predict=predict, normalize=normalize)

load_fred

load_fred(series_id: str, *, predict: str = 'next', normalize: bool = True, cache_dir=None, timeout: float = 30.0, raw: bool = False)

Download a series from FRED (no API key) and build a forecasting task.

Useful counterparty/credit-risk series include TEDRATE (TED spread), BAMLH0A0HYM2 (US high-yield credit spread), BAA10Y (Baa credit spread) and STLFSI4 / NFCI (financial stress/conditions). The CSV is cached on disk so repeated calls are offline.

Data are retrieved from FRED (Federal Reserve Bank of St. Louis) and are subject to FRED's terms of use.

Source code in esnfed/datasets.py
def load_fred(
    series_id: str,
    *,
    predict: str = "next",
    normalize: bool = True,
    cache_dir=None,
    timeout: float = 30.0,
    raw: bool = False,
):
    """Download a series from FRED (no API key) and build a forecasting task.

    Useful counterparty/credit-risk series include ``TEDRATE`` (TED spread),
    ``BAMLH0A0HYM2`` (US high-yield credit spread), ``BAA10Y`` (Baa credit
    spread) and ``STLFSI4`` / ``NFCI`` (financial stress/conditions). The CSV is
    cached on disk so repeated calls are offline.

    Data are retrieved from FRED (Federal Reserve Bank of St. Louis) and are
    subject to FRED's terms of use.
    """
    cache = Path(cache_dir) if cache_dir else (Path.home() / ".cache" / "esnfed")
    cache.mkdir(parents=True, exist_ok=True)
    cached = cache / f"fred_{series_id}.csv"
    if cached.exists():
        text = cached.read_text(encoding="utf-8")
    else:
        url = f"https://fred.stlouisfed.org/graph/fredgraph.csv?id={series_id}"
        with _urlopen(url, timeout) as resp:
            text = resp.read().decode("utf-8")
        cached.write_text(text, encoding="utf-8")
    series = _column_from_csv(text, series_id)
    if raw:
        return series
    return from_array(series, predict=predict, normalize=normalize)

load_fred_matrix

load_fred_matrix(series_ids, *, target=None, predict: str = 'next', normalize: bool = True, cache_dir=None, timeout: float = 30.0, raw: bool = False)

Download several FRED series and align them into a multivariate task.

The series are intersected on their common dates into a matrix (T, d); the task is to forecast the next-step value of the target series (default: the first) from the full d-dimensional vector. This exercises high-dimensional multivariate scaling --- e.g. forecasting counterparty risk (TEDRATE) from a panel such as ["TEDRATE", "VIXCLS", "DGS10", "DFF"].

Returns (u, y) with u of shape (T-1, d) and y of shape (T-1, 1) (or the raw (T, d) matrix if raw=True).

Source code in esnfed/datasets.py
def load_fred_matrix(
    series_ids,
    *,
    target=None,
    predict: str = "next",
    normalize: bool = True,
    cache_dir=None,
    timeout: float = 30.0,
    raw: bool = False,
):
    """Download several FRED series and align them into a *multivariate* task.

    The series are intersected on their common dates into a matrix ``(T, d)``;
    the task is to forecast the next-step value of the ``target`` series (default:
    the first) from the full d-dimensional vector. This exercises high-dimensional
    multivariate scaling --- e.g. forecasting counterparty risk (``TEDRATE``) from
    a panel such as ``["TEDRATE", "VIXCLS", "DGS10", "DFF"]``.

    Returns ``(u, y)`` with ``u`` of shape ``(T-1, d)`` and ``y`` of shape
    ``(T-1, 1)`` (or the raw ``(T, d)`` matrix if ``raw=True``).
    """
    if isinstance(series_ids, str):
        series_ids = [series_ids]
    cache = _cache_dir(cache_dir)
    dicts = [_fred_series_dict(_fred_text(s, cache, timeout)) for s in series_ids]
    common = sorted(set.intersection(*[set(d) for d in dicts]))
    if not common:
        raise ValueError("the requested FRED series have no overlapping dates")
    M = np.array([[d[dt] for d in dicts] for dt in common], dtype=float)
    if normalize:
        std = M.std(0)
        M = (M - M.mean(0)) / np.where(std > 0, std, 1.0)
    if raw:
        return M
    ti = series_ids.index(target) if target is not None else 0
    if predict == "next":
        return M[:-1], M[1:, ti : ti + 1]
    if predict == "change":
        D = np.diff(M, axis=0)
        return D[:-1], D[1:, ti : ti + 1]
    raise ValueError("predict must be 'next' or 'change'")

load_japanese_vowels

load_japanese_vowels(*, cache_dir=None, timeout: float = 60.0) -> SequenceDataset

Japanese Vowels (UCI 128): 9 male speakers uttering /ae/, encoded as 12-dimensional LPC cepstra, with variable-length utterances (7--29 frames). The task is speaker identification (9 classes). The natural federation unit is the speaker --- which is also the label, giving an extreme label-skew split in which local-only training cannot work and federation is essential.

Source code in esnfed/datasets.py
def load_japanese_vowels(*, cache_dir=None, timeout: float = 60.0) -> SequenceDataset:
    """Japanese Vowels (UCI 128): 9 male speakers uttering ``/ae/``, encoded as
    12-dimensional LPC cepstra, with variable-length utterances (7--29 frames).
    The task is speaker identification (9 classes). The natural federation unit is
    the speaker --- which is *also* the label, giving an extreme label-skew split
    in which local-only training cannot work and federation is essential.
    """
    cache = _cache_dir(cache_dir)
    zp = cache / "japanese_vowels.zip"
    if not zp.exists():
        url = "https://archive.ics.uci.edu/static/public/128/japanese+vowels.zip"
        with _urlopen(url, timeout) as r:
            zp.write_bytes(r.read())
    with zipfile.ZipFile(zp) as z:
        X_train = _parse_jv_blocks(z.read("ae.train").decode())
        X_test = _parse_jv_blocks(z.read("ae.test").decode())
        y_train = _counts_to_labels(z.read("size_ae.train").decode())
        y_test = _counts_to_labels(z.read("size_ae.test").decode())
    return SequenceDataset(X_train, y_train, y_train.copy(),
                           X_test, y_test, y_test.copy(), 9, 12)

load_har

load_har(*, cache_dir=None, timeout: float = 180.0) -> SequenceDataset

Human Activity Recognition Using Smartphones (UCI 240): 30 subjects, 6 activities, raw 3-axial accelerometer + gyroscope signals windowed into 128-timestep, 9-channel sequences at 50 Hz. The natural federation unit is the subject (groups): every subject performs all activities, so the split is feature-non-i.i.d. (people move differently) without label skew --- ideal for comparing the prediction ensemble against parameter aggregation.

The ~60 MB archive is downloaded once and the parsed arrays are cached as a compressed .npz for fast re-loading.

Source code in esnfed/datasets.py
def load_har(*, cache_dir=None, timeout: float = 180.0) -> SequenceDataset:
    """Human Activity Recognition Using Smartphones (UCI 240): 30 subjects, 6
    activities, raw 3-axial accelerometer + gyroscope signals windowed into
    128-timestep, 9-channel sequences at 50 Hz. The natural federation unit is the
    subject (``groups``): every subject performs all activities, so the split is
    feature-non-i.i.d. (people move differently) without label skew --- ideal for
    comparing the prediction ensemble against parameter aggregation.

    The ~60 MB archive is downloaded once and the parsed arrays are cached as a
    compressed ``.npz`` for fast re-loading.
    """
    cache = _cache_dir(cache_dir)
    npz = cache / "har_parsed.npz"
    if npz.exists():
        d = np.load(npz)
        return SequenceDataset(d["Xtr"], d["ytr"], d["gtr"],
                               d["Xte"], d["yte"], d["gte"], 6, 9)
    zp = cache / "har.zip"
    if not zp.exists():
        url = ("https://archive.ics.uci.edu/static/public/240/"
               "human+activity+recognition+using+smartphones.zip")
        with _urlopen(url, timeout) as r:
            zp.write_bytes(r.read())
    with zipfile.ZipFile(zp) as outer:
        inner = outer.read("UCI HAR Dataset.zip")
    with zipfile.ZipFile(io.BytesIO(inner)) as z:
        def load_split(split):
            base = f"UCI HAR Dataset/{split}/"
            chans = [np.loadtxt(io.BytesIO(
                z.read(f"{base}Inertial Signals/{s}_{split}.txt")))
                for s in _HAR_SIGNALS]
            X = np.stack(chans, axis=2).astype(np.float32)  # (n, 128, 9)
            y = np.loadtxt(io.BytesIO(z.read(f"{base}y_{split}.txt"))).astype(int) - 1
            g = np.loadtxt(io.BytesIO(z.read(f"{base}subject_{split}.txt"))).astype(int)
            return X, y, g
        Xtr, ytr, gtr = load_split("train")
        Xte, yte, gte = load_split("test")
    np.savez_compressed(npz, Xtr=Xtr, ytr=ytr, gtr=gtr, Xte=Xte, yte=yte, gte=gte)
    return SequenceDataset(Xtr, ytr, gtr, Xte, yte, gte, 6, 9)

group_clients

group_clients(X, y, groups)

Split labelled sequences into per-group client datasets.

Returns a list of (sequences, labels) pairs, one per unique value in groups (e.g. one client per speaker or per subject).

Source code in esnfed/datasets.py
def group_clients(X, y, groups):
    """Split labelled sequences into per-group client datasets.

    Returns a list of ``(sequences, labels)`` pairs, one per unique value in
    ``groups`` (e.g. one client per speaker or per subject).
    """
    clients = []
    for g in np.unique(groups):
        idx = np.where(np.asarray(groups) == g)[0]
        clients.append(([X[i] for i in idx], np.asarray(y)[idx]))
    return clients

esnfed.metrics

Error metrics for reservoir computing tasks.

mse

mse(y_true: ndarray, y_pred: ndarray) -> float

Mean squared error.

Source code in esnfed/metrics.py
def mse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Mean squared error."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    return float(np.mean((y_true - y_pred) ** 2))

rmse

rmse(y_true: ndarray, y_pred: ndarray) -> float

Root mean squared error.

Source code in esnfed/metrics.py
def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root mean squared error."""
    return float(np.sqrt(mse(y_true, y_pred)))

nrmse

nrmse(y_true: ndarray, y_pred: ndarray) -> float

Normalised RMSE, scaled by the standard deviation of the target.

NRMSE = 1 corresponds to a trivial predictor outputting the target mean. This is the standard figure of merit for NARMA and Mackey-Glass tasks.

Source code in esnfed/metrics.py
def nrmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Normalised RMSE, scaled by the standard deviation of the target.

    NRMSE = 1 corresponds to a trivial predictor outputting the target mean.
    This is the standard figure of merit for NARMA and Mackey-Glass tasks.
    """
    y_true = np.asarray(y_true)
    sigma = y_true.std()
    if sigma == 0:
        return float("nan")
    return rmse(y_true, y_pred) / sigma

r2_score

r2_score(y_true: ndarray, y_pred: ndarray) -> float

Coefficient of determination R^2.

Source code in esnfed/metrics.py
def r2_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Coefficient of determination R^2."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    if ss_tot == 0:
        return float("nan")
    return float(1.0 - ss_res / ss_tot)

esnfed.federated

Federated learning strategies for Echo State Networks.

In an ESN only the linear readout is trained, so the federated problem reduces to learning a shared (or combinable) readout across clients whose reservoirs may or may not share the same structure.

Strategies implemented

train_centralized Pool all client data and fit one readout (the privacy-violating upper bound). train_local Each client fits its own readout on its own data only (no collaboration). federated_ridge Exact federated training for a shared reservoir: clients exchange the ridge sufficient statistics A = Z^T Z and B = Z^T Y; the server sums them and solves once. Mathematically identical to pooled training, but no raw data leaves a client. fedavg Iterative FedAvg (McMahan et al., 2017) on the readout for a shared reservoir: clients run local gradient steps and the server averages weights each round. Produces an accuracy-vs-rounds curve. ensemble_predict For heterogeneous reservoirs: each client keeps its own ESN and the server averages their predictions (an ensemble), so no parameter averaging is needed. structural_alignment Interpolates heterogeneous reservoirs toward a shared target structure; at full alignment the readouts become averageable and exact federated ridge applies. Sweeps the alignment level to expose the transition.

Client dataclass

Client(esn: EchoStateNetwork, u: ndarray, y: ndarray)

A federated client: an ESN plus a local time-series partition.

states

states() -> np.ndarray

Harvest (and cache) post-washout extended states for local data.

Source code in esnfed/federated.py
def states(self) -> np.ndarray:
    """Harvest (and cache) post-washout extended states for local data."""
    if self._Z is None:
        self._Z = self.esn.harvest(self.u)
    return self._Z[self.esn.washout :]

invalidate

invalidate() -> None

Drop the cached states (call after changing the reservoir).

Source code in esnfed/federated.py
def invalidate(self) -> None:
    """Drop the cached states (call after changing the reservoir)."""
    self._Z = None

train_centralized

train_centralized(esn: EchoStateNetwork, clients: list[Client]) -> EchoStateNetwork

Fit one readout on the concatenation of all client states (upper bound).

Source code in esnfed/federated.py
def train_centralized(esn: EchoStateNetwork, clients: list[Client]) -> EchoStateNetwork:
    """Fit one readout on the concatenation of all client states (upper bound)."""
    A = np.zeros((esn.readout_dim, esn.readout_dim))
    B = np.zeros((esn.readout_dim, esn.n_outputs))
    for c in clients:
        Ak, Bk = ridge_statistics(c.states(), c.targets())
        A += Ak
        B += Bk
    esn.set_readout(solve_readout(A, B, esn.ridge))
    return esn

train_local

train_local(clients: list[Client]) -> list[EchoStateNetwork]

Each client fits its own readout on its own data only.

Source code in esnfed/federated.py
def train_local(clients: list[Client]) -> list[EchoStateNetwork]:
    """Each client fits its own readout on its own data only."""
    for c in clients:
        Ak, Bk = ridge_statistics(c.states(), c.targets())
        c.esn.set_readout(solve_readout(Ak, Bk, c.esn.ridge))
    return [c.esn for c in clients]

federated_ridge

federated_ridge(clients: list[Client], esn: EchoStateNetwork) -> np.ndarray

Exact federated readout via summed sufficient statistics (shared reservoir).

Returns the shared W_out. Clients transmit only A_k and B_k (which are independent of dataset size and reveal no individual samples), never raw data. The result equals :func:train_centralized.

Source code in esnfed/federated.py
def federated_ridge(clients: list[Client], esn: EchoStateNetwork) -> np.ndarray:
    """Exact federated readout via summed sufficient statistics (shared reservoir).

    Returns the shared ``W_out``. Clients transmit only ``A_k`` and ``B_k`` (which
    are independent of dataset size and reveal no individual samples), never raw
    data. The result equals :func:`train_centralized`.
    """
    A = np.zeros((esn.readout_dim, esn.readout_dim))
    B = np.zeros((esn.readout_dim, esn.n_outputs))
    for c in clients:
        Ak, Bk = ridge_statistics(c.states(), c.targets())
        A += Ak
        B += Bk
    return solve_readout(A, B, esn.ridge)

federated_ridge_dp

federated_ridge_dp(clients: list[Client], esn: EchoStateNetwork, cfg: PrivacyConfig) -> np.ndarray

Differentially private federated ridge (shared reservoir).

Each client privatises its own statistics with the Gaussian mechanism (clip + noise, see :func:esnfed.privacy.dp_statistics) before they are summed and solved once. The readout is then :math:(\varepsilon, \delta)-DP w.r.t. every client's records. Independent noise is drawn per client from cfg.seed. Unlike :func:federated_ridge this is not exact -- the clipping and noise are the price of the formal privacy guarantee. The noisy, ill-conditioned Gram is solved with a ridge augmented by the spectral scale of the injected noise, so the readout stays well-posed at any budget.

Source code in esnfed/federated.py
def federated_ridge_dp(
    clients: list[Client], esn: EchoStateNetwork, cfg: PrivacyConfig
) -> np.ndarray:
    """Differentially private federated ridge (shared reservoir).

    Each client privatises its own statistics with the Gaussian mechanism (clip +
    noise, see :func:`esnfed.privacy.dp_statistics`) before they are summed and
    solved once. The readout is then :math:`(\\varepsilon, \\delta)`-DP w.r.t. every
    client's records. Independent noise is drawn per client from ``cfg.seed``.
    Unlike :func:`federated_ridge` this is *not* exact -- the clipping and noise
    are the price of the formal privacy guarantee. The noisy, ill-conditioned Gram
    is solved with a ridge augmented by the spectral scale of the injected noise,
    so the readout stays well-posed at any budget.
    """
    A = np.zeros((esn.readout_dim, esn.readout_dim))
    B = np.zeros((esn.readout_dim, esn.n_outputs))
    base = np.random.default_rng(cfg.seed)
    for c in clients:
        child = np.random.default_rng(int(base.integers(0, 2**63 - 1)))
        Ak, Bk = dp_statistics(c.states(), c.targets(), cfg, rng=child)
        A += Ak
        B += Bk
    # Summed Gaussian noise has spectral scale ~ 2*sigma*sqrt(K*D) (semicircle law);
    # regularise by that much, else the readout explodes at small epsilon.
    sigma = gaussian_sigma(cfg.epsilon, cfg.delta, cfg.sensitivity())
    dp_ridge = esn.ridge + 2.0 * sigma * np.sqrt(len(clients) * esn.readout_dim)
    return solve_readout(A, B, dp_ridge)

federated_ridge_secure

federated_ridge_secure(clients: list[Client], esn: EchoStateNetwork, *, seed: int | None = None, mask_scale: float = 1.0) -> np.ndarray

Exact federated ridge via secure aggregation (additive masking).

Clients mask their (A_k, B_k) with pairwise-cancelling noise (see :func:esnfed.privacy.secure_sum), so the server obtains only the masked sum and never an individual client's statistics. The masks cancel, so the solved readout equals :func:federated_ridge up to floating-point round-off (it is exact in the fixed-point/modular arithmetic of a real protocol).

Source code in esnfed/federated.py
def federated_ridge_secure(
    clients: list[Client], esn: EchoStateNetwork, *, seed: int | None = None,
    mask_scale: float = 1.0,
) -> np.ndarray:
    """Exact federated ridge via secure aggregation (additive masking).

    Clients mask their ``(A_k, B_k)`` with pairwise-cancelling noise (see
    :func:`esnfed.privacy.secure_sum`), so the server obtains only the masked sum
    and never an individual client's statistics. The masks cancel, so the solved
    readout equals :func:`federated_ridge` up to floating-point round-off
    (it is exact in the fixed-point/modular arithmetic of a real protocol).
    """
    As, Bs = [], []
    for c in clients:
        Ak, Bk = ridge_statistics(c.states(), c.targets())
        As.append(Ak)
        Bs.append(Bk)
    rng = np.random.default_rng(seed)
    A = secure_sum(As, rng=rng, scale=mask_scale)
    B = secure_sum(Bs, rng=rng, scale=mask_scale)
    return solve_readout(A, B, esn.ridge)

fedavg

fedavg(clients: list[Client], esn: EchoStateNetwork, u_test: ndarray, y_test: ndarray, *, rounds: int = 30, local_epochs: int = 5, lr: float = 1.0) -> tuple[np.ndarray, list[float]]

Iterative FedAvg on the readout for a shared reservoir.

lr is a normalised step in (0, 2); see :func:_local_gradient_step. Returns the final W_out and the list of global test NRMSE values, one per communication round.

Source code in esnfed/federated.py
def fedavg(
    clients: list[Client],
    esn: EchoStateNetwork,
    u_test: np.ndarray,
    y_test: np.ndarray,
    *,
    rounds: int = 30,
    local_epochs: int = 5,
    lr: float = 1.0,
) -> tuple[np.ndarray, list[float]]:
    """Iterative FedAvg on the readout for a shared reservoir.

    ``lr`` is a normalised step in (0, 2); see :func:`_local_gradient_step`.
    Returns the final ``W_out`` and the list of global test NRMSE values, one per
    communication round.
    """
    W = np.zeros((esn.readout_dim, esn.n_outputs))
    total = sum(c.n_samples for c in clients)
    y_test = np.atleast_2d(y_test).reshape(-1, esn.n_outputs)
    Z_test = esn.harvest(u_test)[esn.washout :]
    y_eval = y_test[esn.washout :]

    history: list[float] = []
    for _ in range(rounds):
        updates = []
        for c in clients:
            Wk = _local_gradient_step(
                W, c.states(), c.targets(), lr, esn.ridge, local_epochs
            )
            updates.append((c.n_samples, Wk))
        # Weighted average (FedAvg aggregation).
        W = sum((nk / total) * Wk for nk, Wk in updates)
        history.append(nrmse(y_eval, Z_test @ W))
    return W, history

ensemble_predict

ensemble_predict(clients: list[Client], u_test: ndarray, weights: ndarray | None = None) -> np.ndarray

Average the predictions of locally-trained, heterogeneous client ESNs.

Each client must already have a trained readout (see :func:train_local). No parameter averaging is performed, so the clients may have completely different reservoir structures and input weights.

Source code in esnfed/federated.py
def ensemble_predict(
    clients: list[Client], u_test: np.ndarray, weights: np.ndarray | None = None
) -> np.ndarray:
    """Average the predictions of locally-trained, heterogeneous client ESNs.

    Each client must already have a trained readout (see :func:`train_local`).
    No parameter averaging is performed, so the clients may have completely
    different reservoir structures and input weights.
    """
    preds = [c.esn.predict(u_test) for c in clients]
    P = np.stack(preds, axis=0)  # (n_clients, T, n_outputs)
    if weights is None:
        return P.mean(axis=0)
    w = np.asarray(weights, float)
    w = w / w.sum()
    return np.tensordot(w, P, axes=(0, 0))

interpolate_reservoir

interpolate_reservoir(W_local: ndarray, W_target: ndarray, alpha: float) -> np.ndarray

Blend a client reservoir toward a shared target: (1-a)W_local + a W*.

Source code in esnfed/federated.py
def interpolate_reservoir(
    W_local: np.ndarray, W_target: np.ndarray, alpha: float
) -> np.ndarray:
    """Blend a client reservoir toward a shared target: ``(1-a)W_local + a W*``."""
    return (1.0 - alpha) * W_local + alpha * W_target

structural_alignment

structural_alignment(local_reservoirs: list[ndarray], target_reservoir: ndarray, partitions: list[tuple[ndarray, ndarray]], u_test: ndarray, y_test: ndarray, *, alphas=np.linspace(0.0, 1.0, 11), esn_kwargs: dict | None = None, shared_input_seed: int = 0) -> list[dict]

Sweep alignment level and report ensemble vs. parameter-averaging error.

At each alpha every client's reservoir is blended toward the shared target. Two readouts are evaluated on the global test set:

  • ensemble -- clients keep individual readouts, predictions averaged;
  • fedavg/ridge -- exact federated ridge over the (now more similar) reservoirs, valid in the limit alpha = 1 when structures coincide.

Returns one record per alpha with both test NRMSEs and the mean pairwise reservoir distance (a measure of remaining heterogeneity).

Source code in esnfed/federated.py
def structural_alignment(
    local_reservoirs: list[np.ndarray],
    target_reservoir: np.ndarray,
    partitions: list[tuple[np.ndarray, np.ndarray]],
    u_test: np.ndarray,
    y_test: np.ndarray,
    *,
    alphas=np.linspace(0.0, 1.0, 11),
    esn_kwargs: dict | None = None,
    shared_input_seed: int = 0,
) -> list[dict]:
    """Sweep alignment level and report ensemble vs. parameter-averaging error.

    At each ``alpha`` every client's reservoir is blended toward the shared
    target. Two readouts are evaluated on the global test set:

    * **ensemble** -- clients keep individual readouts, predictions averaged;
    * **fedavg/ridge** -- exact federated ridge over the (now more similar)
      reservoirs, valid in the limit ``alpha = 1`` when structures coincide.

    Returns one record per ``alpha`` with both test NRMSEs and the mean pairwise
    reservoir distance (a measure of remaining heterogeneity).
    """
    esn_kwargs = dict(esn_kwargs or {})
    n_in = partitions[0][0].shape[1]
    n_out = partitions[0][1].shape[1]
    y_test = np.atleast_2d(y_test).reshape(-1, n_out)

    records = []
    for alpha in alphas:
        clients = []
        for W_local, (u, y) in zip(local_reservoirs, partitions):
            W = interpolate_reservoir(W_local, target_reservoir, alpha)
            esn = EchoStateNetwork(
                n_in, n_out, W, seed=shared_input_seed, **esn_kwargs
            )
            clients.append(Client(esn, u, y))

        # Heterogeneity measure: mean pairwise Frobenius distance of reservoirs.
        mats = [c.esn.W for c in clients]
        dists = [
            np.linalg.norm(mats[i] - mats[j])
            for i in range(len(mats))
            for j in range(i + 1, len(mats))
        ]
        heterogeneity = float(np.mean(dists)) if dists else 0.0

        # Ensemble of locally trained readouts.
        train_local(clients)
        ens_pred = ensemble_predict(clients, u_test)
        washout = clients[0].esn.washout
        ens_err = nrmse(y_test[washout:], ens_pred[washout:])

        # Parameter aggregation (exact federated ridge over shared input weights).
        ref = clients[0].esn
        W_out = federated_ridge(clients, ref)
        Z_test = ref.harvest(u_test)[washout:]
        fed_err = nrmse(y_test[washout:], Z_test @ W_out)

        records.append(
            {
                "alpha": float(alpha),
                "heterogeneity": heterogeneity,
                "ensemble_nrmse": ens_err,
                "fedavg_nrmse": fed_err,
            }
        )
    return records

make_shared_clients

make_shared_clients(reservoir: ndarray, partitions: list[tuple[ndarray, ndarray]], *, input_seed: int = 0, esn_kwargs: dict | None = None) -> tuple[list[Client], EchoStateNetwork]

Build clients that all share one reservoir and input weights (homogeneous).

Source code in esnfed/federated.py
def make_shared_clients(
    reservoir: np.ndarray,
    partitions: list[tuple[np.ndarray, np.ndarray]],
    *,
    input_seed: int = 0,
    esn_kwargs: dict | None = None,
) -> tuple[list[Client], EchoStateNetwork]:
    """Build clients that all share one reservoir and input weights (homogeneous)."""
    esn_kwargs = dict(esn_kwargs or {})
    n_in = partitions[0][0].shape[1]
    n_out = partitions[0][1].shape[1]
    clients = [
        Client(EchoStateNetwork(n_in, n_out, reservoir, seed=input_seed, **esn_kwargs), u, y)
        for (u, y) in partitions
    ]
    reference = EchoStateNetwork(n_in, n_out, reservoir, seed=input_seed, **esn_kwargs)
    return clients, reference

make_heterogeneous_clients

make_heterogeneous_clients(reservoirs: list[ndarray], partitions: list[tuple[ndarray, ndarray]], *, esn_kwargs: dict | None = None) -> list[Client]

Build clients each with its own reservoir and input weights (heterogeneous).

Source code in esnfed/federated.py
def make_heterogeneous_clients(
    reservoirs: list[np.ndarray],
    partitions: list[tuple[np.ndarray, np.ndarray]],
    *,
    esn_kwargs: dict | None = None,
) -> list[Client]:
    """Build clients each with its own reservoir and input weights (heterogeneous)."""
    esn_kwargs = dict(esn_kwargs or {})
    n_in = partitions[0][0].shape[1]
    n_out = partitions[0][1].shape[1]
    clients = []
    for i, (W, (u, y)) in enumerate(zip(reservoirs, partitions)):
        esn = EchoStateNetwork(n_in, n_out, W, seed=1000 + i, **esn_kwargs)
        clients.append(Client(esn, u, y))
    return clients

federated_prompt_average

federated_prompt_average(prompt_clients, weights=None)

FedAvg of FedResPrompt controllers (the prompt-specific counterpart of :func:federated_ridge's statistics sharing).

Averages each client's readout W_out and bottleneck projection P and broadcasts the means back, so the edge devices converge on a shared prompt controller while keeping their data local. Clients are duck-typed: each must expose W_out and projection.P (see :class:esnfed.llm_orchestration.EdgeClient).

Source code in esnfed/federated.py
def federated_prompt_average(prompt_clients, weights=None):
    """FedAvg of FedResPrompt controllers (the prompt-specific counterpart of
    :func:`federated_ridge`'s statistics sharing).

    Averages each client's readout ``W_out`` and bottleneck projection ``P`` and
    broadcasts the means back, so the edge devices converge on a shared prompt
    controller while keeping their data local. Clients are duck-typed: each must
    expose ``W_out`` and ``projection.P`` (see
    :class:`esnfed.llm_orchestration.EdgeClient`).
    """
    n = len(prompt_clients)
    if n == 0:
        raise ValueError("no clients to aggregate")
    w = np.ones(n) / n if weights is None else np.asarray(weights, float)
    w = w / w.sum()
    W_mean = sum(wi * c.W_out for wi, c in zip(w, prompt_clients))
    P_mean = sum(wi * c.projection.P for wi, c in zip(w, prompt_clients))
    for c in prompt_clients:
        c.W_out = W_mean.copy()
        c.projection.P = P_mean.copy()
    return W_mean, P_mean

esnfed.classification

Sequence classification with Echo State Networks, and its federated variants.

For classification each input is a (possibly variable-length) sequence that maps to a single class label. The reservoir turns each sequence into a fixed feature vector (the mean or last extended state); a ridge readout on one-hot targets then classifies by argmax. Because the readout is again a ridge regression, the exact federated scheme of :mod:esnfed.federated carries over unchanged: clients exchange the summed statistics A = F^T F and B = F^T Y and the server solves once, recovering the pooled classifier exactly.

reservoir_features

reservoir_features(esn: EchoStateNetwork, sequences, pool: str = 'mean') -> np.ndarray

Map each sequence to a fixed feature vector via the reservoir.

Parameters:

Name Type Description Default
sequences

Iterable of arrays of shape (T_i, n_inputs) (lengths may differ).

required
pool str

"mean" (default) averages the extended states over time; "last" takes the final state.

'mean'
Source code in esnfed/classification.py
def reservoir_features(esn: EchoStateNetwork, sequences, pool: str = "mean") -> np.ndarray:
    """Map each sequence to a fixed feature vector via the reservoir.

    Parameters
    ----------
    sequences
        Iterable of arrays of shape ``(T_i, n_inputs)`` (lengths may differ).
    pool
        ``"mean"`` (default) averages the extended states over time; ``"last"``
        takes the final state.
    """
    feats = []
    for seq in sequences:
        Z = esn.harvest(np.atleast_2d(seq).reshape(-1, esn.n_inputs))
        if pool == "mean":
            feats.append(Z.mean(axis=0))
        elif pool == "last":
            feats.append(Z[-1])
        else:
            raise ValueError("pool must be 'mean' or 'last'")
    return np.asarray(feats, dtype=float)

class_statistics

class_statistics(esn, sequences, labels, n_classes, pool='mean')

Ridge sufficient statistics (A, B) for a set of labelled sequences.

Source code in esnfed/classification.py
def class_statistics(esn, sequences, labels, n_classes, pool="mean"):
    """Ridge sufficient statistics ``(A, B)`` for a set of labelled sequences."""
    F = reservoir_features(esn, sequences, pool)
    return ridge_statistics(F, one_hot(labels, n_classes))

train_classifier

train_classifier(esn, sequences, labels, n_classes, pool='mean') -> np.ndarray

Centralised classifier readout (ridge on reservoir features).

Source code in esnfed/classification.py
def train_classifier(esn, sequences, labels, n_classes, pool="mean") -> np.ndarray:
    """Centralised classifier readout (ridge on reservoir features)."""
    A, B = class_statistics(esn, sequences, labels, n_classes, pool)
    return solve_readout(A, B, esn.ridge)

federated_classifier

federated_classifier(esn, client_data, n_classes, pool='mean') -> np.ndarray

Exact federated classifier: sum each client's (A_k, B_k) and solve once.

client_data is a list of (sequences, labels) pairs. The result equals centralised training on the pooled data, but no client shares raw sequences.

Source code in esnfed/classification.py
def federated_classifier(esn, client_data, n_classes, pool="mean") -> np.ndarray:
    """Exact federated classifier: sum each client's ``(A_k, B_k)`` and solve once.

    ``client_data`` is a list of ``(sequences, labels)`` pairs. The result equals
    centralised training on the pooled data, but no client shares raw sequences.
    """
    d = esn.readout_dim
    A = np.zeros((d, d))
    B = np.zeros((d, n_classes))
    for sequences, labels in client_data:
        Ak, Bk = class_statistics(esn, sequences, labels, n_classes, pool)
        A += Ak
        B += Bk
    return solve_readout(A, B, esn.ridge)

predict_labels

predict_labels(esn, sequences, W_out, pool='mean') -> np.ndarray

Predict class labels (argmax of the readout) for each sequence.

Source code in esnfed/classification.py
def predict_labels(esn, sequences, W_out, pool="mean") -> np.ndarray:
    """Predict class labels (argmax of the readout) for each sequence."""
    F = reservoir_features(esn, sequences, pool)
    return np.argmax(F @ W_out, axis=1)

predict_proba

predict_proba(esn, sequences, W_out, pool='mean') -> np.ndarray

Softmax class probabilities for each sequence.

Source code in esnfed/classification.py
def predict_proba(esn, sequences, W_out, pool="mean") -> np.ndarray:
    """Softmax class probabilities for each sequence."""
    F = reservoir_features(esn, sequences, pool)
    logits = F @ W_out
    logits -= logits.max(axis=1, keepdims=True)
    p = np.exp(logits)
    return p / p.sum(axis=1, keepdims=True)

ensemble_classify

ensemble_classify(members, sequences, pool='mean') -> np.ndarray

Average the class probabilities of heterogeneous member classifiers.

members is a list of (esn, W_out) pairs (each may have its own reservoir); predictions are combined by averaging softmax probabilities.

Source code in esnfed/classification.py
def ensemble_classify(members, sequences, pool="mean") -> np.ndarray:
    """Average the class probabilities of heterogeneous member classifiers.

    ``members`` is a list of ``(esn, W_out)`` pairs (each may have its own
    reservoir); predictions are combined by averaging softmax probabilities.
    """
    probs = None
    for esn, W_out in members:
        p = predict_proba(esn, sequences, W_out, pool)
        probs = p if probs is None else probs + p
    return np.argmax(probs, axis=1)

esnfed.privacy

Privacy-preserving aggregation for exact federated ridge.

Exact federated ridge (:mod:esnfed.federated) already keeps raw samples on the device: clients exchange only the sufficient statistics A = Z^T Z and B = Z^T Y. Those summed second moments still encode information about individual records, so this module hardens the exchange in two complementary ways, both NumPy-only:

  • Differential privacy (:func:dp_statistics) -- output privacy. Each client clips its per-record contribution and adds calibrated Gaussian noise to (A, B), giving a formal :math:(\varepsilon, \delta) guarantee on the statistics it releases (Dwork & Roth, 2014). Even the released statistics cannot be used to single out a record.
  • Secure aggregation (:func:secure_sum) -- aggregation privacy. Clients add pairwise-cancelling random masks, so the server learns only the sum sum_k (A_k, B_k) and never any individual client's statistics (Bonawitz et al., 2017).

The two compose: a client can privatise its statistics and then mask them. See :func:esnfed.federated.federated_ridge_dp and :func:esnfed.federated.federated_ridge_secure for the client-level wrappers.

PrivacyConfig dataclass

PrivacyConfig(epsilon: float, delta: float = 1e-05, clip_state: float = 1.0, clip_target: float = 1.0, seed: int | None = None)

:math:(\varepsilon, \delta)-DP configuration for releasing (A, B).

clip_state (:math:C_z) and clip_target (:math:C_y) bound the per-record L2 norms of the extended state and the target; this fixes the mechanism's sensitivity. Smaller clips mean less noise but more bias.

sensitivity

sensitivity() -> float

Joint L2 (Frobenius) sensitivity of (A, B) to one record.

Adding/removing one record changes A by z z^T and B by z y^T; with ||z|| <= C_z and ||y|| <= C_y their Frobenius norms are C_z^2 and C_z C_y, so the joint sensitivity of the released pair is C_z * sqrt(C_z^2 + C_y^2).

Source code in esnfed/privacy.py
def sensitivity(self) -> float:
    """Joint L2 (Frobenius) sensitivity of ``(A, B)`` to one record.

    Adding/removing one record changes ``A`` by ``z z^T`` and ``B`` by
    ``z y^T``; with ``||z|| <= C_z`` and ``||y|| <= C_y`` their Frobenius norms
    are ``C_z^2`` and ``C_z C_y``, so the joint sensitivity of the released
    pair is ``C_z * sqrt(C_z^2 + C_y^2)``.
    """
    cz, cy = self.clip_state, self.clip_target
    return cz * math.sqrt(cz * cz + cy * cy)

gaussian_sigma

gaussian_sigma(epsilon: float, delta: float, sensitivity: float, *, method: str = 'analytic') -> float

Noise std dev for the Gaussian mechanism under :math:(\varepsilon,\delta)-DP.

method="analytic" (default) uses the analytic Gaussian mechanism (Balle & Wang, 2018): the smallest sigma for which the mechanism is :math:(\varepsilon, \delta)-DP, found by a short bisection. It is valid for any epsilon > 0 and is never looser than the classic bound.

method="classic" uses the textbook bound (Dwork & Roth, 2014, App. A), :math:\sigma = \Delta_2 \sqrt{2\ln(1.25/\delta)} / \varepsilon, which is only valid for epsilon <= 1 (a warning is issued above that).

Source code in esnfed/privacy.py
def gaussian_sigma(epsilon: float, delta: float, sensitivity: float,
                   *, method: str = "analytic") -> float:
    """Noise std dev for the Gaussian mechanism under :math:`(\\varepsilon,\\delta)`-DP.

    ``method="analytic"`` (default) uses the **analytic Gaussian mechanism**
    (Balle & Wang, 2018): the smallest ``sigma`` for which the mechanism is
    :math:`(\\varepsilon, \\delta)`-DP, found by a short bisection. It is valid for
    *any* ``epsilon > 0`` and is never looser than the classic bound.

    ``method="classic"`` uses the textbook bound (Dwork & Roth, 2014, App. A),
    :math:`\\sigma = \\Delta_2 \\sqrt{2\\ln(1.25/\\delta)} / \\varepsilon`, which is only
    valid for ``epsilon <= 1`` (a warning is issued above that).
    """
    if not (0.0 < delta < 1.0):
        raise ValueError("delta must be in (0, 1)")
    if epsilon <= 0.0:
        raise ValueError("epsilon must be > 0")
    if sensitivity < 0.0:
        raise ValueError("sensitivity must be >= 0")
    if sensitivity == 0.0:
        return 0.0

    if method == "classic":
        if epsilon > 1.0:
            warnings.warn(
                "the classic Gaussian-mechanism bound assumes epsilon <= 1; use "
                "method='analytic' for a guarantee at larger epsilon",
                stacklevel=2,
            )
        return sensitivity * math.sqrt(2.0 * math.log(1.25 / delta)) / epsilon

    if method != "analytic":
        raise ValueError("method must be 'analytic' or 'classic'")

    # Analytic Gaussian mechanism. With the scale-free ratio s = sigma/Delta, the
    # mechanism is (epsilon, delta)-DP iff B(s) <= delta, where B is decreasing:
    #   B(s) = Phi(1/(2s) - eps*s) - e^eps * Phi(-1/(2s) - eps*s).
    def B(s: float) -> float:
        return (_std_normal_cdf(1.0 / (2.0 * s) - epsilon * s)
                - math.exp(epsilon) * _std_normal_cdf(-1.0 / (2.0 * s) - epsilon * s))

    lo, hi = 1e-9, 1.0
    while B(hi) > delta:
        hi *= 2.0
    for _ in range(200):
        mid = 0.5 * (lo + hi)
        if B(mid) > delta:
            lo = mid
        else:
            hi = mid
    return sensitivity * hi

clip_rows

clip_rows(M: ndarray, max_norm: float) -> np.ndarray

Scale each row of M so its L2 norm is at most max_norm.

Source code in esnfed/privacy.py
def clip_rows(M: np.ndarray, max_norm: float) -> np.ndarray:
    """Scale each row of ``M`` so its L2 norm is at most ``max_norm``."""
    if max_norm <= 0.0:
        raise ValueError("max_norm must be > 0")
    M = np.asarray(M, dtype=np.float64)
    norms = np.linalg.norm(M, axis=1, keepdims=True)
    return M * np.minimum(1.0, max_norm / np.maximum(norms, 1e-12))

dp_statistics

dp_statistics(Z, Y, cfg: PrivacyConfig, rng=None)

Differentially private sufficient statistics (A, B) for one client.

Clips each post-washout record to the norms in cfg, forms A, B from the clipped data, and adds Gaussian noise calibrated by :func:gaussian_sigma to every entry (the Gaussian mechanism). A is symmetrised afterwards (post-processing, which preserves DP). The result is :math:(\varepsilon, \delta)-DP w.r.t. the client's records; summing the private statistics across clients and solving yields a private federated readout. Because of clipping and noise this is not exact (unlike :func:esnfed.federated.federated_ridge) -- it trades accuracy for privacy.

Source code in esnfed/privacy.py
def dp_statistics(Z, Y, cfg: PrivacyConfig, rng=None):
    """Differentially private sufficient statistics ``(A, B)`` for one client.

    Clips each post-washout record to the norms in ``cfg``, forms ``A, B`` from
    the clipped data, and adds Gaussian noise calibrated by :func:`gaussian_sigma`
    to every entry (the Gaussian mechanism). ``A`` is symmetrised afterwards
    (post-processing, which preserves DP). The result is
    :math:`(\\varepsilon, \\delta)`-DP w.r.t. the client's records; summing the
    private statistics across clients and solving yields a private federated
    readout. Because of clipping and noise this is *not* exact (unlike
    :func:`esnfed.federated.federated_ridge`) -- it trades accuracy for privacy.
    """
    rng = _as_rng(cfg.seed if rng is None else rng)
    Zc = clip_rows(Z, cfg.clip_state)
    Yc = clip_rows(np.atleast_2d(np.asarray(Y, dtype=np.float64)), cfg.clip_target)
    A, B = ridge_statistics(Zc, Yc)
    sigma = gaussian_sigma(cfg.epsilon, cfg.delta, cfg.sensitivity())
    A = A + rng.normal(0.0, sigma, size=A.shape)
    A = 0.5 * (A + A.T)  # restore symmetry (post-processing of the DP output)
    B = B + rng.normal(0.0, sigma, size=B.shape)
    return A, B

zero_sum_masks

zero_sum_masks(n: int, shape, rng=None, scale: float = 1.0) -> list[np.ndarray]

Return n mask arrays of shape that sum to (numerically) zero.

Pairwise masking (Bonawitz et al., 2017): for each pair (i, j) a shared random mask is added by client i and subtracted by client j, so every client's contribution is hidden yet all masks cancel on summation.

Source code in esnfed/privacy.py
def zero_sum_masks(n: int, shape, rng=None, scale: float = 1.0) -> list[np.ndarray]:
    """Return ``n`` mask arrays of ``shape`` that sum to (numerically) zero.

    Pairwise masking (Bonawitz et al., 2017): for each pair ``(i, j)`` a shared
    random mask is added by client ``i`` and subtracted by client ``j``, so every
    client's contribution is hidden yet all masks cancel on summation.
    """
    rng = _as_rng(rng)
    masks = [np.zeros(shape, dtype=np.float64) for _ in range(n)]
    for i in range(n):
        for j in range(i + 1, n):
            r = rng.normal(0.0, scale, size=shape)
            masks[i] += r
            masks[j] -= r
    return masks

secure_sum

secure_sum(arrays, rng=None, scale: float = 1.0) -> np.ndarray

Sum per-client arrays without revealing any individual one.

Each client adds a pairwise-cancelling mask before sending; the masks cancel in the sum, so the server recovers sum_k arrays[k] (exactly in fixed-point/modular arithmetic; here up to floating-point round-off) while never seeing an unmasked client array.

Source code in esnfed/privacy.py
def secure_sum(arrays, rng=None, scale: float = 1.0) -> np.ndarray:
    """Sum per-client arrays without revealing any individual one.

    Each client adds a pairwise-cancelling mask before sending; the masks cancel
    in the sum, so the server recovers ``sum_k arrays[k]`` (exactly in
    fixed-point/modular arithmetic; here up to floating-point round-off) while
    never seeing an unmasked client array.
    """
    arrays = [np.asarray(a, dtype=np.float64) for a in arrays]
    if not arrays:
        raise ValueError("no arrays to aggregate")
    masks = zero_sum_masks(len(arrays), arrays[0].shape, rng=rng, scale=scale)
    total = np.zeros_like(arrays[0])
    for a, m in zip(arrays, masks):
        total += a + m  # the server only ever adds masked contributions
    return total

esnfed.streaming

Incremental / streaming ridge for continual (federated) learning.

The ridge readout depends on the data only through the sums A = Z^T Z and B = Z^T Y (see :mod:esnfed.federated), so training can be made incremental simply by accumulating those sums as new data arrives -- the result is identical to batch training on all data seen so far. Two tools are provided:

  • :class:StreamingRidge -- accumulate (A, B) over chunks and solve on demand. :meth:StreamingRidge.merge adds another accumulator's statistics, which is exactly the federated sum, so clients can stream locally and the server periodically merges and re-solves (continual federated learning).
  • :class:RLSReadout -- recursive least squares: rank-1 updates of the readout and of the inverse Gram matrix (Sherman-Morrison) at O(D^2) per sample, for true per-sample online learning without re-solving. With unit forgetting it converges to the same readout as batch ridge.

StreamingRidge dataclass

StreamingRidge(readout_dim: int, n_outputs: int, ridge: float = 1e-06)

Accumulate ridge sufficient statistics incrementally; solve on demand.

Exact: after any sequence of :meth:update calls the readout equals batch ridge over all data seen so far.

update

update(Z, Y) -> 'StreamingRidge'

Accumulate a new batch of extended states Z and targets Y.

Source code in esnfed/streaming.py
def update(self, Z, Y) -> "StreamingRidge":
    """Accumulate a new batch of extended states ``Z`` and targets ``Y``."""
    Z = np.asarray(Z, dtype=np.float64)
    Y = np.atleast_2d(np.asarray(Y, dtype=np.float64))
    if Y.shape[0] != Z.shape[0]:
        Y = Y.reshape(Z.shape[0], -1)
    Ak, Bk = ridge_statistics(Z, Y)
    self.A += Ak
    self.B += Bk
    self.n_seen += Z.shape[0]
    return self

merge

merge(other: 'StreamingRidge') -> 'StreamingRidge'

Add another accumulator's statistics (exactly the federated sum).

Source code in esnfed/streaming.py
def merge(self, other: "StreamingRidge") -> "StreamingRidge":
    """Add another accumulator's statistics (exactly the federated sum)."""
    self.A += other.A
    self.B += other.B
    self.n_seen += other.n_seen
    return self

readout

readout() -> np.ndarray

Solve (A + ridge I) W = B for the current readout.

Source code in esnfed/streaming.py
def readout(self) -> np.ndarray:
    """Solve ``(A + ridge I) W = B`` for the current readout."""
    return solve_readout(self.A, self.B, self.ridge)

RLSReadout dataclass

RLSReadout(readout_dim: int, n_outputs: int, ridge: float = 1e-06, forgetting: float = 1.0)

Recursive least squares readout (online ridge via Sherman-Morrison).

Maintains the readout W and the inverse Gram P = (sum z z^T + ridge I)^{-1} and updates both with each sample at O(D^2) cost. Initialised with P = I / ridge so the ridge term acts as the usual Tikhonov prior; with forgetting = 1.0 the readout after processing all samples equals batch ridge. forgetting < 1 down-weights old samples (useful for non-stationary streams).

update

update(z, y) -> 'RLSReadout'

One rank-1 update from a single record (z, y).

Source code in esnfed/streaming.py
def update(self, z, y) -> "RLSReadout":
    """One rank-1 update from a single record ``(z, y)``."""
    z = np.asarray(z, dtype=np.float64).reshape(-1)
    y = np.asarray(y, dtype=np.float64).reshape(-1)
    lam = self.forgetting
    Pz = self.P @ z
    denom = lam + float(z @ Pz)
    k = Pz / denom
    err = y - self.W.T @ z
    self.W = self.W + np.outer(k, err)
    self.P = (self.P - np.outer(k, Pz)) / lam
    self.n_seen += 1
    return self

update_batch

update_batch(Z, Y) -> 'RLSReadout'

Apply :meth:update to each row of (Z, Y) in order.

Source code in esnfed/streaming.py
def update_batch(self, Z, Y) -> "RLSReadout":
    """Apply :meth:`update` to each row of ``(Z, Y)`` in order."""
    Z = np.asarray(Z, dtype=np.float64)
    Y = np.atleast_2d(np.asarray(Y, dtype=np.float64))
    if Y.shape[0] != Z.shape[0]:
        Y = Y.reshape(Z.shape[0], -1)
    for z, y in zip(Z, Y):
        self.update(z, y)
    return self

esnfed.interop

Interoperability with ReservoirPy.

Use ReservoirPy <https://reservoirpy.readthedocs.io>_ to design reservoirs (its strength: rich node API, hyper-parameter search) and esnfed to federate them. These adapters lift the reservoir and input weights out of a ReservoirPy Reservoir node and wrap them in an :class:esnfed.EchoStateNetwork, so a reservoir tuned in ReservoirPy can be dropped straight into the federated strategies of :mod:esnfed.federated.

ReservoirPy is an optional dependency::

pip install "esnfed[reservoirpy]"

reservoir_matrix

reservoir_matrix(reservoir, n_inputs: int = 1) -> np.ndarray

Extract the dense recurrent weight matrix W from a ReservoirPy reservoir.

Source code in esnfed/interop.py
def reservoir_matrix(reservoir, n_inputs: int = 1) -> np.ndarray:
    """Extract the dense recurrent weight matrix ``W`` from a ReservoirPy reservoir."""
    _ensure_initialized(reservoir, n_inputs)
    return _to_dense(reservoir.W)

input_matrix

input_matrix(reservoir, n_inputs: int = 1) -> np.ndarray

Build an esnfed input matrix [bias | Win] from a ReservoirPy reservoir.

esnfed packs the bias into column 0 of the input matrix, whereas ReservoirPy keeps a separate bias vector; this helper reconciles the two conventions.

Source code in esnfed/interop.py
def input_matrix(reservoir, n_inputs: int = 1) -> np.ndarray:
    """Build an esnfed input matrix ``[bias | Win]`` from a ReservoirPy reservoir.

    esnfed packs the bias into column 0 of the input matrix, whereas ReservoirPy
    keeps a separate bias vector; this helper reconciles the two conventions.
    """
    _ensure_initialized(reservoir, n_inputs)
    win = _to_dense(reservoir.Win)  # (units, input_dim)
    n_units = win.shape[0]
    bias = _to_dense(getattr(reservoir, "bias", None))
    if bias is None or bias.size == 0:
        bias_col = np.zeros((n_units, 1))
    elif bias.size == n_units:
        bias_col = bias.reshape(n_units, 1)
    elif bias.size == 1:
        # Scalar bias shared across units.
        bias_col = np.full((n_units, 1), float(bias.ravel()[0]))
    else:
        bias_col = np.zeros((n_units, 1))
    return np.hstack([bias_col, win])

to_esn

to_esn(reservoir, *, n_inputs: int = 1, n_outputs: int = 1, use_input_weights: bool = True, spectral_radius: float | None = None, **esn_kwargs) -> EchoStateNetwork

Wrap a ReservoirPy Reservoir as an :class:esnfed.EchoStateNetwork.

By default the reservoir's own spectral radius and leaking rate are preserved (so the dynamics are unchanged) and its input weights and bias are reused. Pass spectral_radius or other ESN keyword arguments to override.

Parameters:

Name Type Description Default
reservoir

A reservoirpy.nodes.Reservoir instance.

required
n_inputs int

Task dimensions (used to initialise the reservoir if needed).

1
n_outputs int

Task dimensions (used to initialise the reservoir if needed).

1
use_input_weights bool

If true, reuse ReservoirPy's input weights and bias; otherwise let esnfed draw fresh random input weights.

True
Source code in esnfed/interop.py
def to_esn(
    reservoir,
    *,
    n_inputs: int = 1,
    n_outputs: int = 1,
    use_input_weights: bool = True,
    spectral_radius: float | None = None,
    **esn_kwargs,
) -> EchoStateNetwork:
    """Wrap a ReservoirPy ``Reservoir`` as an :class:`esnfed.EchoStateNetwork`.

    By default the reservoir's own spectral radius and leaking rate are preserved
    (so the dynamics are unchanged) and its input weights and bias are reused.
    Pass ``spectral_radius`` or other ESN keyword arguments to override.

    Parameters
    ----------
    reservoir
        A ``reservoirpy.nodes.Reservoir`` instance.
    n_inputs, n_outputs
        Task dimensions (used to initialise the reservoir if needed).
    use_input_weights
        If true, reuse ReservoirPy's input weights and bias; otherwise let
        esnfed draw fresh random input weights.
    """
    _ensure_initialized(reservoir, n_inputs)
    W = reservoir_matrix(reservoir, n_inputs)

    # Preserve the reservoir's actual spectral radius unless told otherwise.
    if spectral_radius is None:
        spectral_radius = _spectral_radius(W)
    # Preserve the leaking rate if the caller did not set one.
    if "leaking_rate" not in esn_kwargs and hasattr(reservoir, "lr"):
        esn_kwargs["leaking_rate"] = float(np.asarray(reservoir.lr).ravel()[0])

    if use_input_weights:
        win = input_matrix(reservoir, n_inputs)
        # Column 0 already holds ReservoirPy's bias vector, so use bias=1.0.
        esn_kwargs.setdefault("bias", 1.0)
        return EchoStateNetwork(
            n_inputs, n_outputs, W,
            spectral_radius=spectral_radius, input_weights=win, **esn_kwargs,
        )
    return EchoStateNetwork(
        n_inputs, n_outputs, W, spectral_radius=spectral_radius, **esn_kwargs,
    )

esnfed.viz

Visualisation utilities for Echo State Networks (optional).

Four ways to see an ESN:

  • :func:plot_reservoir -- the reservoir connectivity as a network graph (nodes sized/coloured by degree); makes the topology differences visible.
  • :func:plot_spectrum -- the reservoir eigenvalues in the complex plane with the unit circle and spectral radius; makes the echo state property visible.
  • :func:plot_states -- a sample of reservoir activations over time (the "echoes"); shows whether the reservoir is rich or saturated.
  • :func:plot_forecast -- predicted vs. actual with the NRMSE in the title.

Three backends are supported and selected with backend=: "plotly" (the default, interactive), "matplotlib" and "seaborn" (seaborn is matplotlib with the seaborn theme applied). The plotting libraries are imported lazily, so importing this module never pulls them in; install them with::

pip install "esnfed[viz]"

Each function returns the native figure object (a Plotly Figure or a Matplotlib Figure); use :func:save or the object's own methods to render.

plot_reservoir

plot_reservoir(obj, *, backend: str = DEFAULT_BACKEND, layout: str = 'spring', seed: int = 0, title: str | None = None)

Draw the reservoir connectivity as a network graph.

Source code in esnfed/viz.py
def plot_reservoir(obj, *, backend: str = DEFAULT_BACKEND, layout: str = "spring",
                   seed: int = 0, title: str | None = None):
    """Draw the reservoir connectivity as a network graph."""
    import networkx as nx

    _check_backend(backend)
    W = _reservoir_matrix(obj)
    n = W.shape[0]
    A = (W != 0).astype(int)
    G = nx.from_numpy_array(A, create_using=nx.DiGraph)
    layouts = {
        "spring": lambda g: nx.spring_layout(g, seed=seed),
        "circular": nx.circular_layout,
        "kamada_kawai": nx.kamada_kawai_layout,
    }
    if layout not in layouts:
        raise ValueError(f"layout must be one of {list(layouts)}")
    pos = layouts[layout](G)
    deg = _degrees(W)
    dmax = max(int(deg.max()), 1)
    title = title or f"Reservoir topology (N={n}, {int(A.sum())} edges)"

    if backend == "plotly":
        go = _plotly()
        ex, ey = [], []
        for u, v in G.edges():
            ex += [pos[u][0], pos[v][0], None]
            ey += [pos[u][1], pos[v][1], None]
        edge_trace = go.Scatter(x=ex, y=ey, mode="lines",
                                line=dict(width=0.4, color="rgba(120,120,120,0.4)"),
                                hoverinfo="none")
        node_trace = go.Scatter(
            x=[pos[i][0] for i in G.nodes()], y=[pos[i][1] for i in G.nodes()],
            mode="markers", hoverinfo="text",
            text=[f"node {i} · degree {int(deg[i])}" for i in G.nodes()],
            marker=dict(size=[6 + 22 * deg[i] / dmax for i in G.nodes()],
                        color=[int(deg[i]) for i in G.nodes()],
                        colorscale="Viridis", showscale=True,
                        colorbar=dict(title="degree"), line=dict(width=0.5,
                        color="white")),
        )
        fig = go.Figure([edge_trace, node_trace])
        fig.update_layout(title=title, showlegend=False, template="plotly_white",
                          xaxis=dict(visible=False), yaxis=dict(visible=False))
        return fig

    plt = _matplotlib(backend)
    fig, ax = plt.subplots(figsize=(6, 5))
    for u, v in G.edges():
        ax.plot([pos[u][0], pos[v][0]], [pos[u][1], pos[v][1]],
                color="grey", lw=0.3, alpha=0.4, zorder=1)
    xs = [pos[i][0] for i in G.nodes()]
    ys = [pos[i][1] for i in G.nodes()]
    sc = ax.scatter(xs, ys, c=deg, s=[20 + 120 * d / dmax for d in deg],
                    cmap="viridis", edgecolors="white", linewidths=0.5, zorder=2)
    fig.colorbar(sc, ax=ax, label="degree")
    ax.set_title(title)
    ax.set_axis_off()
    fig.tight_layout()
    return fig

plot_spectrum

plot_spectrum(obj, *, backend: str = DEFAULT_BACKEND, title: str | None = None)

Plot the reservoir eigenvalues with the unit circle and spectral radius.

Source code in esnfed/viz.py
def plot_spectrum(obj, *, backend: str = DEFAULT_BACKEND, title: str | None = None):
    """Plot the reservoir eigenvalues with the unit circle and spectral radius."""
    _check_backend(backend)
    W = _reservoir_matrix(obj)
    eig = np.linalg.eigvals(W)
    sr = float(np.max(np.abs(eig))) if eig.size else 0.0
    title = title or f"Reservoir spectrum (spectral radius = {sr:.3f})"
    theta = np.linspace(0, 2 * np.pi, 256)

    if backend == "plotly":
        go = _plotly()
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=np.cos(theta), y=np.sin(theta), mode="lines",
                                 line=dict(color="grey", dash="dash"),
                                 name="unit circle"))
        if sr > 0:
            fig.add_trace(go.Scatter(x=sr * np.cos(theta), y=sr * np.sin(theta),
                                     mode="lines", line=dict(color="#C44E52"),
                                     name=f"ρ = {sr:.3f}"))
        fig.add_trace(go.Scatter(x=eig.real, y=eig.imag, mode="markers",
                                 marker=dict(size=5, color="#4C72B0", opacity=0.7),
                                 name="eigenvalues"))
        fig.update_layout(title=title, template="plotly_white",
                          xaxis_title="Re", yaxis_title="Im")
        fig.update_yaxes(scaleanchor="x", scaleratio=1)
        return fig

    plt = _matplotlib(backend)
    fig, ax = plt.subplots(figsize=(5.2, 5))
    ax.plot(np.cos(theta), np.sin(theta), color="grey", ls="--", label="unit circle")
    if sr > 0:
        ax.plot(sr * np.cos(theta), sr * np.sin(theta), color="#C44E52",
                label=f"$\\rho$ = {sr:.3f}")
    ax.scatter(eig.real, eig.imag, s=18, color="#4C72B0", alpha=0.7,
               label="eigenvalues")
    ax.axhline(0, color="black", lw=0.5)
    ax.axvline(0, color="black", lw=0.5)
    ax.set_aspect("equal")
    ax.set_xlabel("Re")
    ax.set_ylabel("Im")
    ax.set_title(title)
    ax.legend(loc="upper right", fontsize=8)
    fig.tight_layout()
    return fig

plot_states

plot_states(esn, u, *, n_neurons: int = 8, max_steps: int = 300, backend: str = DEFAULT_BACKEND, seed: int = 0, title: str | None = None)

Plot a sample of reservoir activations over time.

Source code in esnfed/viz.py
def plot_states(esn, u, *, n_neurons: int = 8, max_steps: int = 300,
                backend: str = DEFAULT_BACKEND, seed: int = 0,
                title: str | None = None):
    """Plot a sample of reservoir activations over time."""
    _check_backend(backend)
    u = np.atleast_2d(np.asarray(u, dtype=float))
    if u.shape[0] < u.shape[1]:
        u = u.reshape(-1, esn.n_inputs)
    Z = esn.harvest(u[:max_steps])
    states = Z[:, 1 + esn.n_inputs:]
    rng = np.random.default_rng(seed)
    k = min(n_neurons, states.shape[1])
    idx = rng.choice(states.shape[1], size=k, replace=False)
    t = np.arange(states.shape[0])
    title = title or f"Reservoir activations ({k} of {states.shape[1]} neurons)"

    if backend == "plotly":
        go = _plotly()
        fig = go.Figure()
        for j in idx:
            fig.add_trace(go.Scatter(x=t, y=states[:, j], mode="lines",
                                     line=dict(width=1), name=f"neuron {j}"))
        fig.update_layout(title=title, template="plotly_white",
                          xaxis_title="time step", yaxis_title="activation x(t)")
        return fig

    plt = _matplotlib(backend)
    fig, ax = plt.subplots(figsize=(8, 4))
    for j in idx:
        ax.plot(t, states[:, j], lw=0.8, alpha=0.85, label=f"neuron {j}")
    ax.set_xlabel("time step")
    ax.set_ylabel("activation x(t)")
    ax.set_title(title)
    if k <= 10:
        ax.legend(ncol=2, fontsize=7)
    fig.tight_layout()
    return fig

plot_forecast

plot_forecast(y_true, y_pred, *, backend: str = DEFAULT_BACKEND, washout: int = 0, title: str | None = None, max_points: int = 2000)

Overlay predicted vs. actual, with the NRMSE in the title.

Source code in esnfed/viz.py
def plot_forecast(y_true, y_pred, *, backend: str = DEFAULT_BACKEND,
                  washout: int = 0, title: str | None = None,
                  max_points: int = 2000):
    """Overlay predicted vs. actual, with the NRMSE in the title."""
    from .metrics import nrmse

    _check_backend(backend)
    yt = np.asarray(y_true, dtype=float).ravel()[washout:]
    yp = np.asarray(y_pred, dtype=float).ravel()[washout:]
    err = nrmse(yt, yp)
    if len(yt) > max_points:  # keep the figure light
        yt, yp = yt[:max_points], yp[:max_points]
    t = np.arange(len(yt))
    title = title or f"Forecast vs. actual (NRMSE = {err:.3f})"

    if backend == "plotly":
        go = _plotly()
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=t, y=yt, mode="lines", name="actual",
                                 line=dict(color="#333333", width=1)))
        fig.add_trace(go.Scatter(x=t, y=yp, mode="lines", name="forecast",
                                 line=dict(color="#4C72B0", width=1)))
        fig.update_layout(title=title, template="plotly_white",
                          xaxis_title="time step", yaxis_title="value")
        return fig

    plt = _matplotlib(backend)
    fig, ax = plt.subplots(figsize=(8, 3.5))
    ax.plot(t, yt, color="#333333", lw=0.9, label="actual")
    ax.plot(t, yp, color="#4C72B0", lw=0.9, alpha=0.85, label="forecast")
    ax.set_xlabel("time step")
    ax.set_ylabel("value")
    ax.set_title(title)
    ax.legend()
    fig.tight_layout()
    return fig

save

save(fig, path: str) -> str

Save a figure produced by this module.

Plotly figures are written to .html (interactive) or, if the extension is an image format and kaleido is installed, to a static image. Matplotlib figures are written with savefig.

Source code in esnfed/viz.py
def save(fig, path: str) -> str:
    """Save a figure produced by this module.

    Plotly figures are written to ``.html`` (interactive) or, if the extension
    is an image format and ``kaleido`` is installed, to a static image.
    Matplotlib figures are written with ``savefig``.
    """
    path = str(path)
    if type(fig).__module__.startswith("plotly"):
        if path.lower().endswith(".html"):
            fig.write_html(path)
        else:
            fig.write_image(path)  # needs kaleido
    else:
        fig.savefig(path, bbox_inches="tight", dpi=200)
    return path

esnfed.llm_orchestration

FedResPrompt - Federated Reservoir Prompt Orchestration (experimental).

A split-federated architecture in which an Echo State Network acts as an ultra-lightweight prompt controller at the edge. The reservoir maps local context to a small "bottleneck" vector, which a linear projection lifts into a language model's embedding space to form a soft prompt. Only the soft-prompt embedding (uplink) and its loss gradient (downlink) cross the client-server boundary; the heavy language model lives on the server.

local context --ESN--> state z --W_out--> bottleneck b --P--> soft prompt p
                                                                   |  (uplink)
                                                                   v
                                          server LM: loss + dL/dp  |  (downlink)
                                                                   v
update P and W_out locally  <--- backprop dL/dp through P and W_out

Why it matters. Classical federated LLM tuning (e.g. federated LoRA) ships adapter weights for every layer each round; FedResPrompt ships only a single soft-prompt vector and its gradient, which is orders of magnitude smaller, and keeps the forward/backward pass of the frozen model on the server, off the edge device. The communication and edge-compute savings are analysed in experiments/exp7_fedres_prompt.py.

The server language model is pluggable. By default a lightweight NumPy surrogate is used --- a frozen linear soft prompt -> vocabulary head with the exact cross-entropy gradient with respect to the prompt --- so the architecture runs anywhere with only NumPy. If transformers (with a torch backend) is installed, a real AutoModelForCausalLM can be plugged in via :class:TransformersLM (the gradient w.r.t. the input embedding is obtained from the autograd backward pass).

This module is experimental and optional: pip install "esnfed[llm]".

BottleneckProjection

BottleneckProjection(k: int, d: int, n_tokens: int = 1, *, seed: int = 0, scale: float = 0.1)

Linear projection from the bottleneck space (R^k) to the LM embedding space (R^{n_tokens x d}), trained by SGD.

The soft prompt is p = P b reshaped to (n_tokens, d).

Source code in esnfed/llm_orchestration.py
def __init__(self, k: int, d: int, n_tokens: int = 1, *, seed: int = 0,
             scale: float = 0.1):
    rng = np.random.default_rng(seed)
    self.k = k
    self.d = d
    self.n_tokens = n_tokens
    self.P = scale * rng.standard_normal((n_tokens * d, k))

forward

forward(b: ndarray) -> np.ndarray

b: (k,) -> soft prompt (n_tokens, d).

Source code in esnfed/llm_orchestration.py
def forward(self, b: np.ndarray) -> np.ndarray:
    """b: (k,) -> soft prompt (n_tokens, d)."""
    self._b = b
    return (self.P @ b).reshape(self.n_tokens, self.d)

backward

backward(grad_prompt: ndarray)

Given dL/dp (n_tokens, d), return dL/db (k,) and store dL/dP.

Source code in esnfed/llm_orchestration.py
def backward(self, grad_prompt: np.ndarray):
    """Given dL/dp (n_tokens, d), return dL/db (k,) and store dL/dP."""
    g = np.asarray(grad_prompt, float).reshape(-1)  # (n_tokens*d,)
    self.grad_P = np.outer(g, self._b)  # (n_tokens*d, k)
    return self.P.T @ g  # dL/db, shape (k,)

SurrogateLM

SurrogateLM(vocab_size: int, d: int, *, seed: int = 0)

A lightweight, frozen stand-in for a server-side language model.

A single linear head U maps a (mean-pooled) soft prompt to logits over a small vocabulary. It is frozen (as the LLM is in prompt tuning) and returns the cross-entropy loss together with the exact gradient with respect to the prompt embedding -- exactly the signal a real CausalLM would back-propagate to its input embeddings.

Source code in esnfed/llm_orchestration.py
def __init__(self, vocab_size: int, d: int, *, seed: int = 0):
    rng = np.random.default_rng(seed)
    self.vocab_size = vocab_size
    self.d = d
    self.U = rng.standard_normal((vocab_size, d)) / np.sqrt(d)  # frozen

loss_and_grad

loss_and_grad(prompt: ndarray, target: int)

prompt: (n_tokens, d) -> (loss, dL/dprompt with same shape).

Source code in esnfed/llm_orchestration.py
def loss_and_grad(self, prompt: np.ndarray, target: int):
    """prompt: (n_tokens, d) -> (loss, dL/dprompt with same shape)."""
    p = np.atleast_2d(prompt)
    h = p.mean(axis=0)  # mean-pool the prompt tokens -> (d,)
    logits = self.U @ h  # (V,)
    logits -= logits.max()
    probs = np.exp(logits)
    probs /= probs.sum()
    loss = -np.log(probs[target] + 1e-12)
    grad_h = self.U.T @ (probs - _onehot(target, self.vocab_size))  # (d,)
    grad_prompt = np.tile(grad_h / p.shape[0], (p.shape[0], 1))  # (n_tokens, d)
    return float(loss), grad_prompt

EdgeClient dataclass

EdgeClient(esn: EchoStateNetwork, bottleneck_dim: int, embed_dim: int, n_prompt_tokens: int = 1, seed: int = 0, lr: float = 0.05)

An edge device: a (frozen) reservoir + a trainable readout and projection.

The reservoir and its input weights are fixed; only W_out (state -> bottleneck) and the :class:BottleneckProjection are learned, by back-propagating the gradient the server returns.

apply_server_gradient

apply_server_gradient(grad_prompt: ndarray)

Back-propagate dL/dp through P and W_out and take an SGD step.

Source code in esnfed/llm_orchestration.py
def apply_server_gradient(self, grad_prompt: np.ndarray):
    """Back-propagate dL/dp through P and W_out and take an SGD step."""
    self.bytes_down += np.asarray(grad_prompt).size * FLOAT_BYTES  # downlink
    grad_b = self.projection.backward(grad_prompt)  # (k,)
    grad_W = np.outer(grad_b, self._z)  # (k, readout_dim)
    self.projection.step(self.lr)
    self.W_out -= self.lr * grad_W

Server dataclass

Server(lm: SurrogateLM)

The server hosting the (frozen) language model.

evaluate

evaluate(prompt: ndarray, target: int)

Run the LM forward/backward; return (loss, dL/dprompt).

Source code in esnfed/llm_orchestration.py
def evaluate(self, prompt: np.ndarray, target: int):
    """Run the LM forward/backward; return (loss, dL/dprompt)."""
    self.bytes_up += prompt.size * FLOAT_BYTES  # received from client
    loss, grad_prompt = self.lm.loss_and_grad(prompt, target)
    self.bytes_down += grad_prompt.size * FLOAT_BYTES  # sent to client
    self.flops += self.lm.forward_flops(prompt.shape[0])
    return loss, grad_prompt

TransformersLM

TransformersLM(model_name: str = 'Qwen/Qwen2.5-0.5B')

A real, frozen server-side CausalLM (optional), e.g. a small Qwen.

The soft prompt is fed as inputs_embeds; the next-token logits after the prompt give the cross-entropy against a target token id, and the gradient with respect to the prompt is read from the autograd backward pass --- the same interface as :class:SurrogateLM, so it is a drop-in for :class:Server.

Requires transformers and torch (pip install "esnfed[llm]" plus a torch build). Tested with Qwen/Qwen2.5-0.5B.

Source code in esnfed/llm_orchestration.py
def __init__(self, model_name: str = "Qwen/Qwen2.5-0.5B"):
    try:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
    except ImportError as e:
        raise ImportError(
            "TransformersLM needs `transformers` and `torch`; "
            "install esnfed[llm] (and torch). The default SurrogateLM has no "
            "such requirement."
        ) from e
    self._torch = torch
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.model = AutoModelForCausalLM.from_pretrained(model_name)
    self.model.float()  # fp32 on CPU
    self.model.eval()
    for param in self.model.parameters():
        param.requires_grad_(False)  # frozen LLM
    self.d = self.model.get_input_embeddings().embedding_dim
    self.vocab_size = self.model.config.vocab_size

loss_and_grad

loss_and_grad(prompt: ndarray, target: int)

Next-token cross-entropy against token id target and dL/dprompt.

Source code in esnfed/llm_orchestration.py
def loss_and_grad(self, prompt: np.ndarray, target: int):
    """Next-token cross-entropy against token id ``target`` and dL/dprompt."""
    torch = self._torch
    p_leaf, logits = self._logits_tensor(prompt)
    loss = torch.nn.functional.cross_entropy(
        logits, torch.tensor([int(target)]))
    loss.backward()
    grad_prompt = p_leaf.grad.detach().to(torch.float32).numpy()
    return float(loss.item()), grad_prompt

restricted_loss_and_grad

restricted_loss_and_grad(prompt: ndarray, class_ids, target_idx: int)

Cross-entropy over only the candidate class tokens (the standard way to do classification with an LLM) and dL/dprompt. target_idx indexes into class_ids.

Source code in esnfed/llm_orchestration.py
def restricted_loss_and_grad(self, prompt: np.ndarray, class_ids, target_idx: int):
    """Cross-entropy over only the candidate class tokens (the standard way to
    do classification with an LLM) and dL/dprompt. ``target_idx`` indexes
    into ``class_ids``."""
    torch = self._torch
    p_leaf, logits = self._logits_tensor(prompt)
    sub = logits[0, list(class_ids)].unsqueeze(0)  # (1, K)
    loss = torch.nn.functional.cross_entropy(sub, torch.tensor([int(target_idx)]))
    loss.backward()
    return float(loss.item()), p_leaf.grad.detach().to(torch.float32).numpy()

logits

logits(prompt: ndarray) -> np.ndarray

Next-token logit vector (numpy) for the given soft prompt.

Source code in esnfed/llm_orchestration.py
def logits(self, prompt: np.ndarray) -> np.ndarray:
    """Next-token logit vector (numpy) for the given soft prompt."""
    with self._torch.no_grad():
        _, logits = self._logits_tensor(prompt)
    return logits.squeeze(0).numpy()

split_federated_step

split_federated_step(client: EdgeClient, server: Server, context: ndarray, target: int) -> float

One split-federated example: client builds prompt, server scores it, client updates from the returned gradient. Returns the loss.

Source code in esnfed/llm_orchestration.py
def split_federated_step(client: EdgeClient, server: Server,
                         context: np.ndarray, target: int) -> float:
    """One split-federated example: client builds prompt, server scores it,
    client updates from the returned gradient. Returns the loss."""
    prompt = client.make_prompt(context)
    loss, grad_prompt = server.evaluate(prompt, target)
    client.apply_server_gradient(grad_prompt)
    return loss

fedresprompt_bytes_per_round

fedresprompt_bytes_per_round(d_model: int, n_prompt_tokens: int = 1, dtype_bytes: int = FLOAT_BYTES) -> int

Client<->server bytes per example: soft prompt up + its gradient down.

Source code in esnfed/llm_orchestration.py
def fedresprompt_bytes_per_round(d_model: int, n_prompt_tokens: int = 1,
                                 dtype_bytes: int = FLOAT_BYTES) -> int:
    """Client<->server bytes per example: soft prompt up + its gradient down."""
    return 2 * n_prompt_tokens * d_model * dtype_bytes

fedlora_bytes_per_round

fedlora_bytes_per_round(d_model: int, n_layers: int, rank: int, adapters_per_layer: int = 2, dtype_bytes: int = FLOAT_BYTES) -> int

Bytes per round for federated LoRA: adapter weights up + aggregate down.

Each adapted projection contributes two low-rank factors of size rank x d_model (A and B), so 2 * rank * d_model parameters; there are adapters_per_layer of them per layer (e.g. query and value).

Source code in esnfed/llm_orchestration.py
def fedlora_bytes_per_round(d_model: int, n_layers: int, rank: int,
                            adapters_per_layer: int = 2,
                            dtype_bytes: int = FLOAT_BYTES) -> int:
    """Bytes per round for federated LoRA: adapter weights up + aggregate down.

    Each adapted projection contributes two low-rank factors of size
    ``rank x d_model`` (A and B), so ``2 * rank * d_model`` parameters; there are
    ``adapters_per_layer`` of them per layer (e.g. query and value).
    """
    params = n_layers * adapters_per_layer * 2 * rank * d_model
    return 2 * params * dtype_bytes  # upload + download

llm_flops

llm_flops(n_params: int, tokens: int) -> int

Standard estimate: forward ~2PT, backward ~4PT -> 6PT.

Source code in esnfed/llm_orchestration.py
def llm_flops(n_params: int, tokens: int) -> int:
    """Standard estimate: forward ~2*P*T, backward ~4*P*T -> 6*P*T."""
    return 6 * n_params * tokens

esn_edge_flops

esn_edge_flops(n_reservoir: int, steps: int, readout_dim: int, bottleneck_dim: int) -> int

Edge cost of FedResPrompt: reservoir run + readout (no LLM on device).

Source code in esnfed/llm_orchestration.py
def esn_edge_flops(n_reservoir: int, steps: int, readout_dim: int,
                   bottleneck_dim: int) -> int:
    """Edge cost of FedResPrompt: reservoir run + readout (no LLM on device)."""
    return steps * n_reservoir * n_reservoir + bottleneck_dim * readout_dim