|
| 1 | +# SPDX-License-Identifier: MIT |
| 2 | +"""T5 — Coulomb Electrostatic Interaction for Data Ingestion. |
| 3 | +
|
| 4 | +Define charge: |
| 5 | + q_i(t) = OFI_i(t) / σ(OFI_i, 30) # normalised order flow imbalance |
| 6 | + positive charge = net buying pressure |
| 7 | + negative charge = net selling pressure |
| 8 | +
|
| 9 | +Coulomb force: |
| 10 | + F_ij = k · q_i · q_j / r_ij² |
| 11 | + r_ij = correlation distance (same metric as T1) |
| 12 | + k = normalisation s.t. |F_ij| ∈ [0, 1] |
| 13 | +
|
| 14 | +AUDIT RESOLUTION — sign convention: |
| 15 | + F_ij > 0 (same sign charges) → repulsion. |
| 16 | + In markets: same-direction OFI = momentum, which implies ATTRACTION. |
| 17 | + Therefore we FLIP the sign: F_market = -F_coulomb. |
| 18 | +
|
| 19 | + Attraction (opposite signs) → convergent behaviour. |
| 20 | + Repulsion (same signs in Coulomb) → but in market context same-direction |
| 21 | + flow means co-movement → attraction, so we invert. |
| 22 | +
|
| 23 | +Adjacency update: |
| 24 | + A_ij(t) = clip(A_ij(t-1) + α · F_market_ij, 0, 1) |
| 25 | + α = 0.1 (slow adaptation prevents overfitting to noise) |
| 26 | +""" |
| 27 | + |
| 28 | +from __future__ import annotations |
| 29 | + |
| 30 | +import numpy as np |
| 31 | +from numpy.typing import NDArray |
| 32 | + |
| 33 | + |
| 34 | +class CoulombInteraction: |
| 35 | + """Coulomb-inspired electrostatic interaction for market networks. |
| 36 | +
|
| 37 | + Parameters |
| 38 | + ---------- |
| 39 | + alpha : float |
| 40 | + Learning rate for adjacency update (default 0.1). |
| 41 | + lookback : int |
| 42 | + Lookback for OFI normalisation std (default 30). |
| 43 | + """ |
| 44 | + |
| 45 | + def __init__(self, alpha: float = 0.1, lookback: int = 30) -> None: |
| 46 | + if not 0 < alpha <= 1: |
| 47 | + raise ValueError(f"alpha must be in (0, 1], got {alpha}") |
| 48 | + if lookback < 1: |
| 49 | + raise ValueError(f"lookback must be ≥ 1, got {lookback}") |
| 50 | + self._alpha = alpha |
| 51 | + self._lookback = lookback |
| 52 | + |
| 53 | + @property |
| 54 | + def alpha(self) -> float: |
| 55 | + return self._alpha |
| 56 | + |
| 57 | + def compute_charges( |
| 58 | + self, ofi_matrix: NDArray[np.float64] |
| 59 | + ) -> NDArray[np.float64]: |
| 60 | + """Normalised charges q_i = OFI_i / σ(OFI_i). |
| 61 | +
|
| 62 | + Parameters |
| 63 | + ---------- |
| 64 | + ofi_matrix : (T, N) array |
| 65 | + Order flow imbalance time series for N assets. |
| 66 | +
|
| 67 | + Returns |
| 68 | + ------- |
| 69 | + (N,) array of current charges. |
| 70 | + """ |
| 71 | + ofi = np.asarray(ofi_matrix, dtype=np.float64) |
| 72 | + if ofi.ndim != 2: |
| 73 | + raise ValueError(f"Expected 2-D array (T, N), got ndim={ofi.ndim}") |
| 74 | + |
| 75 | + tail = ofi[-self._lookback:] |
| 76 | + current = ofi[-1] |
| 77 | + sigma = np.std(tail, axis=0) |
| 78 | + sigma = np.maximum(sigma, 1e-12) |
| 79 | + return current / sigma |
| 80 | + |
| 81 | + @staticmethod |
| 82 | + def compute_forces( |
| 83 | + charges: NDArray[np.float64], |
| 84 | + correlation_distances: NDArray[np.float64], |
| 85 | + ) -> NDArray[np.float64]: |
| 86 | + """Compute Coulomb force matrix with market sign convention. |
| 87 | +
|
| 88 | + F_market_ij = -k · q_i · q_j / r_ij² |
| 89 | +
|
| 90 | + The negative sign flips Coulomb convention: |
| 91 | + same-sign OFI → negative F_market → attraction (co-movement). |
| 92 | +
|
| 93 | + Forces are normalised to [-1, 1]. |
| 94 | + """ |
| 95 | + charges = np.asarray(charges, dtype=np.float64) |
| 96 | + distances = np.asarray(correlation_distances, dtype=np.float64) |
| 97 | + n = charges.shape[0] |
| 98 | + |
| 99 | + if distances.shape != (n, n): |
| 100 | + raise ValueError( |
| 101 | + f"distances must be ({n}, {n}), got {distances.shape}" |
| 102 | + ) |
| 103 | + |
| 104 | + charge_product = np.outer(charges, charges) |
| 105 | + dist_safe = np.maximum(distances, 1e-6) |
| 106 | + F_coulomb = charge_product / (dist_safe ** 2) |
| 107 | + |
| 108 | + # Flip sign: same-direction OFI = market attraction |
| 109 | + F_market = -F_coulomb |
| 110 | + np.fill_diagonal(F_market, 0.0) |
| 111 | + |
| 112 | + # Normalise to [-1, 1] |
| 113 | + max_abs = np.max(np.abs(F_market)) |
| 114 | + if max_abs > 0: |
| 115 | + F_market = F_market / max_abs |
| 116 | + |
| 117 | + return F_market |
| 118 | + |
| 119 | + def update_adjacency( |
| 120 | + self, |
| 121 | + A: NDArray[np.float64], |
| 122 | + forces: NDArray[np.float64], |
| 123 | + ) -> NDArray[np.float64]: |
| 124 | + """Update adjacency matrix with Coulomb forces. |
| 125 | +
|
| 126 | + A_ij(t) = clip(A_ij(t-1) + α · F_ij, 0, 1) |
| 127 | +
|
| 128 | + Parameters |
| 129 | + ---------- |
| 130 | + A : (N, N) current adjacency matrix. |
| 131 | + forces : (N, N) normalised force matrix from compute_forces. |
| 132 | +
|
| 133 | + Returns |
| 134 | + ------- |
| 135 | + Updated (N, N) adjacency matrix. |
| 136 | + """ |
| 137 | + A = np.asarray(A, dtype=np.float64) |
| 138 | + forces = np.asarray(forces, dtype=np.float64) |
| 139 | + |
| 140 | + if A.shape != forces.shape: |
| 141 | + raise ValueError( |
| 142 | + f"A and forces must match: {A.shape} vs {forces.shape}" |
| 143 | + ) |
| 144 | + |
| 145 | + A_new = A + self._alpha * forces |
| 146 | + A_new = np.clip(A_new, 0.0, 1.0) |
| 147 | + np.fill_diagonal(A_new, 0.0) |
| 148 | + return A_new |
| 149 | + |
| 150 | + |
| 151 | +__all__ = ["CoulombInteraction"] |
0 commit comments